new_thea/Control/Jetson/vision_cargo_approach.py
2021-09-21 14:35:05 +01:00

import logging
import math
import platform
import time
from dataclasses import dataclass
import os
import psutil
import cv2
import apriltag_detector
logging.basicConfig(level='INFO')


@dataclass
class Resolution:
    width: int
    height: int


def remap(value, maxInput, minInput, maxOutput, minOutput):
    value = maxInput if value > maxInput else value
    value = minInput if value < minInput else value
    inputSpan = maxInput - minInput
    outputSpan = maxOutput - minOutput
    scaledThrust = float(value - minInput) / float(inputSpan)
    return minOutput + (scaledThrust * outputSpan)
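
# Illustrative note: remap() clamps `value` to [minInput, maxInput] and rescales it
# linearly onto [minOutput, maxOutput]. With the steering ranges used further down,
# e.g. remap(0.35, 0.7, -0.7, 1900, 1100):
#   inputSpan = 1.4, outputSpan = 800, scaledThrust = (0.35 + 0.7) / 1.4 = 0.75
#   -> 1100 + 0.75 * 800 = 1700.0, while 0.0 maps to 1500 (the value used elsewhere
#   in this file as the stop/centre command).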


class VisionSystem:
    def __init__(self) -> None:
        self.log = logging.getLogger("Vision_system")
        self.log.setLevel("DEBUG")
        self.detector = apriltag_detector.Detector()
        self.detector.start_streams(1)
        self.resolution = Resolution(width=1280, height=720)

    def find_apriltag(self):
        results, pose = self.detector.get_tags()
        if results:
            center = results[0].center
            corners = results[0].corners
            top_left = (int(corners[0][0]), int(corners[0][1]))
            top_right = (int(corners[1][0]), int(corners[1][1]))
            bottom_right = (int(corners[2][0]), int(corners[2][1]))
            tag_area = self.get_apriltag_size(top_left, top_right, bottom_right)
            pos_x = 3 * (center[0] - (self.resolution.width / 2)) / self.resolution.width
            pos_y = 3 * (center[1] - (self.resolution.height / 2)) / self.resolution.height
            clean_pos_y = float("{0:.3f}".format(pos_y))
            clean_pos_x = float("{0:.3f}".format(pos_x))
            translation = (clean_pos_x, clean_pos_y)
            x, y, z = pose
            yaw = y
            if yaw < -0.5:
                yaw += 0.78  # Small hack to fix the pose estimator angle jump
            rotation = (x, yaw, z)
            return translation, rotation, tag_area
        return None, None, None
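
    # Note on the values returned by find_apriltag() above: translation is the tag
    # centre's offset from the image centre, normalised by the frame size (the
    # factor 3 means the frame edges map to roughly +/-1.5); rotation is the raw
    # pose angles with the +0.78 rad correction applied when yaw < -0.5, which the
    # original comment describes as a hack for a pose-estimator angle jump.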

    def get_pallet_pose(self):
        translation, rotation, area = self.find_apriltag()
        if translation and rotation:
            return (translation, rotation, area)
        return None

    def get_apriltag_size(self, p1, p2, p3):
        top_left_x = p1[0]
        top_left_y = p1[1]
        top_right_x = p2[0]
        top_right_y = p2[1]
        bottom_right_x = p3[0]
        bottom_right_y = p3[1]
        dist_top_left_right = ((((top_right_x - top_left_x) ** 2) + ((top_right_y - top_left_y) ** 2)) ** 0.5)
        dist_right_top_bottom = ((((top_right_x - bottom_right_x) ** 2) + ((top_right_y - bottom_right_y) ** 2)) ** 0.5)
        area = dist_top_left_right * dist_right_top_bottom
        return int(area)
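
    # Illustrative note: get_apriltag_size() multiplies the top edge length by the
    # right edge length, so it acts as a rough pixel-area proxy for distance to the
    # tag. A tag of roughly 67 x 67 px gives ~4500, which is the approach stop
    # threshold used by VisionCargoApproacher below.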

    def check_cargo_alignment(self, thresh_x, thresh_theta, position):
        cargo_pos_x, cargo_pos_t = position
        if cargo_pos_x > thresh_x[0] and cargo_pos_x < thresh_x[1]:
            if cargo_pos_t > thresh_theta[0] and cargo_pos_t < thresh_theta[1]:
                return True
        return False
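
    # The *_pc variants below appear to be the desktop/debug path: they use
    # get_tags_pc(), draw the detected corner coordinates onto the frame and show
    # it with cv2.imshow(), whereas the plain methods above run headless on the Jetson.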

    def find_apriltag_pc(self):
        frame, results, pose = self.detector.get_tags_pc()
        if results:
            center = results[0].center
            corners = results[0].corners
            top_left = (int(corners[0][0]), int(corners[0][1]))
            top_right = (int(corners[1][0]), int(corners[1][1]))
            bottom_right = (int(corners[2][0]), int(corners[2][1]))
            bottom_left = (int(corners[3][0]), int(corners[3][1]))
            font = cv2.FONT_HERSHEY_SIMPLEX
            cv2.putText(frame, str(top_left), top_left, font, 0.7, (255, 0, 255), 2, cv2.LINE_AA)
            cv2.putText(frame, str(top_right), top_right, font, 0.7, (255, 0, 255), 2, cv2.LINE_AA)
            cv2.putText(frame, str(bottom_right), bottom_right, font, 0.7, (255, 0, 255), 2, cv2.LINE_AA)
            cv2.putText(frame, str(bottom_left), bottom_left, font, 0.7, (255, 0, 255), 2, cv2.LINE_AA)
            tag_area = self.get_apriltag_size(top_left, top_right, bottom_right)
            pos_x = 2 * (center[0] - (self.resolution.width / 2)) / self.resolution.width
            pos_y = 2 * (center[1] - (self.resolution.height / 2)) / self.resolution.height
            clean_pos_y = float("{0:.3f}".format(pos_y))
            clean_pos_x = float("{0:.3f}".format(pos_x))
            translation = (clean_pos_x, clean_pos_y)
            x, y, z = pose
            yaw = y
            if yaw < -0.5:
                yaw += 0.78
            rotation = (x, yaw, z)
            return frame, translation, rotation, tag_area
        return frame, None, None, None

    def get_pallet_pose_pc(self):
        frame, translation, rotation, area = self.find_apriltag_pc()
        if frame.any():
            cv2.imshow('frame', frame)
        if translation and rotation and area:
            # print(f"Translation_x: {translation[0]}, Rotation_y: {rotation[1]}")
            return (translation, rotation, area)


class CargoApproachingSystem:
    def __init__(self) -> None:
        self.ky = 2
        self.kt = -0.5

    def calculate(self, ey, et):
        delta = self.ky * math.atan(ey) + self.kt * et
        return delta
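
# Illustrative note on CargoApproachingSystem.calculate() above: the steering angle is
# delta = ky * atan(ey) + kt * et with ky = 2 and kt = -0.5. For example, a lateral
# error ey = 0.2 and an angular error et = 0.1 give delta = 2 * atan(0.2) - 0.05 ~= 0.345,
# which remap(delta, 0.7, -0.7, 1900, 1100) converts to a steering PWM of roughly 1697.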


class VisionCargoApproacher:
    def __init__(self, forklift_comms) -> None:
        self.log = logging.getLogger("VISION CARGO APPROACHER")
        self.log.setLevel("DEBUG")
        self.camera = VisionSystem()
        self.forklift = forklift_comms
        self.steering_law = CargoApproachingSystem()
        self.is_running = False
        self.no_detects = 0  # for how many frames we did not detect an AprilTag (stops the forklift)
        self.no_detect_limit = 3  # how many no-detect frames are allowed to pass before we stop the forklift
        # self.max_approach_area_thresh = 3000
        self.max_approach_area_thresh = 4500  # how much area (in pixels) the AprilTag should take up before we send the stop command (i.e. how close to the cargo we stop)
        self.centre_distance_thresh = (-0.2, 0.2)  # horizontal tolerance for pallet alignment before fork alignment
        self.centre_angle_thresh = (-0.2, 0.2)  # angular tolerance for pallet alignment before fork alignment
        self.horizontal_camera_offset = 0  # in case the camera isn't perfectly straight, we can compensate for that in code
        self.steering_log_file = "slf.file"

    def navigate_to_cargo_vision(self):
        self.log.info("Starting vision system")
        self.is_running = True
        self.forklift.set_state('NAVIGATING_TO_POINT_VISION')
        while self.is_running:
            time.sleep(0.05)
            pallet_pose = self.camera.get_pallet_pose()  # getting the position, rotation and size of the tracker
            if pallet_pose:  # If we found a tracker
                self.no_detects = 0
                position, rotation, area = pallet_pose
                error_x = position[0]  # centre offset distance
                error_theta = rotation[1]  # centre offset angle
                error_x += self.horizontal_camera_offset  # adding the camera offset
                # print(error_x, error_theta)
                delta = self.steering_law.calculate(error_x, error_theta)  # getting the steering angle from the control law
                # delta_PWM = remap(delta, -0.8, 0.8, 1000, 2000)
                delta_PWM = remap(delta, 0.7, -0.7, 1900, 1100)  # converting the steering law output into PWM values
                little_log = f"steering_raw: {delta} | error_x: {error_x} | error_theta: {error_theta} | steering_conv: {delta_PWM}, tag_area: {area};;;\n"
                self.log.info(little_log)
                with open(self.steering_log_file, 'a+') as myfile:
                    myfile.writelines([little_log])
                if area < self.max_approach_area_thresh:  # If we are far enough from the cargo, move towards it
                    speed_pwm = remap(area, 40000, 1000, 1500, 2000)
                    self.forklift.set_speed_steering(speed_pwm, delta_PWM)
                else:
                    self.forklift.set_speed_steering(1500, 1500)  # If we are close enough to the cargo, stop and move to the next stage
                    self.log.info(f"We are close enough to cargo!, {area}")
                    # TODO: check that we are properly lined up with the cargo; if not, drive back for a bit with the correct angle and rerun the approacher
                    is_aligned = self.camera.check_cargo_alignment(self.centre_distance_thresh, self.centre_angle_thresh, (error_x, error_theta))
                    if is_aligned:
                        self.log.info(f"WE ALIGNED, MOVE BACK! (centre_distance_thresh: {error_x}, centre_angle_thresh: {error_theta})")
                        time.sleep(1)
                        success = True
                        self.is_running = False
                        # self.camera.detector.stop_streams()
                        return success
                    else:
                        self.log.info(f"NOT ALIGNED, MOVE BACK! (centre_distance_thresh: {error_x}, centre_angle_thresh: {error_theta})")
                        time.sleep(0.2)
                        return True
                        delta = self.steering_law.calculate(error_x, error_theta)  # getting the steering angle from the control law
                        delta_PWM = remap(delta, 0.7, -0.7, 1900, 1100)
                        self.forklift.set_speed_steering(1000, delta_PWM)
                        time.sleep(3)
                        self.forklift.set_speed_steering(1500, 1500)
                        # Move back for n seconds with x steering (depending on what side the error is)
            else:
                self.no_detects += 1
                if self.no_detects > self.no_detect_limit:  # if we do not detect anything 3 times in a row, stop the vehicle
                    self.log.warning("DID NOT DETECT CODE!")
                    self.forklift.set_speed(1500)
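
    # PWM convention implied by the constants above: 1500 appears to be the neutral
    # value for both speed and steering, steering commands span roughly 1100-1900,
    # and the forward speed command is scaled from 2000 (far away, small tag area)
    # towards 1500 (stop) as the tag area grows towards 40000 px^2.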

    # def stop(self):
    #     self.is_running = False
    #     self.forklift.set_disarm()

    # def end(self):
    #     self.stop()
    #     self.forklift.vehicle.close()

# TESTING = False
# if platform.release() == '4.9.201-tegra':
#     if TESTING:
#         a = Tester()
#         a.run_jetson()
#     else:
#         run = Super()
#         run.start()
# elif platform.release() in ['5.11.0-18-generic', '20.5.0', '5.10.17-v7l+']:
#     a = Tester()
#     a.run_pc()
# else:
#     logging.critical(f"Platform {platform.release()} not tested!")


class Tester:
    def __init__(self) -> None:
        self.log = logging.getLogger("TESTER")
        self.log.setLevel("DEBUG")
        self.camera = VisionSystem()
        self.steering_law = CargoApproachingSystem()
        self.no_detects = 0
        self.max_approach_area_thresh = 40000
        self.centre_distance_thresh = (-0.04, 0.08)
        self.centre_angle_thresh = (-0.2, 0.2)

    def run_pc(self):
        while True:
            pallet_pose = self.camera.get_pallet_pose_pc()
            if pallet_pose:
                position, rotation, area = pallet_pose
                error_y = position[0]
                error_theta = rotation[1]
                # print(error_y, error_theta)
                delta = self.steering_law.calculate(error_y, error_theta)
                # delta_PWM = remap(delta, -0.8, 0.8, 1000, 2000)
                delta_PWM = remap(delta, 0.7, -0.7, 1900, 1100)
                print(f"PWM: {delta_PWM}, ey: {error_y}, et: {error_theta}")
            cv2.waitKey(1)

    def run_jetson(self):
        self.log.info("Starting vision system")
        self.is_running = True
        while self.is_running:
            time.sleep(0.05)
            pallet_pose = self.camera.get_pallet_pose()  # getting the position, rotation and size of the tracker
            if pallet_pose:  # If we found a tracker
                self.no_detects = 0
                position, rotation, area = pallet_pose
                error_y = position[0]  # centre offset distance
                error_theta = rotation[1]  # centre offset angle
                # print(error_y, error_theta)
                delta = self.steering_law.calculate(error_y, error_theta)  # getting the steering angle from the control law
                delta_PWM = remap(delta, 0.7, -0.7, 1900, 1100)  # converting the steering law output into PWM values
                self.log.info(f"PWM: {delta_PWM}, ey: {error_y}, et: {error_theta}")
                if area < self.max_approach_area_thresh:  # If we are far enough from the cargo, move towards it
                    self.log.info(f"Setting speed: 1950, steering: {delta_PWM}")
                else:
                    self.log.info(f"We are close enough to cargo!, {area}")
                    is_aligned = self.camera.check_cargo_alignment(self.centre_distance_thresh, self.centre_angle_thresh, (error_y, error_theta))
                    if is_aligned:
                        self.log.info(f'Cargo aligned: {error_y} in {self.centre_distance_thresh} and {error_theta} in {self.centre_angle_thresh}')
                    else:
                        self.log.info(f'Cargo NOT aligned: {error_y} not in {self.centre_distance_thresh} or {error_theta} not in {self.centre_angle_thresh}')
            else:
                self.no_detects += 1
                if self.no_detects > 3:  # if we do not detect anything 3 times in a row, stop the vehicle
                    self.log.info("No cargo detected, setting speed to 1500")


if __name__ == "__main__":
    print("ERROR: This script is no longer used in standalone mode")