from threading import Thread import cv2 import time import apriltag import logging import platform import math from dataclasses import dataclass import os import psutil logging.basicConfig(level='DEBUG') class CUDA: def __init__(self) -> None: self.log = logging.getLogger("CUDA_helper") self.log.setLevel("DEBUG") self.cuda_allowed = False self.use_CUDA = True if self.use_CUDA: self.log.warning("CUDA has been disabled due to performance issues...") self.use_CUDA = False else: cuda_devices = cv2.cuda.getCudaEnabledDeviceCount() if cuda_devices == 0: self.log.warning("No CUDA devices found, will use CPU") self.use_CUDA = False else: self.log.info("Running with CUDA 😎") def bgr2gray(self, frame): if self.use_CUDA: gpu_frame = cv2.cuda_GpuMat() gpu_frame.upload(frame) gray_cuda = cv2.cuda.cvtColor(gpu_frame, cv2.COLOR_BGR2GRAY) return gray_cuda.download() else: return cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) class VideoStream: def __init__(self, cam_id=0) -> None: # NOTE: id should be 1 self.stream = self.open_cam_usb(1280, 720) _, img = self.stream.read() self.buffer = img self.running = False def start(self): t = Thread(target=self.update, args=()) t.daemon = True self.running = True t.start() self.start_time = time.time() return self def update(self): while self.running: success, frame = self.stream.read() if not success: current_system_pid = os.getpid() ThisSystem = psutil.Process(current_system_pid) ThisSystem.terminate() assert success self.buffer = frame self.stream.release() def read(self): return self.buffer def stop(self): self.running = False def open_cam_usb(self, width, height): gst_str = ('v4l2src device=/dev/cam_align ! video/x-raw, width=(int){}, height=(int){} ! videoconvert ! appsink').format(width, height) return cv2.VideoCapture(gst_str, cv2.CAP_GSTREAMER) class HoleDetector: def __init__(self) -> None: self.log = logging.getLogger("_C_ HOLE DETECTOR") self.log.setLevel("DEBUG") self.CUDA = CUDA() self.options = apriltag.DetectorOptions(families="tag36h11", nthreads=4) self.detector = apriltag.Detector(self.options) self.stream = None def start_stream(self): self.stream = VideoStream().start() def stop_streams(self): self.stream.stop() def get_tags(self): frame = self.stream.read() if frame.any(): gray_frame = self.CUDA.bgr2gray(frame) # cv2.imshow("GR", gray_frame) # cv2.waitKey(1) result = self.detector.detect(gray_frame) if result: return result return None @dataclass class Resolution: width: int height: int class HoleAligner: def __init__(self, vehicle_ctrl) -> None: self.log = logging.getLogger("_C_ HOLE ALIGNER") self.log.setLevel("INFO") self.detector = HoleDetector() self.detector.start_stream() self.resolution = Resolution(width=1280, height=720) self.vehicle = vehicle_ctrl self.tag_offset_x = 0 self.tag_offset_y = 0.4 self.tag_tresh_x = 0.1 self.tag_tresh_y = 0.1 self.iterations = 0 #(-0.08, 0.08) self.range = 'X: -0.93 -> 0.93 | Y: -0.87 -> 0.87 || -1 -> 1' def find_apriltag(self): results = self.detector.get_tags() if results: center = results[0].center pos_x = 2 * (center[0] - (self.resolution.width/2)) / self.resolution.width pos_y = 2 * (center[1] - (self.resolution.height/2)) / self.resolution.height clean_pos_y = float("{0:.3f}".format(pos_y)) clean_pos_x = float("{0:.3f}".format(pos_x)) translation = (clean_pos_x, clean_pos_y) self.log.debug(f" X: {'+' if clean_pos_x > 0 else ''}{clean_pos_x}| Y: {'+' if clean_pos_y >0 else ''}{clean_pos_y}") return translation return None def calculate_error(self, tag_location): tag_x, tag_y = tag_location err_x = float("{0:.2f}".format(tag_x + self.tag_offset_x)) err_y = float("{0:.2f}".format(tag_y + self.tag_offset_y)) self.log.info(f"Alignment error: X {err_x} | Y {err_y}") return (err_x, err_y) def run_adjustment_iteration(self, errors): self.iterations += 1 if self.iterations > 4: self.log.error(f"Too many iterations, exiting.") return True err_x, err_y = errors x_aligned = False y_aligned = False go_left = False if err_x < 0: go_left = True go_up = False if err_y < 0: go_up = True if abs(err_x) > 0.3: # TODO: calibrate this self.vehicle.nudge_fork_horiz(go_left, 1.0) elif abs(err_x) > 0.15: # TODO: calibrate this self.vehicle.nudge_fork_horiz(go_left, 0.6) elif abs(err_x) > 0.1: # TODO: calibrate this self.vehicle.nudge_fork_horiz(go_left, 0.4) elif abs(err_x) > 0.08: # TODO: calibrate this self.vehicle.nudge_fork_horiz(go_left, 0.2) elif abs(err_x) <= 0.08: x_aligned = True else: self.log.critical("THIS SHOULD NOT HAPPEN!!!!!") # if abs(err_y) > 0.3: # TODO: calibrate this # self.vehicle.nudge_fork_vert(go_up, 1.0) # elif abs(err_y) > 0.15: # TODO: calibrate this # self.vehicle.nudge_fork_vert(go_up, 0.6) # elif abs(err_y) > 0.1: # TODO: calibrate this # self.vehicle.nudge_fork_vert(go_up, 0.4) # elif abs(err_y) > 0.08: # TODO: calibrate this # self.vehicle.nudge_fork_vert(go_up, 0.2) # elif abs(err_y) <= 0.08: # y_aligned = True # else: # self.log.critical("THIS SHOULD NOT HAPPEN!!!!!") if x_aligned: self.log.info("We aligned.") return True return False def align_fork(self): is_aligned = False while not is_aligned: tag_position = self.find_apriltag() if not tag_position: self.log.info("Could not detect c̶o̶d̶e̶ holes") continue err = self.calculate_error(tag_position) is_aligned = self.run_adjustment_iteration(err) if is_aligned: return True class DemoFL: def __init__(self) -> None: self.a = True def nudge_fork_vert(self, dir, amt): print(f"Moving {'UP' if dir else 'DOWN'} for {amt} s.") def nudge_fork_horiz(self, dir, amt): print(f"Moving {'LEFT' if dir else 'RIGHT'} for {amt} s.") if __name__ == "__main__": demo_fl = DemoFL() x = HoleAligner(demo_fl) while True: tag_pos = x.find_apriltag() if not tag_pos: # notify about failed identification continue err = x.calculate_error(tag_pos) print(err) # aligned = x.run_adjustment_iteration(err)