"""Depth-camera based detection of two holes and fork alignment to them.

``DepthStream`` reads depth frames from a RealSense pipeline on a background
thread, ``HoleDetector`` locates two hole-sized contours in a frame, and
``HoleAligner`` nudges a vehicle's fork until the left-most hole sits within
tolerance of a configured image position.
"""

import logging
import time
from threading import Thread

import cv2
import numpy as np
import pyrealsense2.pyrealsense2 as rs

logging.basicConfig(level='INFO')


class DepthStream:
    """Threaded reader that keeps the most recent depth frame in ``buffer``."""

    def __init__(self, resolution) -> None:
        """Open the depth pipeline and grab one initial frame.

        Args:
            resolution: ``(width, height)`` passed to the depth stream config.
        """
        self.resolution = resolution
        self._thread = None
        # init_depth_stream() returns nothing, so this attribute is always
        # None; it is kept only so existing attribute access does not break.
        self.depth_stream = self.init_depth_stream()
        self.start_depth_stream()
        self.buffer = self.get_new_depth_image()
        self.updating = False
        self.new_frame_ready = False

    def init_depth_stream(self):
        """Create the pipeline and configure a z16 depth stream at 5 fps."""
        self.pipeline = rs.pipeline()
        self.config = rs.config()
        self.config.enable_stream(
            rs.stream.depth,
            self.resolution[0],
            self.resolution[1],
            rs.format.z16,
            5,
        )

    def start_depth_stream(self):
        """Start the pipeline and raise the laser power on the first sensor."""
        self.pipeline_profile = self.pipeline.start(self.config)
        device = self.pipeline_profile.get_device()
        # NOTE(review): assumes sensor 0 is the depth sensor - confirm for
        # the camera model in use.
        depth_sensor = device.query_sensors()[0]
        # Set laser power to max for (hopefully) better results
        depth_sensor.set_option(rs.option.laser_power, 360)

    def get_new_depth_image(self):
        """Block until the pipeline delivers frames; return depth as ndarray."""
        frames = self.pipeline.wait_for_frames()
        depth_frame = frames.get_depth_frame()
        return np.asanyarray(depth_frame.get_data())

    def update(self):
        """Worker loop: keep replacing ``buffer`` while ``updating`` is set."""
        while self.updating:
            self.buffer = self.get_new_depth_image()
            self.new_frame_ready = True

    def start(self):
        """Start the background reader thread.

        Returns:
            This instance, so construction and start can be chained.
        """
        worker = Thread(target=self.update, args=())
        worker.daemon = True
        self.updating = True
        self._thread = worker
        worker.start()
        self.start_time = time.time()
        return self

    def read(self):
        """Return the next unread frame, waiting for one if necessary.

        Returns:
            The newest depth image written by the reader thread.

        Raises:
            RuntimeError: If no unread frame exists and the reader thread is
                not running, in which case waiting would never finish.
        """
        while not self.new_frame_ready:
            if not self.updating:
                raise RuntimeError(
                    "DepthStream.read() called while the stream is not running"
                )
            # Yield the CPU instead of spinning at 100% while waiting.
            time.sleep(0.001)
        self.new_frame_ready = False
        return self.buffer

    def stop(self):
        """Stop the reader thread, then stop the pipeline."""
        self.updating = False
        worker = getattr(self, "_thread", None)
        if worker is not None and worker.is_alive():
            # Let the worker leave wait_for_frames() before the pipeline is
            # torn down underneath it.
            worker.join(timeout=2.0)
        self._thread = None
        self.pipeline.stop()


