new_thea/Control/Jetson/pallet_hole_alignment.py
2021-09-21 12:11:46 +01:00

328 lines
13 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import pyrealsense2.pyrealsense2 as rs
import numpy as np
import cv2
from threading import Thread
import logging
import time
# Configure the root logger when this module is imported so that INFO-level
# (and more severe) messages from the loggers created below are emitted.
logging.basicConfig(level='INFO')
class DepthStream:
    """Threaded reader for a RealSense depth stream.

    The constructor configures and starts the RealSense pipeline and grabs one
    initial frame. ``start()`` then launches a daemon thread that keeps
    overwriting ``buffer`` with the newest depth image; ``read()`` blocks until
    a frame newer than the last one returned is available.
    """

    # Pause between polls in read(); avoids spinning a CPU core while waiting.
    _POLL_INTERVAL_S = 0.001
    # How long stop() waits for the reader thread to leave wait_for_frames().
    _JOIN_TIMEOUT_S = 2.0

    def __init__(self, resolution) -> None:
        """Open the depth stream.

        Args:
            resolution: ``(width, height)`` of the requested depth stream.
        """
        self.resolution = resolution
        self._thread = None
        # init_depth_stream() has no return value, so this attribute is always
        # None; it is kept only so existing attribute access keeps working.
        self.depth_stream = self.init_depth_stream()
        self.start_depth_stream()
        self.buffer = self.get_new_depth_image()
        self.updating = False
        self.new_frame_ready = False

    def init_depth_stream(self):
        """Create the pipeline and request a z16 depth stream at 5 fps."""
        self.pipeline = rs.pipeline()
        self.config = rs.config()
        self.config.enable_stream(
            rs.stream.depth,
            self.resolution[0],
            self.resolution[1],
            rs.format.z16,
            5,
        )

    def start_depth_stream(self):
        """Start the pipeline and raise the laser power on the first sensor."""
        self.pipeline_profile = self.pipeline.start(self.config)
        device = self.pipeline_profile.get_device()
        # NOTE(review): sensor index 0 is presumably the depth sensor - confirm
        # for the camera model in use.
        depth_stream = device.query_sensors()[0]
        # Set laser power to max for (hopefully) better results.
        depth_stream.set_option(rs.option.laser_power, 360)

    def get_new_depth_image(self):
        """Block for the next frameset and return its depth frame as an array."""
        frames = self.pipeline.wait_for_frames()
        depth_frame = frames.get_depth_frame()
        return np.asanyarray(depth_frame.get_data())

    def update(self):
        """Reader-thread body: keep replacing ``buffer`` until stopped."""
        while self.updating:
            self.buffer = self.get_new_depth_image()
            self.new_frame_ready = True

    def start(self):
        """Launch the background reader thread and return ``self``.

        Calling ``start()`` while the reader is already running is a no-op, so
        two threads never compete for the same pipeline.
        """
        if self._thread is not None and self._thread.is_alive():
            return self
        reader = Thread(target=self.update, args=())
        reader.daemon = True
        self.updating = True
        self._thread = reader
        reader.start()
        self.start_time = time.time()
        return self

    def read(self):
        """Return the newest depth image, waiting until a fresh one arrives.

        Raises:
            RuntimeError: if no fresh frame is pending and the stream is not
                running (previously this case spun forever).
        """
        while not self.new_frame_ready:
            if not self.updating:
                raise RuntimeError(
                    "DepthStream.read() called while the stream is not running"
                )
            time.sleep(self._POLL_INTERVAL_S)
        self.new_frame_ready = False
        return self.buffer

    def stop(self):
        """Stop the reader thread, then stop the pipeline."""
        self.updating = False
        reader = self._thread
        # Let the reader finish its current wait_for_frames() call before the
        # pipeline is torn down underneath it.
        if reader is not None and reader.is_alive():
            reader.join(timeout=self._JOIN_TIMEOUT_S)
        self._thread = None
        self.pipeline.stop()
