import logging import math import platform import time from dataclasses import dataclass import os import psutil import cv2 import apriltag_detector logging.basicConfig(level='INFO') @dataclass class Resolution: width: int height: int def remap(value, maxInput, minInput, maxOutput, minOutput): value = maxInput if value > maxInput else value value = minInput if value < minInput else value inputSpan = maxInput - minInput outputSpan = maxOutput - minOutput scaledThrust = float(value - minInput) / float(inputSpan) return minOutput + (scaledThrust * outputSpan) class VisionSystem: def __init__(self) -> None: self.log = logging.getLogger("Vision_system") self.log.setLevel("DEBUG") self.detector = apriltag_detector.Detector() self.detector.start_streams(1) self.resolution = Resolution(width=1280, height=720) def find_apriltag(self): results, pose = self.detector.get_tags() if results: center = results[0].center corners = results[0].corners top_left = (int(corners[0][0]), int(corners[0][1])) top_right = (int(corners[1][0]), int(corners[1][1])) botom_right = (int(corners[2][0]), int(corners[2][1])) tag_area = self.get_apriltag_size(top_left, top_right, botom_right) pos_x = 3 * (center[0] - (self.resolution.width/2)) / self.resolution.width pos_y = 3 * (center[1] - (self.resolution.height/2)) / self.resolution.height clean_pos_y = float("{0:.3f}".format(pos_y)) clean_pos_x = float("{0:.3f}".format(pos_x)) translation = (clean_pos_x, clean_pos_y) x, y, z = pose yaw = y if yaw < -0.5: yaw += 0.78 # Small hack to fix the pose estimator angle jump rotation = (x, yaw, z) return translation, rotation, tag_area return None, None, None def get_pallet_pose(self): translation, rotation, area = self.find_apriltag() if translation and rotation: return (translation, rotation, area) return None def get_apriltag_size(self, p1, p2, p3): top_left_x = p1[0] top_left_y = p1[1] top_right_x = p2[0] top_right_y = p2[1] bottom_right_x = p3[0] bottom_right_y = p3[1] dist_top_left_right = ((((top_right_x - top_left_x )**2) + ((top_right_y-top_left_y)**2) )**0.5) dist_right_top_bottom = ((((top_right_x - bottom_right_x )**2) + ((top_right_y-bottom_right_y)**2) )**0.5) area = dist_top_left_right * dist_right_top_bottom return int(area) def check_cargo_alignment(self, thresh_x, thresh_theta, position): cargo_pos_x, cargo_pos_t = position if cargo_pos_x > thresh_x[0] and cargo_pos_x < thresh_x[1]: if cargo_pos_t > thresh_theta[0] and cargo_pos_t < thresh_theta[1]: return True return False def find_apriltag_pc(self): frame, results, pose = self.detector.get_tags_pc() if results: center = results[0].center corners = results[0].corners top_left = (int(corners[0][0]), int(corners[0][1])) top_right = (int(corners[1][0]), int(corners[1][1])) botom_right = (int(corners[2][0]), int(corners[2][1])) bottom_left = (int(corners[3][0]), int(corners[3][1])) font = cv2.FONT_HERSHEY_SIMPLEX cv2.putText(frame, str(top_left), (int(corners[0][0]), int(corners[0][1])), font, 0.7, (255, 0, 255), 2, cv2.LINE_AA) cv2.putText(frame, str(top_right), (int(corners[1][0]), int(corners[1][1])), font, 0.7, (255, 0, 255), 2, cv2.LINE_AA) cv2.putText(frame, str(botom_right), (int(corners[2][0]), int(corners[2][1])), font, 0.7, (255, 0, 255), 2, cv2.LINE_AA) cv2.putText(frame, str(bottom_left), (int(corners[3][0]), int(corners[3][1])), font, 0.7, (255, 0, 255), 2, cv2.LINE_AA) tag_area = self.get_apriltag_size(top_left, top_right, botom_right) pos_x = 2 * (center[0] - (self.resolution.width/2)) / self.resolution.width pos_y = 2 * (center[1] - (self.resolution.height/2)) / self.resolution.height clean_pos_y = float("{0:.3f}".format(pos_y)) clean_pos_x = float("{0:.3f}".format(pos_x)) translation = (clean_pos_x, clean_pos_y) x, y, z = pose yaw = y if yaw < -0.5: yaw += 0.78 rotation = (x, yaw, z) return frame, translation, rotation, tag_area return frame, None, None, None def get_pallet_pose_pc(self): frame, translation, rotation, area = self.find_apriltag_pc() if frame.any(): cv2.imshow('frame', frame) if translation and rotation and area: # print(f"Translation_x: {translation[0]}, Rotation_y: {rotation[1]}") return (translation, rotation, area) class CargoApproachingSystem: def __init__(self) -> None: self.ky = -3 self.kt = 0.5 def calculate(self, ey, et): delta = self.ky * math.atan(ey) + self.kt * et return delta class VisionCargoApproacher: def __init__(self, forklift_comms) -> None: self.log = logging.getLogger("VISION CARGO APPROACHER") self.log.setLevel("DEBUG") self.camera = VisionSystem() self.forklift = forklift_comms self.steering_law = CargoApproachingSystem() self.is_running = False self.no_detects = 0 # for how many frames did we not detect an apriltag (stops forklift) self.no_detect_limit = 3 # how many no detect frames are allowed to pass before we stop the forklift # self.max_approach_area_thresh = 3000 # How much area should the Apriltag take up (pixels) before we send the stop comand (how close to the cargo do we stop) self.max_approach_area_thresh = 4500 # How much area should the Apriltag take up (pixels) before we send the stop comand (how close to the cargo do we stop) self.centre_distance_thresh = (-0.2, 0.2) # horizontal tolerance for pallet alignment pre fork alignment self.centre_angle_thresh = (-0.2, 0.2) # angular tolerance for pallet alignment pre fork alignment self.horizontal_camera_offset = 0 # In case the camera isn't perfectly straight, we can compansate for that in code self.steering_log_file = "slf.file" def navigate_to_cargo_vision(self): self.log.info("Starting vision system") self.is_running = True self.forklift.set_state('NAVIGATING_TO_POINT_VISION') while self.is_running: time.sleep(0.05) pallet_pose = self.camera.get_pallet_pose() # getting the position, rotation and size of the tracker if pallet_pose: # If we found a tracker self.no_detects = 0 position, rotation, area = pallet_pose error_x = position[0] # centre offset distance error_theta = rotation[1] # centre offset angle error_x += self.horizontal_camera_offset # adding offset # print(error_x, error_theta) delta = self.steering_law.calculate(error_x, error_theta) # getting the steering angle from the control law # delta_PWM = remap(delta, -0.8, 0.8, 1000, 2000) delta_PWM = remap(delta, 0.7, -0.7, 1900, 1100) # Converting steering law output into PWM values little_log = f"Error y: {delta} | error theta {error_x} | steering_raw {error_theta} | steering_conv: {delta_PWM}, tag_area: {area};;;\n" self.log.info(little_log) with open(self.steering_log_file, 'a+') as myfile: myfile.writelines([little_log]) if area < self.max_approach_area_thresh: # If we are far enough from the cargo, move towards it speed_pwm = remap(area, 40000, 1000, 1500, 2000) self.forklift.set_speed_steering(speed_pwm, delta_PWM) else: self.forklift.set_speed_steering(1500, 1500) # If we close enough to the cargo, stop and move to next stage self.log.info(f"We are close enough to cargo!, {area}") # TODO: check that we properly lighed up with cargo, if that is not the case, drive back for a bit with correct angle and rerun the approacher is_aligned = self.camera.check_cargo_alignment(self.centre_distance_thresh, self.centre_angle_thresh, (error_x, error_theta)) if is_aligned: self.log.info(f"WE ALIGNED, MOVE BACK! (centre_distance_thresh: {error_x}, centre_angle_thresh: {error_theta}") time.sleep(1) success = True self.is_running = False # self.camera.detector.stop_streams() return success else: self.log.info(f"NOT ALIGNED, MOVE BACK! (centre_distance_thresh: {error_x}, centre_angle_thresh: {error_theta}") time.sleep(0.2) return True delta = self.steering_law.calculate(error_x, error_theta) # getting the steering angle from the control law delta_PWM = remap(delta, 0.7, -0.7, 1900, 1100) self.forklift.set_speed_steering(1000, delta_PWM) time.sleep(3) self.forklift.set_speed_steering(1500, 1500) # Move back for n secnods with x steering (depending on what side the error is) else: self.no_detects +=1 if self.no_detects > self.no_detect_limit: # if we do not detect anything 3 times in a row, stop the vehicle self.log.warning("DID NOT DETECT CODE!") self.forklift.set_speed(1500) # def stop(self): # self.is_running = False # self.forklift.set_disarm() # def end(self): # self.stop() # self.forklift.vehicle.close() # TESTING = False # if platform.release() == '4.9.201-tegra': # if TESTING: # a = Tester() # a.run_jetson() # else: # run = Super() # run.start() # elif platform.release() in ['5.11.0-18-generic', '20.5.0', '5.10.17-v7l+']: # a = Tester() # a.run_pc() # else: # logging.critical(f"Platform {platform.release()} not tested!") class Tester: def __init__(self) -> None: self.log = logging.getLogger("TESTER") self.log.setLevel("DEBUG") self.camera = VisionSystem() self.steering_law = CargoApproachingSystem() self.no_detects = 0 self.max_approach_area_thresh = 40000 self.centre_distance_thresh = (-0.04, 0.08) self.centre_angle_thresh = (-0.2, 0.2) def run_pc(self): while True: pallet_pose = self.camera.get_pallet_pose_pc() if pallet_pose: position, rotation, area = pallet_pose error_y = position[0] error_theta = rotation[1] # print(error_y, error_theta) delta = self.steering_law.calculate(error_y, error_theta) # delta_PWM = remap(delta, -0.8, 0.8, 1000, 2000) delta_PWM = remap(delta, 0.7, -0.7, 1900, 1100) print(f"PWM: {delta_PWM}, ey: {error_y}, et: {error_theta}") cv2.waitKey(1) def run_jetson(self): self.log.info("Starting vision system") self.is_running = True while self.is_running: time.sleep(0.05) pallet_pose = self.camera.get_pallet_pose() # getting the position, rotation and size of the tracker if pallet_pose: # If we found a tracker self.no_detects = 0 position, rotation, area = pallet_pose error_y = position[0] # centre offset distance error_theta = rotation[1] # centre offset angle # print(error_y, error_theta) delta = self.steering_law.calculate(error_y, error_theta) # getting the steering angle from the control law delta_PWM = remap(delta, 0.7, -0.7, 1900, 1100) # Converting steering law output into PWM values self.log.info(f"PWM: {delta_PWM}, ey: {error_y}, et: {error_theta}") if area < self.max_approach_area_thresh: # If we are far enough from the cargo, move towards it self.log.info(f"Setting speed: 1950, steering: {delta_PWM}") else: self.log.info(f"We are close enough to cargo!, {area}") is_aligned = self.camera.check_cargo_alignment(self.centre_distance_thresh, self.centre_angle_thresh, (error_y, error_theta)) if is_aligned: self.log.info(f'Cargo aligned: {error_y} in {self.centre_distance_thresh} and {error_theta} in {self.centre_angle_thresh}') else: self.log.info(f'Cargo NOT aligned: {error_y} not in {self.centre_distance_thresh} or {error_theta} inot n {self.centre_angle_thresh}') else: self.no_detects +=1 if self.no_detects > 3: # if we do not detect anything 3 times in a row, stop the vehicle self.log.info("No cargo detected, setting speed to 1500") if __name__ == "__main__": print("ERROR: This script is no longer used in standalone mode")