new_thea/Control/Jetson/pallet_hole_alignment_c.py
2021-09-21 12:11:46 +01:00

224 lines
7.1 KiB
Python

from threading import Thread
import cv2
import time
import apriltag
import logging
import platform
import math
from dataclasses import dataclass
logging.basicConfig(level='DEBUG')
class CUDA:
def __init__(self) -> None:
self.log = logging.getLogger("CUDA_helper")
self.log.setLevel("DEBUG")
self.cuda_allowed = False
self.use_CUDA = True
if self.use_CUDA:
self.log.warning("CUDA has been disabled due to performance issues...")
self.use_CUDA = False
else:
cuda_devices = cv2.cuda.getCudaEnabledDeviceCount()
if cuda_devices == 0:
self.log.warning("No CUDA devices found, will use CPU")
self.use_CUDA = False
else:
self.log.info("Running with CUDA 😎")
def bgr2gray(self, frame):
if self.use_CUDA:
gpu_frame = cv2.cuda_GpuMat()
gpu_frame.upload(frame)
gray_cuda = cv2.cuda.cvtColor(gpu_frame, cv2.COLOR_BGR2GRAY)
return gray_cuda.download()
else:
return cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
class VideoStream:
def __init__(self, cam_id=0) -> None: # NOTE: id should be 1
self.stream = self.open_cam_usb(1280, 720)
_, img = self.stream.read()
self.buffer = img
self.running = False
def start(self):
t = Thread(target=self.update, args=())
t.daemon = True
self.running = True
t.start()
self.start_time = time.time()
return self
def update(self):
while self.running:
success, frame = self.stream.read()
assert success
self.buffer = frame
self.stream.release()
def read(self):
return self.buffer
def stop(self):
self.running = False
def open_cam_usb(self, width, height):
gst_str = ('v4l2src device=/dev/cam_align ! video/x-raw, width=(int){}, height=(int){} ! videoconvert ! appsink').format(width, height)
return cv2.VideoCapture(gst_str, cv2.CAP_GSTREAMER)
class HoleDetector:
def __init__(self) -> None:
self.log = logging.getLogger("_C_ HOLE DETECTOR")
self.log.setLevel("DEBUG")
self.CUDA = CUDA()
self.options = apriltag.DetectorOptions(families="tag36h11", nthreads=4)
self.detector = apriltag.Detector(self.options)
self.stream = None
def start_stream(self):
self.stream = VideoStream().start()
def stop_streams(self):
self.stream.stop()
def get_tags(self):
frame = self.stream.read()
if frame.any():
gray_frame = self.CUDA.bgr2gray(frame)
# cv2.imshow("GR", gray_frame)
# cv2.waitKey(1)
result = self.detector.detect(gray_frame)
if result:
return result
return None
@dataclass
class Resolution:
width: int
height: int
class HoleAligner:
def __init__(self, vehicle_ctrl) -> None:
self.log = logging.getLogger("_C_ HOLE ALIGNER")
self.log.setLevel("INFO")
self.detector = HoleDetector()
self.detector.start_stream()
self.resolution = Resolution(width=1280, height=720)
self.vehicle = vehicle_ctrl
self.tag_offset_x = 0
self.tag_offset_y = 0.4
self.tag_tresh_x = 0.1
self.tag_tresh_y = 0.1
self.iterations = 0
#(-0.08, 0.08)
self.range = 'X: -0.93 -> 0.93 | Y: -0.87 -> 0.87 || -1 -> 1'
def find_apriltag(self):
results = self.detector.get_tags()
if results:
center = results[0].center
pos_x = 2 * (center[0] - (self.resolution.width/2)) / self.resolution.width
pos_y = 2 * (center[1] - (self.resolution.height/2)) / self.resolution.height
clean_pos_y = float("{0:.3f}".format(pos_y))
clean_pos_x = float("{0:.3f}".format(pos_x))
translation = (clean_pos_x, clean_pos_y)
self.log.debug(f" X: {'+' if clean_pos_x > 0 else ''}{clean_pos_x}| Y: {'+' if clean_pos_y >0 else ''}{clean_pos_y}")
return translation
return None
def calculate_error(self, tag_location):
tag_x, tag_y = tag_location
err_x = float("{0:.2f}".format(tag_x + self.tag_offset_x))
err_y = float("{0:.2f}".format(tag_y + self.tag_offset_y))
self.log.info(f"Alignment error: X {err_x} | Y {err_y}")
return (err_x, err_y)
def run_adjustment_iteration(self, errors):
self.iterations += 1
if self.iterations > 4:
self.log.error(f"Too many iterations, exiting.")
return True
err_x, err_y = errors
x_aligned = False
y_aligned = False
go_left = False
if err_x < 0:
go_left = True
go_up = False
if err_y < 0:
go_up = True
if abs(err_x) > 0.3: # TODO: calibrate this
self.vehicle.nudge_fork_horiz(go_left, 1.0)
elif abs(err_x) > 0.15: # TODO: calibrate this
self.vehicle.nudge_fork_horiz(go_left, 0.6)
elif abs(err_x) > 0.1: # TODO: calibrate this
self.vehicle.nudge_fork_horiz(go_left, 0.4)
elif abs(err_x) > 0.08: # TODO: calibrate this
self.vehicle.nudge_fork_horiz(go_left, 0.2)
elif abs(err_x) <= 0.08:
x_aligned = True
else:
self.log.critical("THIS SHOULD NOT HAPPEN!!!!!")
# if abs(err_y) > 0.3: # TODO: calibrate this
# self.vehicle.nudge_fork_vert(go_up, 1.0)
# elif abs(err_y) > 0.15: # TODO: calibrate this
# self.vehicle.nudge_fork_vert(go_up, 0.6)
# elif abs(err_y) > 0.1: # TODO: calibrate this
# self.vehicle.nudge_fork_vert(go_up, 0.4)
# elif abs(err_y) > 0.08: # TODO: calibrate this
# self.vehicle.nudge_fork_vert(go_up, 0.2)
# elif abs(err_y) <= 0.08:
# y_aligned = True
# else:
# self.log.critical("THIS SHOULD NOT HAPPEN!!!!!")
if x_aligned:
self.log.info("We aligned.")
return True
return False
def align_fork(self):
is_aligned = False
while not is_aligned:
tag_position = self.find_apriltag()
if not tag_position:
self.log.info("Could not detect c̶o̶d̶e̶ holes")
continue
err = self.calculate_error(tag_position)
is_aligned = self.run_adjustment_iteration(err)
if is_aligned:
return True
class DemoFL:
def __init__(self) -> None:
self.a = True
def nudge_fork_vert(self, dir, amt):
print(f"Moving {'UP' if dir else 'DOWN'} for {amt} s.")
def nudge_fork_horiz(self, dir, amt):
print(f"Moving {'LEFT' if dir else 'RIGHT'} for {amt} s.")
if __name__ == "__main__":
demo_fl = DemoFL()
x = HoleAligner(demo_fl)
while True:
tag_pos = x.find_apriltag()
if not tag_pos:
# notify about failed identification
continue
err = x.calculate_error(tag_pos)
print(err)
# aligned = x.run_adjustment_iteration(err)