import serial import logging import time class Proximity: def __init__(self, state_machine, vehicle_ctrl) -> None: self.log = logging.getLogger("PROXIMITY ARRAY") self.log.setLevel("DEBUG") # self.serial_string = '/dev/cu.usbmodem14601' self.serial_string = '/dev/proximity' self.alive = False self.proximity_stopped = False self.limit_cm = 30 self.failed_reads = 0 self.failed_reads_thresh = 4 # self.limit_cm = 5 self.state_machine = state_machine self.vehicle_ctrl = vehicle_ctrl self.rear_sensors = [4, 5, 6, 7, 8, 9, 10] # TODO: change with more sensors self.proximity_detects = 0 self.proximity_detect_thresh = 1 try: self.ser = serial.Serial(self.serial_string, 9600, timeout=1) self.alive = True self.log.info('Proximity serial open') except Exception as e: self.log.critical(f"Cannot open serial device, {e}") self.log.critical('Exiting') exit(0) self.proximities = [99, 99, 99, 99, 99, 99, 99, 99, 99, 99, 99] def get_prox_readings(self): try: self.ser.flushInput() time.sleep(0.05) p1 = p2 = p3 = p4 = p5 = p6 = p7 = p8 = p9 = p10 = p11 = 0 for _ in range(1): raw = str(self.ser.readline()) raw2 = raw.split('\\r')[0].split("'")[1] raw3 = raw2.split('|') p1 += int(raw3[0]) p2 += int(raw3[1]) p3 += int(raw3[2]) p4 += int(raw3[3]) p5 += int(raw3[4]) p6 += int(raw3[5]) p7 += int(raw3[6]) p8 += int(raw3[7]) p9 += int(raw3[8]) p10 += int(raw3[9]) p11 += int(raw3[10]) self.proximities = [int(p1/2), int(p2/2), int(p3/2), int(p4/2), int(p5/2), int(p6/2), int(p7/32), int(p8/2), int(p9/2), int(p10/2), int(p11/2)] self.failed_reads = 0 return self.proximities except Exception as e: self.log.critical(f"ERROR: {e}, serial input: {raw}") self.failed_reads += 1 if self.failed_reads > self.failed_reads_thresh: self.process_system_failure() def process_system_failure(self): self.log.critical(f"Got {self.failed_reads} sequential fails, killing process...") self.alive = False def check_proximity(self): sensor, dist = self.perform_thresh() self.log.info(f"Proximities: FLL: {self.proximities[0]} | FLR: {self.proximities[1]} | FRL: {self.proximities[2]} | FRR: {self.proximities[3]} | RRR: {self.proximities[4]} | RRC: {self.proximities[5]} | RR: {self.proximities[6]} | RC: {self.proximities[7]} | RL: {self.proximities[8]} | RLC: {self.proximities[9]} | RLL: {self.proximities[10]}") self.log.info(f"Proximities: {self.proximities}") if sensor != None: self.proximity_detects +=1 if self.proximity_detects > self.proximity_detect_thresh: front_detect = True if sensor in self.rear_sensors: front_detect = False self.log.warning(f"Detected distance {dist} on sensor {sensor+1} ({'front' if front_detect else 'rear'}), it is below {self.limit_cm}") self.proximity_stopped = True _, prev_state = self.state_machine.get_state() if prev_state == "NAVIGATING_TO_POINT_VISION" and front_detect: self.log.warning("Because we are in vision navigation, we are not stopping as this is likely caused by the pallet") else: self.log.warning(f"Stopping!") if front_detect: self.state_machine.set_state("PROXIMITY_STOP_FRONT") else: self.state_machine.set_state("PROXIMITY_STOP_REAR") # self.vehicle_ctrl.clear_override_channels() while self.proximity_stopped: if front_detect: self.vehicle_ctrl.override_stop_drivetrain_fwd() else: self.vehicle_ctrl.override_stop_drivetrain_back() time.sleep(0.3) obstacle_is_clear = self.check_for_no_obstacle() if obstacle_is_clear: self.log.info("Obstacle is clear") self.state_machine.set_state(prev_state) self.proximity_stopped = False self.vehicle_ctrl.clear_override_channels() else: self.proximity_detects = 0 def check_for_no_obstacle(self): sensor, _ = self.perform_thresh() if sensor == None: return True return False def perform_thresh(self): self.get_prox_readings() for ix, dist in enumerate(self.proximities): if dist < self.limit_cm: return ix, dist return None, None if __name__ == "__main__": test = Proximity() while True: test.check_proximity()