class HoleDetector:
    """Locate two pallet holes in depth frames using thresholding and contours.

    Each call to ``get_hole_positions()`` reads a depth frame, cleans it up,
    thresholds it, runs Canny and looks for exactly two contours whose area
    lies between ``hole_area_lb`` and ``hole_area_ub``. When that fails, a set
    of neighbouring threshold values is tried and the first one that yields two
    holes is kept in ``thresh_value`` for subsequent frames.

    Intermediate images are shown in OpenCV debug windows.
    """

    # Multiples of thresh_delta added to the current threshold, in the order
    # they are tried when the current threshold does not give two holes.
    _CORRECTION_STEPS = (0.5, -0.5, 1, -1, 2, -2, 3, -3)

    def __init__(self, resolution=(848, 480), wait_key_ms=0) -> None:
        """Set up detection parameters (the camera is opened by ``start()``).

        Args:
            resolution: ``(width, height)`` of the depth stream. ``(1280, 720)``
                selects the larger area bounds and blur kernel; any other value
                uses the 848x480 tuning.
            wait_key_ms: delay passed to ``cv2.waitKey`` after every processed
                frame. The default ``0`` blocks until a key is pressed; pass a
                positive number of milliseconds for unattended operation.
        """
        self.thresh_value = 90
        self.thresh_delta = 5
        self.running = False
        self.alpha = 0.07
        self.current_contours = []
        self.centres = [(-1, -1), (-1, -1)]
        self.resolution = tuple(resolution)
        self.wait_key_ms = wait_key_ms
        if self.resolution == (1280, 720):
            self.hole_area_lb = 1500
            self.hole_area_ub = 6500
            self.blur_amount = 11
        else:
            self.hole_area_lb = 1000
            self.hole_area_ub = 3000
            self.blur_amount = 5
        self.log = logging.getLogger("HOLE DETECTOR")

    def start(self):
        """Open the depth stream; does nothing if it is already running."""
        if self.running:
            return
        self.running = True
        self.stream = DepthStream(self.resolution).start()

    def stop(self):
        """Mark the detector as stopped and shut the depth stream down."""
        self.running = False
        self.stream.stop()

    def get_new_frame(self):
        """Return the next raw depth frame from the stream."""
        return self.stream.read()

    def preprocess_frame(self, raw_frame):
        """Turn a raw depth frame into a blurred, equalised 8-bit image.

        Also stores a BGR copy of the equalised image in ``self.output`` for
        ``find_holes`` to draw on.
        """
        # Scale the depth values by alpha and saturate to 8 bits.
        gray_colormap = cv2.convertScaleAbs(raw_frame, alpha=self.alpha)
        # Pixels at or below 10 become 255 in the mask: these are the
        # near-black pixels that get filled in below.
        _, black_mask = cv2.threshold(gray_colormap, 10, 255, cv2.THRESH_BINARY_INV)
        fill_blur = cv2.medianBlur(gray_colormap, 51)
        black_filled = gray_colormap.copy()
        dark_pixels = black_mask == 255
        # Replace the near-black pixels with the heavily blurred neighbourhood.
        black_filled[dark_pixels] = fill_blur[dark_pixels]
        sharper = cv2.equalizeHist(black_filled)
        self.output = cv2.cvtColor(sharper, cv2.COLOR_GRAY2BGR)
        blur = cv2.medianBlur(sharper, self.blur_amount)
        cv2.imshow("orig", gray_colormap)
        cv2.imshow("black filled", black_filled)
        cv2.imshow("sharper", sharper)
        cv2.imshow("blur", blur)
        return blur

    def run_thresh(self, blur_frame, correcting=False):
        """Inverse-threshold ``blur_frame`` at ``thresh_value`` and run Canny.

        Args:
            blur_frame: preprocessed 8-bit image.
            correcting: when True, log the threshold being tried.

        Returns:
            The Canny edge image.
        """
        _, thresh = cv2.threshold(
            blur_frame, self.thresh_value, 255, cv2.THRESH_BINARY_INV
        )
        canny = cv2.Canny(thresh, 75, 200)
        cv2.imshow("canny", canny)
        if correcting:
            self.log.info(f"Running correcting thresh with {self.thresh_value}")
        return canny

    def _hole_candidates(self, canny_frame):
        """Return ``(all_contours, [(contour, area), ...])`` for ``canny_frame``.

        The second element holds only contours whose area is strictly between
        ``hole_area_lb`` and ``hole_area_ub``.
        """
        # [-2] selects the contour list on both OpenCV 3.x (3 return values)
        # and OpenCV 4.x (2 return values).
        contours = cv2.findContours(
            canny_frame, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE
        )[-2]
        candidates = []
        for contour in contours:
            area = cv2.contourArea(contour)
            if self.hole_area_lb < area < self.hole_area_ub:
                candidates.append((contour, area))
        return contours, candidates

    def _candidate_thresholds(self):
        """List the thresholds to try around the current ``thresh_value``."""
        base = self.thresh_value
        return [base + step * self.thresh_delta for step in self._CORRECTION_STEPS]

    def find_holes(self, canny_frame, blur_frame):
        """Find the two holes and store their centres in ``self.centres``.

        Returns:
            True when exactly two holes were found. Otherwise a threshold
            correction is attempted for later frames and False is returned.
        """
        contours, candidates = self._hole_candidates(canny_frame)
        for _, area in candidates:
            self.log.warning(f"Valid contour? {area}")
        # Keep every second candidate.
        # NOTE(review): presumably because each hole edge yields an inner and
        # an outer contour - confirm.
        self.current_contours = [contour for contour, _ in candidates][::2]

        if len(self.current_contours) != 2:
            self.log.critical(f"*** FOUND {len(self.current_contours)} CONTORUS INSTEAD OF 2!!! ***")
            self.log.critical(f"ATTEMPTING TO CORRECT...")
            self.log.info(f"Current thresh: {self.thresh_value}")
            self.___correct_for_offset(blur_frame)
            self.log.error(F'Total contours: {len(contours)}')
            for contour in contours:
                self.log.error(f"Size: {cv2.contourArea(contour)}")
            self.log.error("")
            return False

        for ix, contour in enumerate(self.current_contours):
            # Centroid from image moments; m00 is non-zero because the contour
            # passed the area lower bound.
            M = cv2.moments(contour)
            cX = int(M["m10"] / M["m00"])
            cY = int(M["m01"] / M["m00"])
            self.centres[ix] = (cX, cY)
            cv2.fillPoly(self.output, pts=[contour], color=(255, 0, 0))
            cv2.line(self.output, (cX - 50, cY), (cX + 50, cY), color=(0, 0, 255), thickness=3)
            cv2.line(self.output, (cX, cY - 50), (cX, cY + 50), color=(0, 0, 255), thickness=3)
        cv2.imshow("output", self.output)
        return True

    def get_hole_positions(self):
        """Process one frame and return the two hole centres.

        Starts the detector first if it is not running.

        Returns:
            ``((x1, y1), (x2, y2))`` on success, ``(None, None)`` otherwise.
        """
        if not self.running:
            self.start()
        new_frame = self.get_new_frame()
        blur_frame = self.preprocess_frame(new_frame)
        canny_frame = self.run_thresh(blur_frame)
        found_holes = self.find_holes(canny_frame, blur_frame)
        # With wait_key_ms == 0 this blocks until a key is pressed.
        cv2.waitKey(self.wait_key_ms)
        if found_holes:
            return (self.centres[0], self.centres[1])
        return None, None

    def __count_holes(self, canny_frame):
        """Return True when ``canny_frame`` contains exactly two valid holes."""
        _, candidates = self._hole_candidates(canny_frame)
        for _, area in candidates:
            self.log.info(f"{area} is valid ({self.hole_area_lb}, {self.hole_area_ub})")
        return len(candidates[::2]) == 2

    def ___correct_for_offset(self, blur_frame):
        """Search nearby thresholds for one that yields exactly two holes.

        Tries ``thresh_value`` +/- 0.5, 1, 2 and 3 times ``thresh_delta``
        (positive offset first at each distance). The first threshold that
        works is left in ``thresh_value``; if none works the original value is
        restored.
        """
        orig_thresh = self.thresh_value
        for attempt, candidate in enumerate(self._candidate_thresholds()):
            verb = "detect" if attempt == 0 else "correct"
            self.log.error(f"Failed to {verb} with {self.thresh_value}, trying {candidate}")
            self.thresh_value = candidate
            canny_frame = self.run_thresh(blur_frame, correcting=True)
            if self.__count_holes(canny_frame):
                return
        self.log.critical("FAILED to correct thresh :(")
        self.thresh_value = orig_thresh  # give up
