179 lines
6.2 KiB
Python
179 lines
6.2 KiB
Python
import json
|
|
import logging
|
|
import time
|
|
from math import asin, cos, radians, sin, sqrt
|
|
|
|
import requests
|
|
|
|
logging.basicConfig(level='INFO')
|
|
|
|
def haversine(lon1, lat1, lon2, lat2):
|
|
"""
|
|
Calculate the great circle distance between two points
|
|
on the earth (specified in decimal degrees)
|
|
"""
|
|
# convert decimal degrees to radians
|
|
lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2])
|
|
|
|
# haversine formula
|
|
dlon = lon2 - lon1
|
|
dlat = lat2 - lat1
|
|
a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
|
|
c = 2 * asin(sqrt(a))
|
|
r = 6371 # Radius of earth in kilometers. Use 3956 for miles
|
|
return c * r
|
|
|
|
class DroneKitComms:
|
|
def __init__(self) -> None:
|
|
self.log = logging.getLogger("dronekit_comms")
|
|
self.log.setLevel("DEBUG")
|
|
self.url = 'http://192.168.5.2:5000'
|
|
self.heartbeat_endpoint = '/alive'
|
|
self.speed_endpoint = '/set_speed'
|
|
self.steering_endpoint = '/set_steering'
|
|
self.speed_steering_endpoint = '/set_speed_steering'
|
|
self.gps_navigation_endpoint = '/go_to_gps'
|
|
self.goto_heading_endpoint = '/go_to_heading'
|
|
self.set_state_endpoint = '/set_state'
|
|
self.get_state_endpoint = '/get_state'
|
|
self.arm_endpoint = '/arm'
|
|
self.disarm_endpoint = '/disarm'
|
|
self.close_endpoint = '/close'
|
|
self.get_gps_endpoint = '/get_gps'
|
|
self.nudge_fork_vert_endpoint = '/nudge_fork_vert'
|
|
self.nudge_fork_horiz_endpoint = '/nudge_fork_horiz'
|
|
self.nudge_drive_endpoint = '/nudge_drive'
|
|
self.approach_pallet_endpoint = '/approach_pallet'
|
|
self.reverse_from_pallet_endpoint = '/reverse_from_pallet'
|
|
self.gps_target_radius_km = 0.0025
|
|
rsp = self.make_request(self.heartbeat_endpoint)
|
|
if rsp.get('Success', False):
|
|
self.log.info('Server alive')
|
|
else:
|
|
self.log.critical('Server dead! Check route table')
|
|
|
|
def make_request(self, endpoint, data = {}):
|
|
rsp = None
|
|
rsp = requests.post(self.url + endpoint, json=data, timeout=240)
|
|
if rsp:
|
|
if rsp.ok:
|
|
json_rsp = json.loads(rsp.text)
|
|
status = json_rsp.get('Success', None)
|
|
if status != True:
|
|
error = json_rsp.get('Error', None)
|
|
self.log.error(f'Got error: {error}')
|
|
return json_rsp
|
|
else:
|
|
self.log.error(f'Failed to process request, code: {rsp.status_code}, text: {rsp.text}')
|
|
else:
|
|
self.log.critical('Cannot connect to server')
|
|
|
|
def set_steering(self, steering_pwm):
|
|
data = {'steering': steering_pwm}
|
|
_ = self.make_request(self.steering_endpoint, data)
|
|
# print(rsp)
|
|
|
|
def set_speed(self, speed_pwm):
|
|
data = {'speed': speed_pwm}
|
|
_ = self.make_request(self.speed_endpoint, data)
|
|
# print(rsp)
|
|
|
|
def set_speed_steering(self, speed_pwm, steering_pwm):
|
|
data = {'steering': steering_pwm, 'speed': speed_pwm}
|
|
_ = self.make_request(self.speed_steering_endpoint, data)
|
|
# print(rsp)
|
|
|
|
def nudge_fork_vert(self, go_up: bool, amount: float):
|
|
data = {'go_up': go_up, 'amount': amount}
|
|
_ = self.make_request(self.nudge_fork_vert_endpoint, data)
|
|
# print(rsp)
|
|
|
|
def nudge_fork_horiz(self, go_left: bool, amount: float):
|
|
data = {'go_left': go_left, 'amount': amount}
|
|
_ = self.make_request(self.nudge_fork_horiz_endpoint, data)
|
|
# print(rsp)
|
|
|
|
def nudge_drive(self, go_forward: bool, amount: float, steering: int = 1500):
|
|
data = {'go_forward': go_forward, 'amount': amount, 'steering': steering}
|
|
_ = self.make_request(self.nudge_drive_endpoint, data)
|
|
# print(rsp)
|
|
|
|
def go_to_gps(self, lat, lon):
|
|
data = {'lat': lat, 'lon': lon}
|
|
_ = self.make_request(self.gps_navigation_endpoint, data)
|
|
|
|
def go_to_heading(self, heading):
|
|
data = {'pickup_heading': heading}
|
|
_ = self.make_request(self.goto_heading_endpoint, data)
|
|
return True
|
|
|
|
def set_arm(self):
|
|
_ = self.make_request(self.arm_endpoint)
|
|
# print(rsp)
|
|
|
|
def set_disarm(self):
|
|
_ = self.make_request(self.disarm_endpoint)
|
|
# print(rsp)
|
|
|
|
def get_state(self):
|
|
rsp = self.make_request(self.get_state_endpoint)
|
|
machine_state = rsp.get("State", "UNKNOWN")
|
|
return machine_state
|
|
|
|
def set_state(self, state):
|
|
data = {'State': state}
|
|
_ = self.make_request(self.set_state_endpoint, data)
|
|
|
|
def get_gps(self):
|
|
rsp = self.make_request(self.get_gps_endpoint)
|
|
lat = rsp.get("lat", 0)
|
|
lon = rsp.get("lon", 0)
|
|
return (lat, lon)
|
|
|
|
def check_gps_in_radius(self, target_gps):
|
|
actual_lat, actual_lon = self.get_gps()
|
|
target_lat, target_lon = target_gps
|
|
if actual_lat == 0 or actual_lon == 0:
|
|
logging.critical("Can not get GPS fix!")
|
|
return False, 99999
|
|
radius = self.gps_target_radius_km
|
|
distance = haversine(target_lat, target_lon, actual_lat, actual_lon)
|
|
logging.info(f'Distance to target (m) : {distance*1000}')
|
|
if distance <= radius:
|
|
return True, distance*1000
|
|
return False, distance*1000
|
|
|
|
def zero_fork_vert_pickup(self):
|
|
self.nudge_fork_vert(False, 1)
|
|
self.nudge_fork_vert(False, 1)
|
|
self.nudge_fork_vert(False, 1)
|
|
time.sleep(2)
|
|
self.nudge_fork_vert(True, 0.5)
|
|
time.sleep(1)
|
|
|
|
def zero_fork_vert_movement(self):
|
|
self.nudge_fork_vert(False, 1)
|
|
self.nudge_fork_vert(False, 1)
|
|
self.nudge_fork_vert(False, 1)
|
|
time.sleep(2)
|
|
self.nudge_fork_vert(True, 1)
|
|
time.sleep(1)
|
|
|
|
def zero_fork_horiz(self):
|
|
self.nudge_fork_horiz(go_left=True, amount=0.9)
|
|
self.nudge_fork_horiz(go_left=True, amount=0.9)
|
|
time.sleep(2)
|
|
self.nudge_fork_horiz(go_left=False, amount=0.5)
|
|
self.nudge_fork_horiz(go_left=False, amount=0.65)
|
|
|
|
def approach_pallet(self):
|
|
_ = self.make_request(self.approach_pallet_endpoint)
|
|
|
|
def reverse_from_pallet(self):
|
|
_ = self.make_request(self.reverse_from_pallet_endpoint)
|
|
|
|
|
|
# def close_vehicle(self):
|
|
# rsp = self.make_request(self.close_endpoint)
|
|
# print(rsp)
|