231 lines
7.3 KiB
Python
231 lines
7.3 KiB
Python
from threading import Thread
|
|
import cv2
|
|
import time
|
|
import apriltag
|
|
import logging
|
|
import platform
|
|
import math
|
|
from dataclasses import dataclass
|
|
|
|
import os
|
|
import psutil
|
|
|
|
logging.basicConfig(level='DEBUG')
|
|
|
|
class CUDA:
|
|
def __init__(self) -> None:
|
|
self.log = logging.getLogger("CUDA_helper")
|
|
self.log.setLevel("DEBUG")
|
|
self.cuda_allowed = False
|
|
self.use_CUDA = True
|
|
if self.use_CUDA:
|
|
self.log.warning("CUDA has been disabled due to performance issues...")
|
|
self.use_CUDA = False
|
|
else:
|
|
cuda_devices = cv2.cuda.getCudaEnabledDeviceCount()
|
|
if cuda_devices == 0:
|
|
self.log.warning("No CUDA devices found, will use CPU")
|
|
self.use_CUDA = False
|
|
else:
|
|
self.log.info("Running with CUDA 😎")
|
|
|
|
def bgr2gray(self, frame):
|
|
if self.use_CUDA:
|
|
gpu_frame = cv2.cuda_GpuMat()
|
|
gpu_frame.upload(frame)
|
|
gray_cuda = cv2.cuda.cvtColor(gpu_frame, cv2.COLOR_BGR2GRAY)
|
|
return gray_cuda.download()
|
|
else:
|
|
return cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
|
|
|
|
|
|
class VideoStream:
|
|
def __init__(self, cam_id=0) -> None: # NOTE: id should be 1
|
|
self.stream = self.open_cam_usb(1280, 720)
|
|
_, img = self.stream.read()
|
|
self.buffer = img
|
|
self.running = False
|
|
|
|
def start(self):
|
|
t = Thread(target=self.update, args=())
|
|
t.daemon = True
|
|
self.running = True
|
|
t.start()
|
|
self.start_time = time.time()
|
|
return self
|
|
|
|
def update(self):
|
|
while self.running:
|
|
success, frame = self.stream.read()
|
|
if not success:
|
|
current_system_pid = os.getpid()
|
|
ThisSystem = psutil.Process(current_system_pid)
|
|
ThisSystem.terminate()
|
|
assert success
|
|
self.buffer = frame
|
|
self.stream.release()
|
|
|
|
def read(self):
|
|
return self.buffer
|
|
|
|
def stop(self):
|
|
self.running = False
|
|
|
|
def open_cam_usb(self, width, height):
|
|
gst_str = ('v4l2src device=/dev/cam_align ! video/x-raw, width=(int){}, height=(int){} ! videoconvert ! appsink').format(width, height)
|
|
return cv2.VideoCapture(gst_str, cv2.CAP_GSTREAMER)
|
|
|
|
|
|
class HoleDetector:
|
|
def __init__(self) -> None:
|
|
self.log = logging.getLogger("_C_ HOLE DETECTOR")
|
|
self.log.setLevel("DEBUG")
|
|
self.CUDA = CUDA()
|
|
self.options = apriltag.DetectorOptions(families="tag36h11", nthreads=4)
|
|
self.detector = apriltag.Detector(self.options)
|
|
self.stream = None
|
|
|
|
def start_stream(self):
|
|
self.stream = VideoStream().start()
|
|
|
|
def stop_streams(self):
|
|
self.stream.stop()
|
|
|
|
def get_tags(self):
|
|
frame = self.stream.read()
|
|
if frame.any():
|
|
gray_frame = self.CUDA.bgr2gray(frame)
|
|
# cv2.imshow("GR", gray_frame)
|
|
# cv2.waitKey(1)
|
|
result = self.detector.detect(gray_frame)
|
|
if result:
|
|
return result
|
|
return None
|
|
|
|
|
|
@dataclass
|
|
class Resolution:
|
|
width: int
|
|
height: int
|
|
|
|
|
|
class HoleAligner:
|
|
def __init__(self, vehicle_ctrl) -> None:
|
|
self.log = logging.getLogger("_C_ HOLE ALIGNER")
|
|
self.log.setLevel("INFO")
|
|
self.detector = HoleDetector()
|
|
self.detector.start_stream()
|
|
self.resolution = Resolution(width=1280, height=720)
|
|
self.vehicle = vehicle_ctrl
|
|
self.tag_offset_x = 0
|
|
self.tag_offset_y = 0.4
|
|
self.tag_tresh_x = 0.1
|
|
self.tag_tresh_y = 0.1
|
|
self.iterations = 0
|
|
#(-0.08, 0.08)
|
|
self.range = 'X: -0.93 -> 0.93 | Y: -0.87 -> 0.87 || -1 -> 1'
|
|
|
|
def find_apriltag(self):
|
|
results = self.detector.get_tags()
|
|
if results:
|
|
center = results[0].center
|
|
pos_x = 2 * (center[0] - (self.resolution.width/2)) / self.resolution.width
|
|
pos_y = 2 * (center[1] - (self.resolution.height/2)) / self.resolution.height
|
|
clean_pos_y = float("{0:.3f}".format(pos_y))
|
|
clean_pos_x = float("{0:.3f}".format(pos_x))
|
|
|
|
translation = (clean_pos_x, clean_pos_y)
|
|
self.log.debug(f" X: {'+' if clean_pos_x > 0 else ''}{clean_pos_x}| Y: {'+' if clean_pos_y >0 else ''}{clean_pos_y}")
|
|
return translation
|
|
return None
|
|
|
|
def calculate_error(self, tag_location):
|
|
tag_x, tag_y = tag_location
|
|
err_x = float("{0:.2f}".format(tag_x + self.tag_offset_x))
|
|
err_y = float("{0:.2f}".format(tag_y + self.tag_offset_y))
|
|
self.log.info(f"Alignment error: X {err_x} | Y {err_y}")
|
|
return (err_x, err_y)
|
|
|
|
def run_adjustment_iteration(self, errors):
|
|
self.iterations += 1
|
|
if self.iterations > 4:
|
|
self.log.error(f"Too many iterations, exiting.")
|
|
return True
|
|
err_x, err_y = errors
|
|
|
|
x_aligned = False
|
|
y_aligned = False
|
|
|
|
go_left = False
|
|
if err_x < 0:
|
|
go_left = True
|
|
|
|
go_up = False
|
|
if err_y < 0:
|
|
go_up = True
|
|
|
|
if abs(err_x) > 0.3: # TODO: calibrate this
|
|
self.vehicle.nudge_fork_horiz(go_left, 1.0)
|
|
elif abs(err_x) > 0.15: # TODO: calibrate this
|
|
self.vehicle.nudge_fork_horiz(go_left, 0.6)
|
|
elif abs(err_x) > 0.1: # TODO: calibrate this
|
|
self.vehicle.nudge_fork_horiz(go_left, 0.4)
|
|
elif abs(err_x) > 0.08: # TODO: calibrate this
|
|
self.vehicle.nudge_fork_horiz(go_left, 0.2)
|
|
elif abs(err_x) <= 0.08:
|
|
x_aligned = True
|
|
else:
|
|
self.log.critical("THIS SHOULD NOT HAPPEN!!!!!")
|
|
|
|
# if abs(err_y) > 0.3: # TODO: calibrate this
|
|
# self.vehicle.nudge_fork_vert(go_up, 1.0)
|
|
# elif abs(err_y) > 0.15: # TODO: calibrate this
|
|
# self.vehicle.nudge_fork_vert(go_up, 0.6)
|
|
# elif abs(err_y) > 0.1: # TODO: calibrate this
|
|
# self.vehicle.nudge_fork_vert(go_up, 0.4)
|
|
# elif abs(err_y) > 0.08: # TODO: calibrate this
|
|
# self.vehicle.nudge_fork_vert(go_up, 0.2)
|
|
# elif abs(err_y) <= 0.08:
|
|
# y_aligned = True
|
|
# else:
|
|
# self.log.critical("THIS SHOULD NOT HAPPEN!!!!!")
|
|
|
|
if x_aligned:
|
|
self.log.info("We aligned.")
|
|
return True
|
|
return False
|
|
|
|
def align_fork(self):
|
|
is_aligned = False
|
|
while not is_aligned:
|
|
tag_position = self.find_apriltag()
|
|
if not tag_position:
|
|
self.log.info("Could not detect c̶o̶d̶e̶ holes")
|
|
continue
|
|
err = self.calculate_error(tag_position)
|
|
is_aligned = self.run_adjustment_iteration(err)
|
|
if is_aligned:
|
|
return True
|
|
|
|
|
|
|
|
class DemoFL:
|
|
def __init__(self) -> None:
|
|
self.a = True
|
|
def nudge_fork_vert(self, dir, amt):
|
|
print(f"Moving {'UP' if dir else 'DOWN'} for {amt} s.")
|
|
def nudge_fork_horiz(self, dir, amt):
|
|
print(f"Moving {'LEFT' if dir else 'RIGHT'} for {amt} s.")
|
|
|
|
if __name__ == "__main__":
|
|
demo_fl = DemoFL()
|
|
x = HoleAligner(demo_fl)
|
|
while True:
|
|
tag_pos = x.find_apriltag()
|
|
if not tag_pos:
|
|
# notify about failed identification
|
|
continue
|
|
err = x.calculate_error(tag_pos)
|
|
print(err)
|
|
# aligned = x.run_adjustment_iteration(err)
|