class HoleAligner:
    """Nudge the forks until the left-most detected hole is on target.

    The target position in the image is
    ``(left_hole_x_offset, left_hole_y_offset)``. Each iteration measures the
    pixel offset of the left-most hole from that target and nudges the forks
    horizontally and/or vertically by an amount that grows with the offset.
    """

    # Offsets with magnitude below this many pixels count as aligned.
    ALIGN_TOLERANCE_PX = 50  # TODO: calibrate this
    # (pixel threshold, nudge amount), checked from the largest threshold down
    # so that big offsets get big nudges.
    _NUDGE_STEPS = ((500, 1.0), (200, 0.6), (100, 0.4))  # TODO: calibrate this
    # Nudge used for offsets at or above the tolerance but not above 100 px.
    _MIN_NUDGE = 0.2

    def __init__(self, fl_ctrl) -> None:
        """Create the aligner.

        Args:
            fl_ctrl: vehicle controller; must provide
                ``nudge_fork_horiz(go_left, amount)`` and
                ``nudge_fork_vert(go_up, amount)``.
        """
        self.log = logging.getLogger("HOLE ALIGNER")
        self.vehicle = fl_ctrl
        self.detector = HoleDetector()
        self.left_hole_x_offset = 0  # TODO: find offsets with camera mounted
        self.left_hole_y_offset = 0  # TODO: find offsets with camera mounted

    @staticmethod
    def _nudge_amount(offset):
        """Map a pixel offset to a nudge amount; ``0.0`` means aligned."""
        magnitude = abs(offset)
        if magnitude < HoleAligner.ALIGN_TOLERANCE_PX:
            return 0.0
        for threshold, amount in HoleAligner._NUDGE_STEPS:
            if magnitude > threshold:
                return amount
        return HoleAligner._MIN_NUDGE

    def run_iteration(self):
        """Measure the hole offset once and nudge the forks if needed.

        Returns:
            True when both axes are within tolerance. False when a nudge was
            issued or when the detector did not find the holes in this frame.
        """
        hole_1, hole_2 = self.detector.get_hole_positions()
        if hole_1 is None or hole_2 is None:
            # The detector reports (None, None) when it cannot find two holes.
            self.log.warning("Holes not detected; skipping this alignment iteration")
            return False

        # Track whichever hole has the smaller x coordinate.
        hole_of_interest = hole_1 if hole_1[0] < hole_2[0] else hole_2
        offset_x = hole_of_interest[0] - self.left_hole_x_offset
        offset_y = hole_of_interest[1] - self.left_hole_y_offset
        self.log.info(f"Relative hole offsets: X: {offset_x}, Y: {offset_y}")

        go_left = offset_x > 0
        go_up = offset_y < 0
        nudge_x = self._nudge_amount(offset_x)
        nudge_y = self._nudge_amount(offset_y)

        if nudge_x:
            self.vehicle.nudge_fork_horiz(go_left, nudge_x)
        if nudge_y:
            self.vehicle.nudge_fork_vert(go_up, nudge_y)
        return nudge_x == 0.0 and nudge_y == 0.0

    def perform_alignment(self, max_iterations=None):
        """Run alignment iterations until the hole is on target.

        Args:
            max_iterations: optional cap on the number of iterations. ``None``
                (the default) keeps iterating until aligned.

        Returns:
            True once aligned; False if ``max_iterations`` was reached first.
        """
        self.detector.start()
        iterations = 0
        while not self.run_iteration():
            iterations += 1
            if max_iterations is not None and iterations >= max_iterations:
                self.log.error(f"Alignment not reached after {iterations} iterations")
                return False
        self.log.info("Alignment complete")
        return True
if __name__ == "__main__":
    # Manual check: print the detected hole centres for every frame until the
    # process is interrupted.
    a = HoleDetector()
    a.start()
    try:
        while True:
            print(a.get_hole_positions())
    finally:
        # Release the camera pipeline even when the loop ends via Ctrl-C or an
        # error, so the device is not left streaming.
        a.stop()