new_thea/Control/Jetson/dronekit_server_client.py
2021-09-21 17:31:56 +01:00

179 lines
6.2 KiB
Python

import json
import logging
import time
from math import asin, cos, radians, sin, sqrt
import requests
logging.basicConfig(level='INFO')
def haversine(lon1, lat1, lon2, lat2):
"""
Calculate the great circle distance between two points
on the earth (specified in decimal degrees)
"""
# convert decimal degrees to radians
lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2])
# haversine formula
dlon = lon2 - lon1
dlat = lat2 - lat1
a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
c = 2 * asin(sqrt(a))
r = 6371 # Radius of earth in kilometers. Use 3956 for miles
return c * r
class DroneKitComms:
def __init__(self) -> None:
self.log = logging.getLogger("dronekit_comms")
self.log.setLevel("DEBUG")
self.url = 'http://192.168.5.2:5000'
self.heartbeat_endpoint = '/alive'
self.speed_endpoint = '/set_speed'
self.steering_endpoint = '/set_steering'
self.speed_steering_endpoint = '/set_speed_steering'
self.gps_navigation_endpoint = '/go_to_gps'
self.goto_heading_endpoint = '/go_to_heading'
self.set_state_endpoint = '/set_state'
self.get_state_endpoint = '/get_state'
self.arm_endpoint = '/arm'
self.disarm_endpoint = '/disarm'
self.close_endpoint = '/close'
self.get_gps_endpoint = '/get_gps'
self.nudge_fork_vert_endpoint = '/nudge_fork_vert'
self.nudge_fork_horiz_endpoint = '/nudge_fork_horiz'
self.nudge_drive_endpoint = '/nudge_drive'
self.approach_pallet_endpoint = '/approach_pallet'
self.reverse_from_pallet_endpoint = '/reverse_from_pallet'
self.gps_target_radius_km = 0.0025
rsp = self.make_request(self.heartbeat_endpoint)
if rsp.get('Success', False):
self.log.info('Server alive')
else:
self.log.critical('Server dead! Check route table')
def make_request(self, endpoint, data = {}):
rsp = None
rsp = requests.post(self.url + endpoint, json=data, timeout=240)
if rsp:
if rsp.ok:
json_rsp = json.loads(rsp.text)
status = json_rsp.get('Success', None)
if status != True:
error = json_rsp.get('Error', None)
self.log.error(f'Got error: {error}')
return json_rsp
else:
self.log.error(f'Failed to process request, code: {rsp.status_code}, text: {rsp.text}')
else:
self.log.critical('Cannot connect to server')
def set_steering(self, steering_pwm):
data = {'steering': steering_pwm}
_ = self.make_request(self.steering_endpoint, data)
# print(rsp)
def set_speed(self, speed_pwm):
data = {'speed': speed_pwm}
_ = self.make_request(self.speed_endpoint, data)
# print(rsp)
def set_speed_steering(self, speed_pwm, steering_pwm):
data = {'steering': steering_pwm, 'speed': speed_pwm}
_ = self.make_request(self.speed_steering_endpoint, data)
# print(rsp)
def nudge_fork_vert(self, go_up: bool, amount: float):
data = {'go_up': go_up, 'amount': amount}
_ = self.make_request(self.nudge_fork_vert_endpoint, data)
# print(rsp)
def nudge_fork_horiz(self, go_left: bool, amount: float):
data = {'go_left': go_left, 'amount': amount}
_ = self.make_request(self.nudge_fork_horiz_endpoint, data)
# print(rsp)
def nudge_drive(self, go_forward: bool, amount: float, steering: int = 1500):
data = {'go_forward': go_forward, 'amount': amount, 'steering': steering}
_ = self.make_request(self.nudge_drive_endpoint, data)
# print(rsp)
def go_to_gps(self, lat, lon):
data = {'lat': lat, 'lon': lon}
_ = self.make_request(self.gps_navigation_endpoint, data)
def go_to_heading(self, heading):
data = {'pickup_heading': heading}
_ = self.make_request(self.goto_heading_endpoint, data)
return True
def set_arm(self):
_ = self.make_request(self.arm_endpoint)
# print(rsp)
def set_disarm(self):
_ = self.make_request(self.disarm_endpoint)
# print(rsp)
def get_state(self):
rsp = self.make_request(self.get_state_endpoint)
machine_state = rsp.get("State", "UNKNOWN")
return machine_state
def set_state(self, state):
data = {'State': state}
_ = self.make_request(self.set_state_endpoint, data)
def get_gps(self):
rsp = self.make_request(self.get_gps_endpoint)
lat = rsp.get("lat", 0)
lon = rsp.get("lon", 0)
return (lat, lon)
def check_gps_in_radius(self, target_gps):
actual_lat, actual_lon = self.get_gps()
target_lat, target_lon = target_gps
if actual_lat == 0 or actual_lon == 0:
logging.critical("Can not get GPS fix!")
return False, 99999
radius = self.gps_target_radius_km
distance = haversine(target_lat, target_lon, actual_lat, actual_lon)
logging.info(f'Distance to target (m) : {distance*1000}')
if distance <= radius:
return True, distance*1000
return False, distance*1000
def zero_fork_vert_pickup(self):
self.nudge_fork_vert(False, 1)
self.nudge_fork_vert(False, 1)
self.nudge_fork_vert(False, 1)
time.sleep(2)
self.nudge_fork_vert(True, 0.5)
time.sleep(1)
def zero_fork_vert_movement(self):
self.nudge_fork_vert(False, 1)
self.nudge_fork_vert(False, 1)
self.nudge_fork_vert(False, 1)
time.sleep(2)
self.nudge_fork_vert(True, 1)
time.sleep(1)
def zero_fork_horiz(self):
self.nudge_fork_horiz(go_left=True, amount=0.9)
self.nudge_fork_horiz(go_left=True, amount=0.9)
time.sleep(2)
self.nudge_fork_horiz(go_left=False, amount=0.5)
self.nudge_fork_horiz(go_left=False, amount=0.65)
def approach_pallet(self):
_ = self.make_request(self.approach_pallet_endpoint)
def reverse_from_pallet(self):
_ = self.make_request(self.reverse_from_pallet_endpoint)
# def close_vehicle(self):
# rsp = self.make_request(self.close_endpoint)
# print(rsp)