class HoleDetector:
    """Finds two hole-sized contours in thresholded depth frames."""

    def __init__(self, resolution=(848, 480)) -> None:
        """Set thresholds and resolution-dependent tuning values.

        Args:
            resolution: ``(width, height)`` of the depth stream. A separate
                set of area bounds and blur size is used for ``(1280, 720)``;
                every other value uses the 848x480 tuning.
        """
        self.thresh_value = 90
        self.thresh_delta = 5
        self.current_contours = []
        self.running = False
        self.alpha = 0.07
        self.centres = [(-1, -1), (-1, -1)]
        self.resolution = tuple(resolution)
        if self.resolution == (1280, 720):
            self.hole_area_lb = 1500
            self.hole_area_ub = 6500
            self.blur_amount = 11
        else:
            self.hole_area_lb = 1000
            self.hole_area_ub = 3000
            self.blur_amount = 5
        self.log = logging.getLogger("HOLE DETECTOR")

    def start(self):
        """Open and start the depth stream."""
        # Only flag as running once the stream actually exists, so a failed
        # camera open does not leave the detector in a half-started state.
        self.stream = DepthStream(self.resolution).start()
        self.running = True

    def stop(self):
        """Stop the depth stream."""
        self.running = False
        self.stream.stop()

    def get_new_frame(self):
        """Return the next raw depth frame from the stream."""
        return self.stream.read()

    def preprocess_frame(self, raw_frame):
        """Convert a raw depth frame into a blurred 8-bit image.

        Steps: scale to 8 bit, fill near-black pixels from a heavily
        median-blurred copy, equalise the histogram, then median-blur.
        Also stores a BGR copy in ``self.output`` for later annotation and
        shows the intermediate images in debug windows.

        Args:
            raw_frame: Depth image as returned by ``get_new_frame``.

        Returns:
            The blurred single-channel image.
        """
        gray_colormap = cv2.convertScaleAbs(raw_frame, alpha=self.alpha)
        _, black_mask = cv2.threshold(
            gray_colormap, 10, 255, cv2.THRESH_BINARY_INV
        )
        fill_blur = cv2.medianBlur(gray_colormap, 51)

        black_filled = gray_colormap.copy()
        is_black = black_mask == 255
        black_filled[is_black] = fill_blur[is_black]

        sharper = cv2.equalizeHist(black_filled)
        self.output = cv2.cvtColor(sharper, cv2.COLOR_GRAY2BGR)
        blur = cv2.medianBlur(sharper, self.blur_amount)

        cv2.imshow("orig", gray_colormap)
        cv2.imshow("black filled", black_filled)
        cv2.imshow("sharper", sharper)
        cv2.imshow("blur", blur)
        return blur

    def run_thresh(self, blur_frame, correcting=False):
        """Threshold the frame at ``thresh_value`` and return its Canny edges.

        Args:
            blur_frame: Output of ``preprocess_frame``.
            correcting: When true, log the threshold being tried.

        Returns:
            The Canny edge image.
        """
        _, thresh = cv2.threshold(
            blur_frame, self.thresh_value, 255, cv2.THRESH_BINARY_INV
        )
        canny = cv2.Canny(thresh, 75, 200)
        cv2.imshow("canny", canny)
        if correcting:
            self.log.info(f"Running correcting thresh with {self.thresh_value}")
        return canny

    def _contours_in_area_range(self, canny_frame):
        """Return ``(all_contours, [(contour, area), ...])`` for hole-sized areas."""
        contours, _ = cv2.findContours(
            canny_frame, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE
        )
        in_range = []
        for contour in contours:
            area = cv2.contourArea(contour)
            if self.hole_area_lb < area < self.hole_area_ub:
                in_range.append((contour, area))
        return contours, in_range

    def find_holes(self, canny_frame, blur_frame):
        """Locate exactly two holes and record their centres.

        On success ``self.centres`` is updated and the holes are drawn onto
        ``self.output``. On failure a threshold correction is attempted for
        the benefit of later frames and this call still reports failure.

        Args:
            canny_frame: Edge image from ``run_thresh``.
            blur_frame: Preprocessed frame, reused when retrying thresholds.

        Returns:
            True when exactly two holes were found, otherwise False.
        """
        contours, in_range = self._contours_in_area_range(canny_frame)
        for _, area in in_range:
            self.log.warning(f"Valid contour? {area}")

        # Keep every second match. NOTE(review): presumably each hole yields
        # two contours from its Canny edge - confirm.
        self.current_contours = [contour for contour, _ in in_range][::2]

        if len(self.current_contours) != 2:
            self.log.critical(
                f"*** FOUND {len(self.current_contours)} CONTORUS INSTEAD OF 2!!! ***"
            )
            self.log.critical("ATTEMPTING TO CORRECT...")
            self.log.info(f"Current thresh: {self.thresh_value}")
            self.___correct_for_offset(blur_frame)
            self.log.error(f'Total contours: {len(contours)}')
            for contour in contours:
                self.log.error(f"Size: {cv2.contourArea(contour)}")
            self.log.error("")
            return False

        for ix, contour in enumerate(self.current_contours):
            M = cv2.moments(contour)
            cX = int(M["m10"] / M["m00"])
            cY = int(M["m01"] / M["m00"])
            self.centres[ix] = (cX, cY)
            cv2.fillPoly(self.output, pts=[contour], color=(255, 0, 0))
            cv2.line(self.output, (cX - 50, cY), (cX + 50, cY),
                     color=(0, 0, 255), thickness=3)
            cv2.line(self.output, (cX, cY - 50), (cX, cY + 50),
                     color=(0, 0, 255), thickness=3)
        cv2.imshow("output", self.output)
        return True

    def get_hole_positions(self):
        """Grab one frame and return the two hole centres.

        Starts the stream first if it is not running.

        Returns:
            ``((x1, y1), (x2, y2))`` when two holes were found, otherwise
            ``(None, None)``.
        """
        if not self.running:
            self.start()
        new_frame = self.get_new_frame()
        blur_frame = self.preprocess_frame(new_frame)
        canny_frame = self.run_thresh(blur_frame)
        found_holes = self.find_holes(canny_frame, blur_frame)
        # NOTE: a delay of 0 blocks until a key is pressed in a debug window.
        cv2.waitKey(0)
        if found_holes:
            return (self.centres[0], self.centres[1])
        return None, None

    def __count_holes(self, canny_frame):
        """Return True when the edge image contains exactly two holes."""
        _, in_range = self._contours_in_area_range(canny_frame)
        for _, area in in_range:
            self.log.info(
                f"{area} is valid ({self.hole_area_lb}, {self.hole_area_ub})"
            )
        return len(in_range[::2]) == 2

    def ___correct_for_offset(self, blur_frame):
        """Search nearby thresholds for one that yields exactly two holes.

        Tries ``thresh_value`` offset by +/-0.5, 1, 2 and 3 times
        ``thresh_delta``, closest first. The first threshold that works is
        kept in ``self.thresh_value``; if none works the original value is
        restored.

        Args:
            blur_frame: Preprocessed frame to re-threshold.
        """
        orig_thresh = self.thresh_value
        multipliers = (0.5, -0.5, 1, -1, 2, -2, 3, -3)
        for attempt, multiplier in enumerate(multipliers):
            candidate = orig_thresh + multiplier * self.thresh_delta
            verb = "detect" if attempt == 0 else "correct"
            self.log.error(
                f"Failed to {verb} with {self.thresh_value}, trying {candidate}"
            )
            self.thresh_value = candidate
            canny_frame = self.run_thresh(blur_frame, correcting=True)
            if self.__count_holes(canny_frame):
                return

        self.log.critical("FAILED to correct thresh :(")
        self.thresh_value = orig_thresh  # give up


