295 lines
13 KiB
Python
295 lines
13 KiB
Python
import logging
|
||
import math
|
||
import platform
|
||
|
||
import time
|
||
from dataclasses import dataclass
|
||
|
||
import os
|
||
import psutil
|
||
|
||
import cv2
|
||
|
||
import apriltag_detector
|
||
|
||
|
||
logging.basicConfig(level='INFO')
|
||
|
||
@dataclass
|
||
class Resolution:
|
||
width: int
|
||
height: int
|
||
|
||
def remap(value, maxInput, minInput, maxOutput, minOutput):
|
||
value = maxInput if value > maxInput else value
|
||
value = minInput if value < minInput else value
|
||
inputSpan = maxInput - minInput
|
||
outputSpan = maxOutput - minOutput
|
||
scaledThrust = float(value - minInput) / float(inputSpan)
|
||
return minOutput + (scaledThrust * outputSpan)
|
||
|
||
|
||
class VisionSystem:
|
||
def __init__(self) -> None:
|
||
self.log = logging.getLogger("Vision_system")
|
||
self.log.setLevel("DEBUG")
|
||
self.detector = apriltag_detector.Detector()
|
||
self.detector.start_streams(1)
|
||
self.resolution = Resolution(width=1280, height=720)
|
||
|
||
def find_apriltag(self):
|
||
results, pose = self.detector.get_tags()
|
||
if results:
|
||
center = results[0].center
|
||
|
||
corners = results[0].corners
|
||
top_left = (int(corners[0][0]), int(corners[0][1]))
|
||
top_right = (int(corners[1][0]), int(corners[1][1]))
|
||
botom_right = (int(corners[2][0]), int(corners[2][1]))
|
||
tag_area = self.get_apriltag_size(top_left, top_right, botom_right)
|
||
|
||
pos_x = 3 * (center[0] - (self.resolution.width/2)) / self.resolution.width
|
||
pos_y = 3 * (center[1] - (self.resolution.height/2)) / self.resolution.height
|
||
clean_pos_y = float("{0:.3f}".format(pos_y))
|
||
clean_pos_x = float("{0:.3f}".format(pos_x))
|
||
translation = (clean_pos_x, clean_pos_y)
|
||
x, y, z = pose
|
||
yaw = y
|
||
if yaw < -0.5:
|
||
yaw += 0.78 # Small hack to fix the pose estimator angle jump
|
||
rotation = (x, yaw, z)
|
||
return translation, rotation, tag_area
|
||
return None, None, None
|
||
|
||
def get_pallet_pose(self):
|
||
translation, rotation, area = self.find_apriltag()
|
||
if translation and rotation:
|
||
return (translation, rotation, area)
|
||
return None
|
||
|
||
def get_apriltag_size(self, p1, p2, p3):
|
||
top_left_x = p1[0]
|
||
top_left_y = p1[1]
|
||
top_right_x = p2[0]
|
||
top_right_y = p2[1]
|
||
bottom_right_x = p3[0]
|
||
bottom_right_y = p3[1]
|
||
|
||
dist_top_left_right = ((((top_right_x - top_left_x )**2) + ((top_right_y-top_left_y)**2) )**0.5)
|
||
dist_right_top_bottom = ((((top_right_x - bottom_right_x )**2) + ((top_right_y-bottom_right_y)**2) )**0.5)
|
||
area = dist_top_left_right * dist_right_top_bottom
|
||
return int(area)
|
||
|
||
def check_cargo_alignment(self, thresh_x, thresh_theta, position):
|
||
cargo_pos_x, cargo_pos_t = position
|
||
if cargo_pos_x > thresh_x[0] and cargo_pos_x < thresh_x[1]:
|
||
if cargo_pos_t > thresh_theta[0] and cargo_pos_t < thresh_theta[1]:
|
||
return True
|
||
return False
|
||
|
||
def find_apriltag_pc(self):
|
||
frame, results, pose = self.detector.get_tags_pc()
|
||
if results:
|
||
center = results[0].center
|
||
corners = results[0].corners
|
||
top_left = (int(corners[0][0]), int(corners[0][1]))
|
||
top_right = (int(corners[1][0]), int(corners[1][1]))
|
||
botom_right = (int(corners[2][0]), int(corners[2][1]))
|
||
bottom_left = (int(corners[3][0]), int(corners[3][1]))
|
||
|
||
font = cv2.FONT_HERSHEY_SIMPLEX
|
||
cv2.putText(frame, str(top_left), (int(corners[0][0]), int(corners[0][1])), font, 0.7, (255, 0, 255), 2, cv2.LINE_AA)
|
||
cv2.putText(frame, str(top_right), (int(corners[1][0]), int(corners[1][1])), font, 0.7, (255, 0, 255), 2, cv2.LINE_AA)
|
||
cv2.putText(frame, str(botom_right), (int(corners[2][0]), int(corners[2][1])), font, 0.7, (255, 0, 255), 2, cv2.LINE_AA)
|
||
cv2.putText(frame, str(bottom_left), (int(corners[3][0]), int(corners[3][1])), font, 0.7, (255, 0, 255), 2, cv2.LINE_AA)
|
||
tag_area = self.get_apriltag_size(top_left, top_right, botom_right)
|
||
|
||
pos_x = 2 * (center[0] - (self.resolution.width/2)) / self.resolution.width
|
||
pos_y = 2 * (center[1] - (self.resolution.height/2)) / self.resolution.height
|
||
clean_pos_y = float("{0:.3f}".format(pos_y))
|
||
clean_pos_x = float("{0:.3f}".format(pos_x))
|
||
translation = (clean_pos_x, clean_pos_y)
|
||
x, y, z = pose
|
||
yaw = y
|
||
if yaw < -0.5:
|
||
yaw += 0.78
|
||
rotation = (x, yaw, z)
|
||
return frame, translation, rotation, tag_area
|
||
return frame, None, None, None
|
||
|
||
def get_pallet_pose_pc(self):
|
||
frame, translation, rotation, area = self.find_apriltag_pc()
|
||
if frame.any():
|
||
cv2.imshow('frame', frame)
|
||
if translation and rotation and area:
|
||
# print(f"Translation_x: {translation[0]}, Rotation_y: {rotation[1]}")
|
||
return (translation, rotation, area)
|
||
|
||
|
||
class CargoApproachingSystem:
|
||
def __init__(self) -> None:
|
||
self.ky = 2
|
||
self.kt = -1
|
||
|
||
def calculate(self, ey, et):
|
||
delta = self.ky * math.atan(ey) + self.kt * et
|
||
return delta
|
||
|
||
|
||
class VisionCargoApproacher:
|
||
def __init__(self, forklift_comms) -> None:
|
||
self.log = logging.getLogger("VISION CARGO APPROACHER")
|
||
self.log.setLevel("DEBUG")
|
||
self.camera = VisionSystem()
|
||
self.forklift = forklift_comms
|
||
self.steering_law = CargoApproachingSystem()
|
||
self.is_running = False
|
||
self.no_detects = 0 # for how many frames did we not detect an apriltag (stops forklift)
|
||
self.no_detect_limit = 3 # how many no detect frames are allowed to pass before we stop the forklift
|
||
# self.max_approach_area_thresh = 3000 # How much area should the Apriltag take up (pixels) before we send the stop comand (how close to the cargo do we stop)
|
||
self.max_approach_area_thresh = 4500 # How much area should the Apriltag take up (pixels) before we send the stop comand (how close to the cargo do we stop)
|
||
self.centre_distance_thresh = (-0.2, 0.2) # horizontal tolerance for pallet alignment pre fork alignment
|
||
self.centre_angle_thresh = (-0.2, 0.2) # angular tolerance for pallet alignment pre fork alignment
|
||
self.horizontal_camera_offset = 0 # In case the camera isn't perfectly straight, we can compansate for that in code
|
||
self.steering_log_file = "slf.file"
|
||
|
||
def navigate_to_cargo_vision(self):
|
||
self.log.info("Starting vision system")
|
||
self.is_running = True
|
||
self.forklift.set_state('NAVIGATING_TO_POINT_VISION')
|
||
while self.is_running:
|
||
time.sleep(0.05)
|
||
pallet_pose = self.camera.get_pallet_pose() # getting the position, rotation and size of the tracker
|
||
if pallet_pose: # If we found a tracker
|
||
self.no_detects = 0
|
||
position, rotation, area = pallet_pose
|
||
error_x = position[0] # centre offset distance
|
||
error_theta = rotation[1] # centre offset angle
|
||
error_x += self.horizontal_camera_offset # adding offset
|
||
# print(error_x, error_theta)
|
||
delta = self.steering_law.calculate(error_x, error_theta) # getting the steering angle from the control law
|
||
# delta_PWM = remap(delta, -0.8, 0.8, 1000, 2000)
|
||
delta_PWM = remap(delta, 0.7, -0.7, 1900, 1100) # Converting steering law output into PWM values
|
||
little_log = f"Error y: {delta} | error theta {error_x} | steering_raw {error_theta} | steering_conv: {delta_PWM}, tag_area: {area};;;"
|
||
self.log.info(little_log)
|
||
|
||
with open(self.steering_log_file, 'a+') as myfile:
|
||
myfile.writelines([little_log])
|
||
|
||
if area < self.max_approach_area_thresh: # If we are far enough from the cargo, move towards it
|
||
speed_pwm = remap(area, 40000, 1000, 1500, 2000)
|
||
self.forklift.set_speed_steering(speed_pwm, delta_PWM)
|
||
else:
|
||
self.forklift.set_speed_steering(1500, 1500) # If we close enough to the cargo, stop and move to next stage
|
||
self.log.info(f"We are close enough to cargo!, {area}")
|
||
# TODO: check that we properly lighed up with cargo, if that is not the case, drive back for a bit with correct angle and rerun the approacher
|
||
is_aligned = self.camera.check_cargo_alignment(self.centre_distance_thresh, self.centre_angle_thresh, (error_x, error_theta))
|
||
if is_aligned:
|
||
self.log.info(f"WE ALIGNED, MOVE BACK! (centre_distance_thresh: {error_x}, centre_angle_thresh: {error_theta}")
|
||
time.sleep(1)
|
||
success = True
|
||
self.is_running = False
|
||
# self.camera.detector.stop_streams()
|
||
return success
|
||
else:
|
||
self.log.info(f"NOT ALIGNED, MOVE BACK! (centre_distance_thresh: {error_x}, centre_angle_thresh: {error_theta}")
|
||
time.sleep(0.2)
|
||
return True
|
||
delta = self.steering_law.calculate(error_x, error_theta) # getting the steering angle from the control law
|
||
delta_PWM = remap(delta, 0.7, -0.7, 1900, 1100)
|
||
self.forklift.set_speed_steering(1000, delta_PWM)
|
||
time.sleep(3)
|
||
self.forklift.set_speed_steering(1500, 1500)
|
||
# Move back for n secnods with x steering (depending on what side the error is)
|
||
else:
|
||
self.no_detects +=1
|
||
if self.no_detects > self.no_detect_limit: # if we do not detect anything 3 times in a row, stop the vehicle
|
||
self.log.warning("DID NOT DETECT CODE!")
|
||
self.forklift.set_speed(1500)
|
||
|
||
|
||
# def stop(self):
|
||
# self.is_running = False
|
||
# self.forklift.set_disarm()
|
||
|
||
# def end(self):
|
||
# self.stop()
|
||
# self.forklift.vehicle.close()
|
||
|
||
# TESTING = False
|
||
|
||
# if platform.release() == '4.9.201-tegra':
|
||
# if TESTING:
|
||
# a = Tester()
|
||
# a.run_jetson()
|
||
# else:
|
||
# run = Super()
|
||
# run.start()
|
||
|
||
|
||
# elif platform.release() in ['5.11.0-18-generic', '20.5.0', '5.10.17-v7l+']:
|
||
# a = Tester()
|
||
# a.run_pc()
|
||
|
||
# else:
|
||
# logging.critical(f"Platform {platform.release()} not tested!")
|
||
|
||
class Tester:
|
||
def __init__(self) -> None:
|
||
self.log = logging.getLogger("TESTER")
|
||
self.log.setLevel("DEBUG")
|
||
self.camera = VisionSystem()
|
||
self.steering_law = CargoApproachingSystem()
|
||
self.no_detects = 0
|
||
self.max_approach_area_thresh = 40000
|
||
self.centre_distance_thresh = (-0.04, 0.08)
|
||
self.centre_angle_thresh = (-0.2, 0.2)
|
||
|
||
def run_pc(self):
|
||
while True:
|
||
pallet_pose = self.camera.get_pallet_pose_pc()
|
||
if pallet_pose:
|
||
position, rotation, area = pallet_pose
|
||
error_y = position[0]
|
||
error_theta = rotation[1]
|
||
# print(error_y, error_theta)
|
||
delta = self.steering_law.calculate(error_y, error_theta)
|
||
# delta_PWM = remap(delta, -0.8, 0.8, 1000, 2000)
|
||
delta_PWM = remap(delta, 0.7, -0.7, 1900, 1100)
|
||
print(f"PWM: {delta_PWM}, ey: {error_y}, et: {error_theta}")
|
||
cv2.waitKey(1)
|
||
|
||
def run_jetson(self):
|
||
self.log.info("Starting vision system")
|
||
self.is_running = True
|
||
while self.is_running:
|
||
time.sleep(0.05)
|
||
pallet_pose = self.camera.get_pallet_pose() # getting the position, rotation and size of the tracker
|
||
if pallet_pose: # If we found a tracker
|
||
self.no_detects = 0
|
||
position, rotation, area = pallet_pose
|
||
error_y = position[0] # centre offset distance
|
||
error_theta = rotation[1] # centre offset angle
|
||
# print(error_y, error_theta)
|
||
delta = self.steering_law.calculate(error_y, error_theta) # getting the steering angle from the control law
|
||
delta_PWM = remap(delta, 0.7, -0.7, 1900, 1100) # Converting steering law output into PWM values
|
||
self.log.info(f"PWM: {delta_PWM}, ey: {error_y}, et: {error_theta}")
|
||
if area < self.max_approach_area_thresh: # If we are far enough from the cargo, move towards it
|
||
self.log.info(f"Setting speed: 1950, steering: {delta_PWM}")
|
||
else:
|
||
self.log.info(f"We are close enough to cargo!, {area}")
|
||
is_aligned = self.camera.check_cargo_alignment(self.centre_distance_thresh, self.centre_angle_thresh, (error_y, error_theta))
|
||
if is_aligned:
|
||
self.log.info(f'Cargo aligned: {error_y} in {self.centre_distance_thresh} and {error_theta} in {self.centre_angle_thresh}')
|
||
else:
|
||
self.log.info(f'Cargo NOT aligned: {error_y} not in {self.centre_distance_thresh} or {error_theta} inot n {self.centre_angle_thresh}')
|
||
|
||
else:
|
||
self.no_detects +=1
|
||
if self.no_detects > 3: # if we do not detect anything 3 times in a row, stop the vehicle
|
||
self.log.info("No cargo detected, setting speed to 1500")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
print("ERROR: This script is no longer used in standalone mode")
|