import dronekit from dronekit import connect from dronekit import VehicleMode from dronekit import LocationGlobal from pallet_proximity_sensor import PalletProximity from math import asin, cos, radians, sin, sqrt import numpy as np import requests import json import time import logging from state_machine import StateMachineHandler def haversine_m(lon1, lat1, lon2, lat2): """ Calculate the great circle distance between two points on the earth (specified in decimal degrees) """ # convert decimal degrees to radians lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2]) # haversine formula dlon = lon2 - lon1 dlat = lat2 - lat1 a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2 c = 2 * asin(sqrt(a)) r = 6371 # Radius of earth in kilometers. Use 3956 for miles return c * r * 1000 def get_bearing(lat1, long1, lat2, long2): dLon = (long2 - long1) x = cos(radians(lat2)) * sin(radians(dLon)) y = cos(radians(lat1)) * sin(radians(lat2)) - sin(radians(lat1)) * cos(radians(lat2)) * cos(radians(dLon)) brng = np.arctan2(x,y) brng = np.degrees(brng) return brng def remap(in_value, in_min, in_max, out_min, out_max): if in_value > in_max: in_value = in_max if in_value < in_min: in_value = in_min remapped = (((in_value - in_min) * (out_max - out_min)) / (in_max - in_min)) + out_min return remapped class DroneInterface: __ARDUROVER_IP = '127.0.0.1:14550' __TARGET_REACHED_RADIUS = 2.5 #metres def __init__(self) -> None: self.log = logging.getLogger("ardurover_interface") self.log.setLevel("DEBUG") self.vehicle = connect(self.__ARDUROVER_IP, wait_ready=True) time.sleep(2) self.heading_endpoint = "http://localhost:3333/get_heading" self.copmpass_alive_endpoint = "http://localhost:3333/alive" self.STATE_MACHINE = StateMachineHandler() self.estop_engaged = False self.log.info("Connected to ardurover") self.pallet_proximity_sensor = PalletProximity() self.pallet_min_distance = 22 # TODO: calibrate this self.pallet_max_distance = 55 # TODO: calibrate this self.log.info("Connected to pallet proximity sensor") self.print_status() def arm_vehicle(self): # self.vehicle.mode = VehicleMode("GUIDED") self.vehicle.arm() self.log.info("Vehicle Armed") # vehicle.simple_takeoff(aTargetAltitude) def disarm_vehicle(self): self.vehicle.channels.overrides = {} self.vehicle.disarm() self.log.info("Vehicle Disarmed") def print_status(self): self.log.info(f"Autopilot Firmware version: {self.vehicle.version}") self.log.info(f"Autopilot capabilities (supports ftp): {self.vehicle.capabilities.ftp}") self.log.info(f"Global Location: {self.vehicle.location.global_frame}") self.log.info(f"Global Location (relative altitude): {self.vehicle.location.global_relative_frame}") self.log.info(f"Local Location: {self.vehicle.location.local_frame}") self.log.info(f"Attitude: {self.vehicle.attitude}") self.log.info(f"Velocity: {self.vehicle.velocity}") self.log.info(f"GPS: {self.vehicle.gps_0}") self.log.info(f"Groundspeed: {self.vehicle.groundspeed}") self.log.info(f"Airspeed: {self.vehicle.airspeed}") self.log.info(f"Gimbal status: {self.vehicle.gimbal}") self.log.info(f"Battery: {self.vehicle.battery}") self.log.info(f"EKF OK?: {self.vehicle.ekf_ok}") self.log.info(f"Last Heartbeat: {self.vehicle.last_heartbeat}") # self.log.info(f"Heading: {self.vehicle.heading}") self.log.info(f"Is Armable?: {self.vehicle.is_armable}") self.log.info(f"System status: {self.vehicle.system_status.state}") self.log.info(f"Mode: {self.vehicle.mode.name}") self.log.info(f"Armed: {self.vehicle.armed}") def get_heading(self, data = {}): rsp = None rsp = requests.post(self.heading_endpoint, json=data, timeout=1) if rsp: if rsp.ok: json_rsp = json.loads(rsp.text) status = json_rsp.get('Success', None) if status != True: error = json_rsp.get('Error', None) self.log.error(f'Got error: {error}') heading = json_rsp['heading'] return heading else: self.log.error(f'Failed to process request, code: {rsp.status_code}, text: {rsp.text}') else: self.log.critical('Cannot connect to server') def verify_compass(self, data={}): rsp = None try: rsp = requests.post(self.copmpass_alive_endpoint, json=data, timeout=1) if rsp: if rsp.ok: json_rsp = json.loads(rsp.text) status = json_rsp.get('Success', None) if status != True: error = json_rsp.get('Error', None) self.log.error(f'Got error: {error}') return False is_running = json_rsp['is_running'] return is_running else: self.log.error(f'Failed to process request, code: {rsp.status_code}, text: {rsp.text}') return False else: self.log.critical('Cannot connect to server') return False except Exception as e: self.log.error(f"Error in verify_compass: {e}") return False def throttle_limiter(self, throttle, upper=1940, lower=1060): if throttle > upper: throttle = upper if throttle < lower: throttle = lower return throttle def steering_limiter(self, steering, upper=2000, lower=1000): if steering > upper: steering = upper if steering < lower: steering = lower return steering def set_vehicle_steering_and_speed(self, amount_steer, amount_speed): ''' We are currently using rc overrides because ardurover and mavlink have not implemented functionality to change the ground steering... for some fucking reason. channel 4 is the steering channel channel 3 is the drive channel ''' if self.verify_state_for(throttle_PWM=amount_speed): self.vehicle.channels.overrides = {} amount_steer = self.steering_limiter(amount_steer) amount_speed = self.throttle_limiter(amount_speed) # assert 999 < amount_speed < 2001, f"speed PWM out of bounds ({amount_speed} != 1000->2000)" self.log.info(f"Sending PWM {amount_steer} to channel 4") self.log.info(f"Sending PWM {amount_speed} to channel 3") self.vehicle.channels.overrides['4'] = int(amount_steer) self.vehicle.channels.overrides['3'] = int(amount_speed) def set_vehicle_speed(self, amount_speed: int): amount_speed = self.throttle_limiter(amount_speed) # assert 999 < amount_speed < 2001, f"speed PWM out of bounds ({amount_speed} != 1000->2000)" if self.verify_state_for(throttle_PWM=amount_speed): self.vehicle.channels.overrides = {} self.log.info(f"Sending PWM {amount_speed} to channel 3") self.vehicle.channels.overrides['3'] = int(amount_speed) def set_vehicle_steering(self, amount_steer: int): amount_steer = self.steering_limiter(amount_steer) if self.verify_state_for(): self.vehicle.channels.overrides = {} self.log.info(f"Sending PWM {amount_steer} to channel 4") self.vehicle.channels.overrides['4'] = int(amount_steer) def clear_override_channels(self): self.vehicle.channels.overrides['1'] = 1500 self.vehicle.channels.overrides['2'] = 1500 self.vehicle.channels.overrides['3'] = 1500 self.vehicle.channels.overrides['4'] = 1500 self.vehicle.channels.overrides = {} def override_stop(self): self.vehicle.channels.overrides = {} self.vehicle.channels.overrides['1'] = 1500 self.vehicle.channels.overrides['2'] = 1500 self.vehicle.channels.overrides['3'] = 1500 self.vehicle.channels.overrides['4'] = 1500 def override_stop_drivetrain_fwd(self): if self.vehicle.channels['3'] > 1550: self.vehicle.channels.overrides = {} self.vehicle.channels.overrides['3'] = 1500 def override_stop_drivetrain_back(self): if self.vehicle.channels['3'] < 1450: self.vehicle.channels.overrides = {} self.vehicle.channels.overrides['3'] = 1500 def nudge_fork_vert(self, nudge_up: bool, nudge_amount: float): # c1: lr, c2: ud assert nudge_amount < 1.1, "Moving for over 1 second is not safe!" if self.verify_state_for(is_fork=True): self.vehicle.channels.overrides = {} nudge_amt = 1950 if nudge_up: nudge_amt = 1100 self.log.info(f"Nudging fork {'UP' if nudge_up else 'DOWN'} for {nudge_amount} seconds (chan 2)") self.vehicle.channels.overrides['2'] = nudge_amt time.sleep(nudge_amount) self.vehicle.channels.overrides['2'] = 1500 self.vehicle.channels.overrides = {} def nudge_fork_horiz(self, nudge_left: bool, nudge_amount: float): # Dir: True is left, False is right assert nudge_amount < 1.1, "Moving for over 1 second is not safe!" if self.verify_state_for(is_fork=True): self.vehicle.channels.overrides = {} nudge_amt = 1100 if nudge_left: nudge_amt = 2000 self.log.info(f"Nudging fork {'LEFT' if nudge_left else 'RIGHT'} for {nudge_amount} seconds (chan 1)") self.vehicle.channels.overrides['1'] = nudge_amt time.sleep(nudge_amount) self.vehicle.channels.overrides['1'] = 1500 self.vehicle.channels.overrides = {} def nudge_drive(self, nudge_forward: bool, nudge_amount: float, nudge_steering: int = 1500): # Dir: True is left, False is right assert nudge_amount < 10, "Moving for over 10 seconds is not safe!" nudge = 1000 if nudge_forward: nudge = 2000 if self.verify_state_for(throttle_PWM=nudge): self.vehicle.channels.overrides = {} if nudge_forward: self.log.info(f"Nudging fork FORWARD for {nudge_amount} seconds at steering: {nudge_steering}") nudge_increments = int(nudge_amount / 0.5) for _ in range(nudge_increments): self.set_vehicle_steering_and_speed(nudge_steering, 1900) time.sleep(0.5) self.clear_override_channels() else: self.log.info(f"Nudging fork BACKWARD for {nudge_amount} seconds at steering: {nudge_steering}") nudge_increments = int(nudge_amount / 0.5) for _ in range(nudge_increments): self.set_vehicle_steering_and_speed(nudge_steering, 1000) time.sleep(0.5) self.clear_override_channels() self.vehicle.channels.overrides = {} def check_estop_signal(self): estop = self.vehicle.channels['6'] if estop > 1700: self.log.critical(f"Got {estop} on Aux ESTOP, executing Estop") self.emergency_stop() return True def check_rc_signal(self): rc_override = self.vehicle.channels['5'] if rc_override > 1700: self.log.critical(f"Got {rc_override} on Aux RC_OVERRIDE, returning True") return True return False def emergency_stop(self): self.disarm_vehicle() self.estop_engaged = True self.STATE_MACHINE.set_state('ESTOP') # maybe: self.press_brake_pedal() self.log.critical(f"Estop triggered, disarming vehicle") def verify_state_for(self, target_state=None, throttle_PWM=None, is_fork=False): _, state = self.STATE_MACHINE.get_state() if state in ["ESTOP", "RC_CONTROL", "OTHER_ERROR"]: self.log.warning(f"Warning: machine state is {state}, this does not allow motion") return False elif state in ["PROXIMITY_STOP_FRONT", "PROXIMITY_STOP_REAR"]: if is_fork: return True if throttle_PWM: if (throttle_PWM > 1500 and state == "PROXIMITY_STOP_REAR") or (throttle_PWM < 1500 and state == "PROXIMITY_STOP_FRONT"): return True self.log.warning(f"We are in proximity stop mode ({state}) while trying to send PWM {throttle_PWM}, this is not allowed.") return False if target_state: # TODO: implement table to permit state switching from other state pass return True def go_to_gps_bad(self, lat, lon, alt=200): if self.verify_state_for("NAVIGATING_TO_POINT_GPS"): self.vehicle.mode = VehicleMode("GUIDED") a_location = LocationGlobal(lat, lon, alt) print("-----") print(a_location.lat) print(a_location.lon) print(a_location.alt) print("-----") self.log.info(f"Requesting vehicle to go to {lat}, {lon}") self.vehicle.simple_goto(a_location) def get_speed(self): speed = self.vehicle.channels.overrides.get('3', 1500) return speed def get_steering(self): steering = self.vehicle.channels.overrides.get('4', 1500) return steering def get_estop(self): return self.estop_engaged def get_arm(self): return self.vehicle.armed def go_to_gps(self, lat, lon): if self.verify_state_for("NAVIGATING_TO_POINT_GPS"): target_position = (lat, lon) approaching = True while approaching: current_position = (self.vehicle.location.global_frame.lat, self.vehicle.location.global_frame.lon) distance_to_target = haversine_m(target_position[0], target_position[1], current_position[0], current_position[1]) self.log.error(f"Distance: {distance_to_target} metres") if distance_to_target < self.__TARGET_REACHED_RADIUS: self.log.info(f"Waypoint {lat}, {lon} reached") approaching = False self.override_stop() return True bearing_to_target = get_bearing(current_position[0], current_position[1], target_position[0], target_position[1]) if bearing_to_target < 0: bearing_to_target += 360 current_heading = self.get_heading() error = bearing_to_target - current_heading if error > 180: error = -(360 - error) elif error < -180: error = -(360 + error) # TODO: PID proportional = -2.5 # proportional = 1 integral = 1 derivative = 1 error = error * proportional * integral * derivative steering = remap(error, -90, 90, 1000, 2000) self.log.warning(f"Bearing: {bearing_to_target}, Heading: {current_heading}, Error: {error}, Steering: {steering}") self.set_vehicle_steering_and_speed(steering, 1910) time.sleep(0.3) else: self.log.error("Incorrect vehicle State!") def go_to_heading(self, target_heading): if self.verify_state_for("NAVIGATING_TO_POINT_GPS"): approaching = True while approaching: current_heading = self.get_heading() if target_heading - 7 <= current_heading <= target_heading + 7: approaching = False self.log.info(f"Heading {target_heading} reached") self.override_stop() return True error = target_heading - current_heading self.log.error(f"Angle: {error} deg.") if error > 180: error = -(360 - error) elif error < -180: error = -(360 + error) proportional = -2.5 integral = 1 derivative = 1 error = error * proportional * integral * derivative steering = remap(error, -90, 90, 1000, 2000) self.log.warning(f"target hearing: {target_heading}, Heading: {current_heading}, Error: {error}, Steering: {steering}") self.set_vehicle_steering_and_speed(steering, 1910) time.sleep(0.3) def check_gps_in_radius(self, target_gps): actual_lat, actual_lon = self.get_gps() target_lat, target_lon = target_gps if actual_lat == 0 or actual_lon == 0: logging.critical("Can not get GPS fix!") return False, 99999 radius = self.gps_target_radius_km distance = haversine_m(target_lat, target_lon, actual_lat, actual_lon) logging.info(f'Distance to target (m) : {distance*1000}') if distance <= radius: return True, distance*1000 return False, distance*1000 def get_gps(self): lat = self.vehicle.location.global_frame.lat lon = self.vehicle.location.global_frame.lon return (lat, lon) def approach_pallet(self): pallet_approached = False while not pallet_approached: time.sleep(0.1) distance_to_pallet = self.pallet_proximity_sensor.get_prox_reading() if distance_to_pallet < self.pallet_min_distance: self.log.info(f"We are close enough to pallet ({distance_to_pallet} cm)") pallet_approached = True self.override_stop() self.clear_override_channels() else: self.log.info(f"Distance to pallet ({distance_to_pallet} cm), moving forward...") target_speed_pwm = 1930 self.set_vehicle_speed(target_speed_pwm) def reverse_from_pallet(self): pallet_reversed = False while not pallet_reversed: time.sleep(0.1) distance_to_pallet = self.pallet_proximity_sensor.get_prox_reading() if distance_to_pallet > self.pallet_max_distance: self.log.info(f"We are far enough from pallet ({distance_to_pallet} cm)") pallet_reversed = True self.override_stop() self.clear_override_channels() else: self.log.info(f"Distance to pallet ({distance_to_pallet} cm), moving back...") target_speed_pwm = 1070 self.set_vehicle_speed(target_speed_pwm)