class HoleAligner:
    """Nudges a vehicle's fork until the left-most hole is on target."""

    # Offsets with an absolute value below this many pixels count as aligned.
    ALIGN_TOLERANCE_PX = 50
    # (minimum absolute pixel offset, nudge amount), largest first so the
    # strongest applicable nudge wins. TODO: calibrate these.
    NUDGE_STEPS = ((500, 1.0), (200, 0.6), (100, 0.4))
    DEFAULT_NUDGE = 0.2

    def __init__(self, fl_ctrl) -> None:
        """Store the vehicle controller and create a detector.

        Args:
            fl_ctrl: Controller exposing ``nudge_fork_horiz(go_left, amount)``
                and ``nudge_fork_vert(go_up, amount)``.
        """
        self.log = logging.getLogger("HOLE ALIGNER")
        self.vehicle = fl_ctrl
        self.detector = HoleDetector()
        self.left_hole_x_offset = 0  # TODO: find offsets with camera mounted
        self.left_hole_y_offset = 0  # TODO: find offsets with camera mounted

    def _nudge_amount(self, abs_offset):
        """Map an absolute pixel offset to the nudge amount to request."""
        for min_offset, amount in self.NUDGE_STEPS:
            if abs_offset > min_offset:
                return amount
        return self.DEFAULT_NUDGE

    def run_iteration(self):
        """Detect the holes once and nudge the fork towards the left one.

        Returns:
            True when both axes are within tolerance. False when a nudge was
            issued or when the holes could not be detected in this frame.
        """
        hole_1, hole_2 = self.detector.get_hole_positions()
        if hole_1 is None or hole_2 is None:
            # Detection failed for this frame; retry on the next iteration.
            self.log.warning("Holes not detected, skipping this iteration")
            return False

        hole_of_interest = hole_1 if hole_1[0] < hole_2[0] else hole_2
        offset_x = hole_of_interest[0] - self.left_hole_x_offset
        offset_y = hole_of_interest[1] - self.left_hole_y_offset
        self.log.info(f"Relative hole offsets: X: {offset_x}, Y: {offset_y}")

        go_left = offset_x > 0
        go_up = offset_y < 0

        x_aligned = abs(offset_x) < self.ALIGN_TOLERANCE_PX
        if not x_aligned:
            self.vehicle.nudge_fork_horiz(
                go_left, self._nudge_amount(abs(offset_x))
            )

        y_aligned = abs(offset_y) < self.ALIGN_TOLERANCE_PX
        if not y_aligned:
            self.vehicle.nudge_fork_vert(
                go_up, self._nudge_amount(abs(offset_y))
            )

        return x_aligned and y_aligned

    def perform_alignment(self):
        """Run iterations until aligned.

        The detector is started if it is not already running, and stopped
        again afterwards only when this call started it.

        Returns:
            True once alignment has completed.
        """
        started_here = not self.detector.running
        if started_here:
            self.detector.start()
        try:
            alignment_done = False
            while not alignment_done:
                alignment_done = self.run_iteration()
        finally:
            if started_here:
                self.detector.stop()
        self.log.info("Alignment complete")
        return True


if __name__ == "__main__":
    a = HoleDetector()
    a.start()
    try:
        while True:
            print(a.get_hole_positions())
    except KeyboardInterrupt:
        pass
    finally:
        # Release the camera on exit.
        a.stop()