import json import logging import time from math import asin, cos, radians, sin, sqrt import requests logging.basicConfig(level='INFO') def haversine(lon1, lat1, lon2, lat2): """ Calculate the great circle distance between two points on the earth (specified in decimal degrees) """ # convert decimal degrees to radians lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2]) # haversine formula dlon = lon2 - lon1 dlat = lat2 - lat1 a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2 c = 2 * asin(sqrt(a)) r = 6371 # Radius of earth in kilometers. Use 3956 for miles return c * r class DroneKitComms: def __init__(self) -> None: self.log = logging.getLogger("dronekit_comms") self.log.setLevel("DEBUG") self.url = 'http://192.168.5.2:5000' self.heartbeat_endpoint = '/alive' self.speed_endpoint = '/set_speed' self.steering_endpoint = '/set_steering' self.speed_steering_endpoint = '/set_speed_steering' self.gps_navigation_endpoint = '/go_to_gps' self.set_state_endpoint = '/set_state' self.get_state_endpoint = '/get_state' self.arm_endpoint = '/arm' self.disarm_endpoint = '/disarm' self.close_endpoint = '/close' self.get_gps_endpoint = '/get_gps' self.nudge_fork_vert_endpoint = '/nudge_fork_vert' self.nudge_fork_horiz_endpoint = '/nudge_fork_horiz' self.nudge_drive_endpoint = '/nudge_drive' self.approach_pallet_endpoint = '/approach_pallet' self.gps_target_radius_km = 0.0025 rsp = self.make_request(self.heartbeat_endpoint) if rsp.get('Success', False): self.log.info('Server alive') else: self.log.critical('Server dead! Check route table') def make_request(self, endpoint, data = {}): rsp = None rsp = requests.post(self.url + endpoint, json=data, timeout=240) if rsp: if rsp.ok: json_rsp = json.loads(rsp.text) status = json_rsp.get('Success', None) if status != True: error = json_rsp.get('Error', None) self.log.error(f'Got error: {error}') return json_rsp else: self.log.error(f'Failed to process request, code: {rsp.status_code}, text: {rsp.text}') else: self.log.critical('Cannot connect to server') def set_steering(self, steering_pwm): data = {'steering': steering_pwm} _ = self.make_request(self.steering_endpoint, data) # print(rsp) def set_speed(self, speed_pwm): data = {'speed': speed_pwm} _ = self.make_request(self.speed_endpoint, data) # print(rsp) def set_speed_steering(self, speed_pwm, steering_pwm): data = {'steering': steering_pwm, 'speed': speed_pwm} _ = self.make_request(self.speed_steering_endpoint, data) # print(rsp) def nudge_fork_vert(self, go_up: bool, amount: float): data = {'go_up': go_up, 'amount': amount} _ = self.make_request(self.nudge_fork_vert_endpoint, data) # print(rsp) def nudge_fork_horiz(self, go_left: bool, amount: float): data = {'go_left': go_left, 'amount': amount} _ = self.make_request(self.nudge_fork_horiz_endpoint, data) # print(rsp) def nudge_drive(self, go_forward: bool, amount: float, steering: int = 1500): data = {'go_forward': go_forward, 'amount': amount, 'steering': steering} _ = self.make_request(self.nudge_drive_endpoint, data) # print(rsp) def go_to_gps(self, lat, lon): data = {'lat': lat, 'lon': lon} _ = self.make_request(self.gps_navigation_endpoint, data) def set_arm(self): _ = self.make_request(self.arm_endpoint) # print(rsp) def set_disarm(self): _ = self.make_request(self.disarm_endpoint) # print(rsp) def get_state(self): rsp = self.make_request(self.get_state_endpoint) machine_state = rsp.get("State", "UNKNOWN") return machine_state def set_state(self, state): data = {'State': state} _ = self.make_request(self.set_state_endpoint, data) def get_gps(self): rsp = self.make_request(self.get_gps_endpoint) lat = rsp.get("lat", 0) lon = rsp.get("lon", 0) return (lat, lon) def check_gps_in_radius(self, target_gps): actual_lat, actual_lon = self.get_gps() target_lat, target_lon = target_gps if actual_lat == 0 or actual_lon == 0: logging.critical("Can not get GPS fix!") return False, 99999 radius = self.gps_target_radius_km distance = haversine(target_lat, target_lon, actual_lat, actual_lon) logging.info(f'Distance to target (m) : {distance*1000}') if distance <= radius: return True, distance*1000 return False, distance*1000 def zero_fork_vert_pickup(self): data = {'go_up': False, 'amount': 1.0} _ = self.make_request(self.nudge_fork_vert_endpoint, data) _ = self.make_request(self.nudge_fork_vert_endpoint, data) _ = self.make_request(self.nudge_fork_vert_endpoint, data) _ = self.make_request(self.nudge_fork_vert_endpoint, data) _ = self.make_request(self.nudge_fork_vert_endpoint, data) time.sleep(2) data = {'go_up': True, 'amount': 0.6} _ = self.make_request(self.nudge_fork_vert_endpoint, data) def zero_fork_vert_movement(self): data = {'go_up': False, 'amount': 1.0} _ = self.make_request(self.nudge_fork_vert_endpoint, data) _ = self.make_request(self.nudge_fork_vert_endpoint, data) _ = self.make_request(self.nudge_fork_vert_endpoint, data) _ = self.make_request(self.nudge_fork_vert_endpoint, data) _ = self.make_request(self.nudge_fork_vert_endpoint, data) time.sleep(2) data = {'go_up': True, 'amount': 0.9} _ = self.make_request(self.nudge_fork_vert_endpoint, data) _ = self.make_request(self.nudge_fork_vert_endpoint, data) def zero_fork_horiz(self): self.nudge_fork_horiz(go_left=True, amount=0.9) self.nudge_fork_horiz(go_left=True, amount=0.9) self.nudge_fork_horiz(go_left=True, amount=0.9) time.sleep(2) self.nudge_fork_horiz(go_left=False, amount=0.9) self.nudge_fork_horiz(go_left=False, amount=0.4) # TODO: tune this def approach_pallet(self): _ = self.make_request(self.gps_target_radius_km) # print(rsp) # def close_vehicle(self): # rsp = self.make_request(self.close_endpoint) # print(rsp)