new_thea/Control/Jetson/dronekit_server_client.py
2021-09-21 12:11:46 +01:00

179 lines
6.6 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import logging
import time
from math import asin, cos, radians, sin, sqrt
import requests
logging.basicConfig(level='INFO')
def haversine(lon1, lat1, lon2, lat2):
"""
Calculate the great circle distance between two points
on the earth (specified in decimal degrees)
"""
# convert decimal degrees to radians
lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2])
# haversine formula
dlon = lon2 - lon1
dlat = lat2 - lat1
a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
c = 2 * asin(sqrt(a))
r = 6371 # Radius of earth in kilometers. Use 3956 for miles
return c * r
class DroneKitComms:
def __init__(self) -> None:
self.log = logging.getLogger("dronekit_comms")
self.log.setLevel("DEBUG")
self.url = 'http://192.168.5.2:5000'
self.heartbeat_endpoint = '/alive'
self.speed_endpoint = '/set_speed'
self.steering_endpoint = '/set_steering'
self.speed_steering_endpoint = '/set_speed_steering'
self.gps_navigation_endpoint = '/go_to_gps'
self.set_state_endpoint = '/set_state'
self.get_state_endpoint = '/get_state'
self.arm_endpoint = '/arm'
self.disarm_endpoint = '/disarm'
self.close_endpoint = '/close'
self.get_gps_endpoint = '/get_gps'
self.nudge_fork_vert_endpoint = '/nudge_fork_vert'
self.nudge_fork_horiz_endpoint = '/nudge_fork_horiz'
self.nudge_drive_endpoint = '/nudge_drive'
self.approach_pallet_endpoint = '/approach_pallet'
self.gps_target_radius_km = 0.0025
rsp = self.make_request(self.heartbeat_endpoint)
if rsp.get('Success', False):
self.log.info('Server alive')
else:
self.log.critical('Server dead! Check route table')
def make_request(self, endpoint, data = {}):
rsp = None
rsp = requests.post(self.url + endpoint, json=data, timeout=240)
if rsp:
if rsp.ok:
json_rsp = json.loads(rsp.text)
status = json_rsp.get('Success', None)
if status != True:
error = json_rsp.get('Error', None)
self.log.error(f'Got error: {error}')
return json_rsp
else:
self.log.error(f'Failed to process request, code: {rsp.status_code}, text: {rsp.text}')
else:
self.log.critical('Cannot connect to server')
def set_steering(self, steering_pwm):
data = {'steering': steering_pwm}
_ = self.make_request(self.steering_endpoint, data)
# print(rsp)
def set_speed(self, speed_pwm):
data = {'speed': speed_pwm}
_ = self.make_request(self.speed_endpoint, data)
# print(rsp)
def set_speed_steering(self, speed_pwm, steering_pwm):
data = {'steering': steering_pwm, 'speed': speed_pwm}
_ = self.make_request(self.speed_steering_endpoint, data)
# print(rsp)
def nudge_fork_vert(self, go_up: bool, amount: float):
data = {'go_up': go_up, 'amount': amount}
_ = self.make_request(self.nudge_fork_vert_endpoint, data)
# print(rsp)
def nudge_fork_horiz(self, go_left: bool, amount: float):
data = {'go_left': go_left, 'amount': amount}
_ = self.make_request(self.nudge_fork_horiz_endpoint, data)
# print(rsp)
def nudge_drive(self, go_forward: bool, amount: float, steering: int = 1500):
data = {'go_forward': go_forward, 'amount': amount, 'steering': steering}
_ = self.make_request(self.nudge_drive_endpoint, data)
# print(rsp)
def go_to_gps(self, lat, lon):
data = {'lat': lat, 'lon': lon}
_ = self.make_request(self.gps_navigation_endpoint, data)
def set_arm(self):
_ = self.make_request(self.arm_endpoint)
# print(rsp)
def set_disarm(self):
_ = self.make_request(self.disarm_endpoint)
# print(rsp)
def get_state(self):
rsp = self.make_request(self.get_state_endpoint)
machine_state = rsp.get("State", "UNKNOWN")
return machine_state
def set_state(self, state):
data = {'State': state}
_ = self.make_request(self.set_state_endpoint, data)
def get_gps(self):
rsp = self.make_request(self.get_gps_endpoint)
lat = rsp.get("lat", 0)
lon = rsp.get("lon", 0)
return (lat, lon)
def check_gps_in_radius(self, target_gps):
actual_lat, actual_lon = self.get_gps()
target_lat, target_lon = target_gps
if actual_lat == 0 or actual_lon == 0:
logging.critical("Can not get GPS fix!")
return False, 99999
radius = self.gps_target_radius_km
distance = haversine(target_lat, target_lon, actual_lat, actual_lon)
logging.info(f'Distance to target (m) : {distance*1000}')
if distance <= radius:
return True, distance*1000
return False, distance*1000
def zero_fork_vert_pickup(self):
data = {'go_up': False, 'amount': 1.0}
_ = self.make_request(self.nudge_fork_vert_endpoint, data)
_ = self.make_request(self.nudge_fork_vert_endpoint, data)
_ = self.make_request(self.nudge_fork_vert_endpoint, data)
_ = self.make_request(self.nudge_fork_vert_endpoint, data)
_ = self.make_request(self.nudge_fork_vert_endpoint, data)
time.sleep(2)
data = {'go_up': True, 'amount': 0.6}
_ = self.make_request(self.nudge_fork_vert_endpoint, data)
def zero_fork_vert_movement(self):
data = {'go_up': False, 'amount': 1.0}
_ = self.make_request(self.nudge_fork_vert_endpoint, data)
_ = self.make_request(self.nudge_fork_vert_endpoint, data)
_ = self.make_request(self.nudge_fork_vert_endpoint, data)
_ = self.make_request(self.nudge_fork_vert_endpoint, data)
_ = self.make_request(self.nudge_fork_vert_endpoint, data)
time.sleep(2)
data = {'go_up': True, 'amount': 0.9}
_ = self.make_request(self.nudge_fork_vert_endpoint, data)
_ = self.make_request(self.nudge_fork_vert_endpoint, data)
def zero_fork_horiz(self):
self.nudge_fork_horiz(go_left=True, amount=0.9)
self.nudge_fork_horiz(go_left=True, amount=0.9)
self.nudge_fork_horiz(go_left=True, amount=0.9)
time.sleep(2)
self.nudge_fork_horiz(go_left=False, amount=0.9)
self.nudge_fork_horiz(go_left=False, amount=0.4) # TODO: tune this
def approach_pallet(self):
_ = self.make_request(self.gps_target_radius_km)
# print(rsp)
# def close_vehicle(self):
# rsp = self.make_request(self.close_endpoint)
# print(rsp)