new_thea/Control/Pi/dronekit_class.py
2021-09-21 12:11:46 +01:00

417 lines
18 KiB
Python

import dronekit
from dronekit import connect
from dronekit import VehicleMode
from dronekit import LocationGlobal
from pallet_proximity_sensor import PalletProximity
from math import asin, cos, radians, sin, sqrt
import numpy as np
import requests
import json
import time
import logging
from state_machine import StateMachineHandler
def haversine_m(lon1, lat1, lon2, lat2):
"""
Calculate the great circle distance between two points
on the earth (specified in decimal degrees)
"""
# convert decimal degrees to radians
lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2])
# haversine formula
dlon = lon2 - lon1
dlat = lat2 - lat1
a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
c = 2 * asin(sqrt(a))
r = 6371 # Radius of earth in kilometers. Use 3956 for miles
return c * r * 1000
def get_bearing(lat1, long1, lat2, long2):
dLon = (long2 - long1)
x = cos(radians(lat2)) * sin(radians(dLon))
y = cos(radians(lat1)) * sin(radians(lat2)) - sin(radians(lat1)) * cos(radians(lat2)) * cos(radians(dLon))
brng = np.arctan2(x,y)
brng = np.degrees(brng)
return brng
def remap(in_value, in_min, in_max, out_min, out_max):
if in_value > in_max:
in_value = in_max
if in_value < in_min:
in_value = in_min
remapped = (((in_value - in_min) * (out_max - out_min)) / (in_max - in_min)) + out_min
return remapped
class DroneInterface:
__ARDUROVER_IP = '127.0.0.1:14550'
__TARGET_REACHED_RADIUS = 2.5 #metres
def __init__(self) -> None:
self.log = logging.getLogger("ardurover_interface")
self.log.setLevel("DEBUG")
self.vehicle = connect(self.__ARDUROVER_IP, wait_ready=True)
time.sleep(2)
self.heading_endpoint = "http://localhost:3333/get_heading"
self.copmpass_alive_endpoint = "http://localhost:3333/alive"
self.STATE_MACHINE = StateMachineHandler()
self.estop_engaged = False
self.log.info("Connected to ardurover")
self.pallet_proximity_sensor = PalletProximity()
self.pallet_min_distance = 20 # TODO: calibrate this
self.log.info("Connected to pallet proximity sensor")
self.print_status()
def arm_vehicle(self):
# self.vehicle.mode = VehicleMode("GUIDED")
self.vehicle.arm()
self.log.info("Vehicle Armed")
# vehicle.simple_takeoff(aTargetAltitude)
def disarm_vehicle(self):
self.vehicle.channels.overrides = {}
self.vehicle.disarm()
self.log.info("Vehicle Disarmed")
def print_status(self):
self.log.info(f"Autopilot Firmware version: {self.vehicle.version}")
self.log.info(f"Autopilot capabilities (supports ftp): {self.vehicle.capabilities.ftp}")
self.log.info(f"Global Location: {self.vehicle.location.global_frame}")
self.log.info(f"Global Location (relative altitude): {self.vehicle.location.global_relative_frame}")
self.log.info(f"Local Location: {self.vehicle.location.local_frame}")
self.log.info(f"Attitude: {self.vehicle.attitude}")
self.log.info(f"Velocity: {self.vehicle.velocity}")
self.log.info(f"GPS: {self.vehicle.gps_0}")
self.log.info(f"Groundspeed: {self.vehicle.groundspeed}")
self.log.info(f"Airspeed: {self.vehicle.airspeed}")
self.log.info(f"Gimbal status: {self.vehicle.gimbal}")
self.log.info(f"Battery: {self.vehicle.battery}")
self.log.info(f"EKF OK?: {self.vehicle.ekf_ok}")
self.log.info(f"Last Heartbeat: {self.vehicle.last_heartbeat}")
# self.log.info(f"Heading: {self.vehicle.heading}")
self.log.info(f"Is Armable?: {self.vehicle.is_armable}")
self.log.info(f"System status: {self.vehicle.system_status.state}")
self.log.info(f"Mode: {self.vehicle.mode.name}")
self.log.info(f"Armed: {self.vehicle.armed}")
def get_heading(self, data = {}):
rsp = None
rsp = requests.post(self.heading_endpoint, json=data, timeout=1)
if rsp:
if rsp.ok:
json_rsp = json.loads(rsp.text)
status = json_rsp.get('Success', None)
if status != True:
error = json_rsp.get('Error', None)
self.log.error(f'Got error: {error}')
heading = json_rsp['heading']
return heading
else:
self.log.error(f'Failed to process request, code: {rsp.status_code}, text: {rsp.text}')
else:
self.log.critical('Cannot connect to server')
def verify_compass(self, data={}):
rsp = None
try:
rsp = requests.post(self.copmpass_alive_endpoint, json=data, timeout=1)
if rsp:
if rsp.ok:
json_rsp = json.loads(rsp.text)
status = json_rsp.get('Success', None)
if status != True:
error = json_rsp.get('Error', None)
self.log.error(f'Got error: {error}')
return False
is_running = json_rsp['is_running']
return is_running
else:
self.log.error(f'Failed to process request, code: {rsp.status_code}, text: {rsp.text}')
return False
else:
self.log.critical('Cannot connect to server')
return False
except Exception as e:
self.log.error(f"Error in verify_compass: {e}")
return False
def throttle_limiter(self, throttle, upper=1940, lower=1060):
if throttle > upper:
throttle = upper
if throttle < lower:
throttle = lower
return throttle
def steering_limiter(self, steering, upper=2000, lower=1000):
if steering > upper:
steering = upper
if steering < lower:
steering = lower
return steering
def set_vehicle_steering_and_speed(self, amount_steer, amount_speed):
'''
We are currently using rc overrides because
ardurover and mavlink have not implemented
functionality to change the ground steering... for some fucking reason.
channel 4 is the steering channel
channel 3 is the drive channel
'''
if self.verify_state_for(throttle_PWM=amount_speed):
self.vehicle.channels.overrides = {}
amount_steer = self.steering_limiter(amount_steer)
amount_speed = self.throttle_limiter(amount_speed)
# assert 999 < amount_speed < 2001, f"speed PWM out of bounds ({amount_speed} != 1000->2000)"
self.log.info(f"Sending PWM {amount_steer} to channel 4")
self.log.info(f"Sending PWM {amount_speed} to channel 3")
self.vehicle.channels.overrides['4'] = int(amount_steer)
self.vehicle.channels.overrides['3'] = int(amount_speed)
def set_vehicle_speed(self, amount_speed: int):
amount_speed = self.throttle_limiter(amount_speed)
# assert 999 < amount_speed < 2001, f"speed PWM out of bounds ({amount_speed} != 1000->2000)"
if self.verify_state_for(throttle_PWM=amount_speed):
self.vehicle.channels.overrides = {}
self.log.info(f"Sending PWM {amount_speed} to channel 3")
self.vehicle.channels.overrides['3'] = int(amount_speed)
def set_vehicle_steering(self, amount_steer: int):
amount_steer = self.steering_limiter(amount_steer)
if self.verify_state_for():
self.vehicle.channels.overrides = {}
self.log.info(f"Sending PWM {amount_steer} to channel 4")
self.vehicle.channels.overrides['4'] = int(amount_steer)
def clear_override_channels(self):
self.vehicle.channels.overrides['1'] = 1500
self.vehicle.channels.overrides['2'] = 1500
self.vehicle.channels.overrides['3'] = 1500
self.vehicle.channels.overrides['4'] = 1500
self.vehicle.channels.overrides = {}
def override_stop(self):
self.vehicle.channels.overrides = {}
self.vehicle.channels.overrides['1'] = 1500
self.vehicle.channels.overrides['2'] = 1500
self.vehicle.channels.overrides['3'] = 1500
self.vehicle.channels.overrides['4'] = 1500
def override_stop_drivetrain_fwd(self):
if self.vehicle.channels['3'] > 1550:
self.vehicle.channels.overrides = {}
self.vehicle.channels.overrides['3'] = 1500
def override_stop_drivetrain_back(self):
if self.vehicle.channels['3'] < 1450:
self.vehicle.channels.overrides = {}
self.vehicle.channels.overrides['3'] = 1500
def nudge_fork_vert(self, nudge_up: bool, nudge_amount: float): # c1: lr, c2: ud
assert nudge_amount < 1.1, "Moving for over 1 second is not safe!"
if self.verify_state_for(is_fork=True):
self.vehicle.channels.overrides = {}
nudge_amt = 1950
if nudge_up:
nudge_amt = 1100
self.log.info(f"Nudging fork {'UP' if nudge_up else 'DOWN'} for {nudge_amount} seconds (chan 2)")
self.vehicle.channels.overrides['2'] = nudge_amt
time.sleep(nudge_amount)
self.vehicle.channels.overrides['2'] = 1500
self.vehicle.channels.overrides = {}
def nudge_fork_horiz(self, nudge_left: bool, nudge_amount: float): # Dir: True is left, False is right
assert nudge_amount < 1.1, "Moving for over 1 second is not safe!"
if self.verify_state_for(is_fork=True):
self.vehicle.channels.overrides = {}
nudge_amt = 1100
if nudge_left:
nudge_amt = 2000
self.log.info(f"Nudging fork {'LEFT' if nudge_left else 'RIGHT'} for {nudge_amount} seconds (chan 1)")
self.vehicle.channels.overrides['1'] = nudge_amt
time.sleep(nudge_amount)
self.vehicle.channels.overrides['1'] = 1500
self.vehicle.channels.overrides = {}
def nudge_drive(self, nudge_forward: bool, nudge_amount: float, nudge_steering: int = 1500): # Dir: True is left, False is right
assert nudge_amount < 10, "Moving for over 10 seconds is not safe!"
nudge = 1000
if nudge_forward:
nudge = 2000
if self.verify_state_for(throttle_PWM=nudge):
self.vehicle.channels.overrides = {}
if nudge_forward:
self.log.info(f"Nudging fork FORWARD for {nudge_amount} seconds at steering: {nudge_steering}")
nudge_increments = int(nudge_amount / 0.5)
for _ in range(nudge_increments):
self.set_vehicle_steering_and_speed(nudge_steering, 1900)
time.sleep(0.5)
self.clear_override_channels()
else:
self.log.info(f"Nudging fork BACKWARD for {nudge_amount} seconds at steering: {nudge_steering}")
nudge_increments = int(nudge_amount / 0.5)
for _ in range(nudge_increments):
self.set_vehicle_steering_and_speed(nudge_steering, 1000)
time.sleep(0.5)
self.clear_override_channels()
self.vehicle.channels.overrides = {}
def check_estop_signal(self):
estop = self.vehicle.channels['6']
if estop > 1700:
self.log.critical(f"Got {estop} on Aux ESTOP, executing Estop")
self.emergency_stop()
return True
def check_rc_signal(self):
rc_override = self.vehicle.channels['5']
if rc_override > 1700:
self.log.critical(f"Got {rc_override} on Aux RC_OVERRIDE, returning True")
return True
return False
def emergency_stop(self):
self.disarm_vehicle()
self.estop_engaged = True
self.STATE_MACHINE.set_state('ESTOP')
# maybe: self.press_brake_pedal()
self.log.critical(f"Estop triggered, disarming vehicle")
def verify_state_for(self, target_state=None, throttle_PWM=None, is_fork=False):
_, state = self.STATE_MACHINE.get_state()
if state in ["ESTOP", "RC_CONTROL", "OTHER_ERROR"]:
self.log.warning(f"Warning: machine state is {state}, this does not allow motion")
return False
elif state in ["PROXIMITY_STOP_FRONT", "PROXIMITY_STOP_REAR"]:
if is_fork:
return True
if throttle_PWM:
if (throttle_PWM > 1500 and state == "PROXIMITY_STOP_REAR") or (throttle_PWM < 1500 and state == "PROXIMITY_STOP_FRONT"):
return True
self.log.warning(f"We are in proximity stop mode ({state}) while trying to send PWM {throttle_PWM}, this is not allowed.")
return False
if target_state:
# TODO: implement table to permit state switching from other state
pass
return True
def go_to_gps_bad(self, lat, lon, alt=200):
if self.verify_state_for("NAVIGATING_TO_POINT_GPS"):
self.vehicle.mode = VehicleMode("GUIDED")
a_location = LocationGlobal(lat, lon, alt)
print("-----")
print(a_location.lat)
print(a_location.lon)
print(a_location.alt)
print("-----")
self.log.info(f"Requesting vehicle to go to {lat}, {lon}")
self.vehicle.simple_goto(a_location)
def get_speed(self):
speed = self.vehicle.channels.overrides.get('3', 1500)
return speed
def get_steering(self):
steering = self.vehicle.channels.overrides.get('4', 1500)
return steering
def get_estop(self):
return self.estop_engaged
def get_arm(self):
return self.vehicle.armed
def go_to_gps(self, lat, lon):
if self.verify_state_for("NAVIGATING_TO_POINT_GPS"):
target_position = (lat, lon)
approaching = True
while approaching:
current_position = (self.vehicle.location.global_frame.lat, self.vehicle.location.global_frame.lon)
distance_to_target = haversine_m(target_position[0], target_position[1], current_position[0], current_position[1])
self.log.error(f"Distance: {distance_to_target} metres")
if distance_to_target < self.__TARGET_REACHED_RADIUS:
self.log.info(f"Waypoint {lat}, {lon} reached")
approaching = False
self.override_stop()
return True
bearing_to_target = get_bearing(current_position[0], current_position[1], target_position[0], target_position[1])
if bearing_to_target < 0:
bearing_to_target += 360
current_heading = self.get_heading()
error = bearing_to_target - current_heading
if error > 180:
error = -(360 - error)
elif error < -180:
error = -(360 + error)
# TODO: PID
proportional = -2.5
# proportional = 1
integral = 1
derivative = 1
error = error * proportional * integral * derivative
steering = remap(error, -90, 90, 1000, 2000)
self.log.warning(f"Bearing: {bearing_to_target}, Heading: {current_heading}, Error: {error}, Steering: {steering}")
self.set_vehicle_steering_and_speed(steering, 1910)
time.sleep(0.3)
else:
self.log.error("Incorrect vehicle State!")
def go_to_heading(self, target_heading):
if self.verify_state_for("NAVIGATING_TO_POINT_GPS"):
approaching = True
while approaching:
if target_heading - 7 <= current_heading <= target_heading + 7:
approaching = False
self.log.info(f"Heading {target_heading} reached")
self.override_stop()
return True
current_heading = self.get_heading()
error = target_heading - current_heading
self.log.error(f"Angle: {error} deg.")
if error > 180:
error = -(360 - error)
elif error < -180:
error = -(360 + error)
proportional = -2.5
integral = 1
derivative = 1
error = error * proportional * integral * derivative
steering = remap(error, -90, 90, 1000, 2000)
self.log.warning(f"target hearing: {target_heading}, Heading: {current_heading}, Error: {error}, Steering: {steering}")
self.set_vehicle_steering_and_speed(steering, 1910)
time.sleep(0.3)
def check_gps_in_radius(self, target_gps):
actual_lat, actual_lon = self.get_gps()
target_lat, target_lon = target_gps
if actual_lat == 0 or actual_lon == 0:
logging.critical("Can not get GPS fix!")
return False, 99999
radius = self.gps_target_radius_km
distance = haversine_m(target_lat, target_lon, actual_lat, actual_lon)
logging.info(f'Distance to target (m) : {distance*1000}')
if distance <= radius:
return True, distance*1000
return False, distance*1000
def get_gps(self):
lat = self.vehicle.location.global_frame.lat
lon = self.vehicle.location.global_frame.lon
return (lat, lon)
def approach_pallet(self):
pallet_approached = False
while not pallet_approached:
time.sleep(0.2)
distance_to_pallet = self.pallet_proximity_sensor.get_prox_reading()
if distance_to_pallet < self.pallet_min_distance:
self.log.info(f"We are close enough to pallet ({distance_to_pallet} m)")
pallet_approached = True
self.clear_override_channels()
else:
self.log.info(f"Distance to pallet ({distance_to_pallet} m), moving forward...")
target_speed_pwm = 1930
self.set_vehicle_speed(target_speed_pwm)