#!/usr/bin/python3
import logging
import threading
import time
import os
import psutil
import flask
from flask import jsonify, request
logger = logging.getLogger("dronekit_server")
logger.setLevel("DEBUG")
from dronekit_class import DroneInterface
from proximity_detector import Proximity
from state_machine import StateMachineHandler
# __MACHINE_STATES = {
# "IDLE": 0,
# "NAVIGATING_TO_POINT_GPS": 1,
# "NAVIGATING_TO_POINT_VISION": 2,
# "MANIPULATING_CARGO": 3,
# "READY_FOR_NEXT_STAGE": 4,
# "RC_CONTROL": 5,
# "PROXIMITY_STOP": 6,
# "ESTOP": 7
# }
STATE_MACHINE = StateMachineHandler(force=True)
STATE_MACHINE.set_state("IDLE")
PROXIMITY_STOP_ENABLED = False
KILL_ON_DEAD_SERVICES = False
FORKLIFT = DroneInterface()
logger.info("Connected to ArduRover")
if PROXIMITY_STOP_ENABLED:
PROXIMITY_ARRAY = Proximity(STATE_MACHINE, FORKLIFT)
logger.info("Connected to Proximity array")
else:
logger.critical("*** PROXIMITY DISABLED!!! RUNNING SANS COLLISION AVOIDANCE ***")
def remap(value, maxInput, minInput, maxOutput, minOutput):
value = maxInput if value > maxInput else value
value = minInput if value < minInput else value
inputSpan = maxInput - minInput
outputSpan = maxOutput - minOutput
scaledThrust = float(value - minInput) / float(inputSpan)
return minOutput + (scaledThrust * outputSpan)
def safety_system():
logger.info('Started saferty system')
_, prev_state = STATE_MACHINE.get_state()
while True:
time.sleep(0.05)
_, curr_state = STATE_MACHINE.get_state()
is_estop = FORKLIFT.check_estop_signal()
is_rc_override = FORKLIFT.check_rc_signal()
if is_estop:
STATE_MACHINE.set_state("ESTOP")
if is_rc_override:
if curr_state != "RC_CONTROL":
logger.warning(f"***** Setting state to 'RC_CONTROL', prev state is saved as {curr_state}")
prev_state = curr_state
logger.info("Clearing override channels")
FORKLIFT.clear_override_channels()
STATE_MACHINE.set_state("RC_CONTROL")
if not is_rc_override and curr_state == "RC_CONTROL":
logger.info(f"Reverting state to {prev_state}")
STATE_MACHINE.set_state(prev_state)
compass_alive = FORKLIFT.verify_compass()
if not compass_alive and KILL_ON_DEAD_SERVICES:
logger.critical(f"Compass dead, killing...")
FORKLIFT.emergency_stop()
current_system_pid = os.getpid()
ThisSystem = psutil.Process(current_system_pid)
ThisSystem.terminate()
if PROXIMITY_STOP_ENABLED:
PROXIMITY_ARRAY.check_proximity()
def safety_watchdog():
logger.info("Started safety watchdog")
if not KILL_ON_DEAD_SERVICES:
logger.warning("##### KILL_ON_DEAD_SERVICES disabled, be VERY VERY careful when running the vehicle! #####")
while True:
time.sleep(0.5)
compass_alive = FORKLIFT.verify_compass()
if not compass_alive and KILL_ON_DEAD_SERVICES:
logger.critical(f"Compass dead, killing...")
FORKLIFT.emergency_stop()
current_system_pid = os.getpid()
ThisSystem = psutil.Process(current_system_pid)
ThisSystem.terminate()
if PROXIMITY_STOP_ENABLED:
proximity_alive = PROXIMITY_ARRAY.alive
if not proximity_alive and KILL_ON_DEAD_SERVICES:
logger.critical(f"Proximity array dead, killing...")
FORKLIFT.emergency_stop()
current_system_pid = os.getpid()
ThisSystem = psutil.Process(current_system_pid)
ThisSystem.terminate()
safety_system_thread = threading.Thread(target=safety_system)
safety_system_thread.start()
watchdog_thread = threading.Thread(target=safety_watchdog)
watchdog_thread.start()
app = flask.Flask(__name__)
@app.route('/', methods=['GET'])
def home():
return "
DroneKit Wrapper server is running...
"
@app.route('/alive', methods=['POST', 'GET'])
def alive():
logger.info(f"Returning alive")
return jsonify({'Success': True})
@app.route('/arm', methods=['POST'])
def arm():
logger.info(f"Arming forklift")
FORKLIFT.arm_vehicle()
return jsonify({'Success': True})
@app.route('/disarm', methods=['POST'])
def disarm():
logger.info(f"Disarming forklift")
FORKLIFT.disarm_vehicle()
return jsonify({'Success': True})
@app.route('/set_speed', methods=['POST'])
def set_speed():
if request.is_json:
content = request.get_json()
speed = content.get('speed', 0)
logger.info(f"Setting speed: {speed}")
FORKLIFT.set_vehicle_speed(speed)
return jsonify({'Success': True})
else:
return jsonify({'Error': 'Not JSON'})
@app.route('/set_steering', methods=['POST'])
def set_steering():
if request.is_json:
content = request.get_json()
steering = content.get('steering', 0)
logger.info(f"Setting steering: {steering}")
FORKLIFT.set_vehicle_steering(steering)
return jsonify({'Success': True})
else:
return jsonify({'Error': 'Not JSON'})
@app.route('/nudge_fork_vert', methods=['POST'])
def nudge_fork_vert():
if request.is_json:
content = request.get_json()
go_up = content.get('go_up', "nil")
amount = content.get('amount', "nil")
assert go_up != "nil" and amount != "nil", f"Params missing: go_up: {go_up}, amount: {amount}" # NOTE: I am sure there is a more elegant way to do this but I am running low on time so here this will live for now...
logger.info(f"Nudging fork {'UP' if go_up == True else 'DOWN'} for {amount} seconds")
FORKLIFT.nudge_fork_vert(nudge_up=go_up, nudge_amount=amount)
return jsonify({'Success': True})
else:
return jsonify({'Error': 'Not JSON'})
@app.route('/nudge_fork_horiz', methods=['POST'])
def nudge_fork_horiz():
if request.is_json:
content = request.get_json()
go_left = content.get('go_left', "nil")
amount = content.get('amount', "nil")
assert go_left != "nil" and amount != "nil", f"Params missing: go_left: {go_left}, amount: {amount}" # NOTE: I am sure there is a more elegant way to do this but I am running low on time so here this will live for now...
logger.info(f"Nudging fork {'UP' if go_left == True else 'DOWN'} for {amount} seconds")
FORKLIFT.nudge_fork_horiz(nudge_left=go_left, nudge_amount=amount)
return jsonify({'Success': True})
else:
return jsonify({'Error': 'Not JSON'})
@app.route('/set_speed_steering', methods=['POST'])
def set_speed_and_steering():
if request.is_json:
content = request.get_json()
speed = content.get('speed', 0)
steering = content.get('steering', 0)
logger.info(f"Setting speed: {speed} and steering {steering}")
FORKLIFT.set_vehicle_steering_and_speed(steering, speed)
return jsonify({'Success': True})
else:
return jsonify({'Error': 'Not JSON'})
@app.route('/nudge_drive', methods=['POST'])
def nudge_drive():
if request.is_json:
content = request.get_json()
go_forward = content.get('go_forward', "nil")
amount = content.get('amount', "nil")
steering = int(content.get('steering', 1500))
assert go_forward != "nil" and amount != "nil", f"Params missing: go_forward: {go_forward}, amount: {amount}" # NOTE: I am sure there is a more elegant way to do this but I am running low on time so here this will live for now...
logger.info(f"Nudging fork {'UP' if go_forward == True else 'DOWN'} for {amount} seconds")
FORKLIFT.nudge_drive(nudge_forward=go_forward, nudge_amount=amount, nudge_steering=steering)
return jsonify({'Success': True})
else:
return jsonify({'Error': 'Not JSON'})
@app.route('/go_to_gps_old', methods=['POST'])
def go_to_gps_old():
if request.is_json:
content = request.get_json()
lat = content.get('lat', 0)
lon = content.get('lon', 0)
assert lat != 0 and lon != 0, "Coordinates missing!"
logger.info(f"Navigating to lat: {lat}, lon: {lon}")
FORKLIFT.go_to_gps(lat=lat, lon=lon)
return jsonify({'Success': True})
else:
return jsonify({'Error': 'Not JSON'})
@app.route('/go_to_gps', methods=['POST'])
def go_to_gps():
if request.is_json:
content = request.get_json()
lat = content.get('lat', 0)
lon = content.get('lon', 0)
assert lat != 0 and lon != 0, "Coordinates missing!"
logger.info(f"Navigating to lat: {lat}, lon: {lon}")
FORKLIFT.go_to_gps(lat=lat, lon=lon)
return jsonify({'Success': True})
else:
return jsonify({'Error': 'Not JSON'})
@app.route('/go_to_heading', methods=['POST'])
def go_to_heading():
if request.is_json:
content = request.get_json()
target_heading = content.get('pickup_heading', -1)
assert target_heading >= 0, "Heading missing!"
logger.info(f"Navigating to heading: {target_heading}")
FORKLIFT.go_to_heading(target_heading)
return jsonify({'Success': True})
else:
return jsonify({'Error': 'Not JSON'})
@app.route('/get_state', methods=['POST', 'GET'])
def get_state():
_, state = STATE_MACHINE.get_state()
return jsonify({'Success': True, "State": state})
@app.route('/set_state', methods=['POST'])
def set_state():
if request.is_json:
content = request.get_json()
target_state = content.get('State', 0)
if target_state == 0:
return jsonify({'Error': f"State {target_state} not valid!"})
else:
logger.info(f"Setting machine state to {target_state}")
success = STATE_MACHINE.set_state(target_state)
if success:
return jsonify({'Success': True})
else:
return jsonify({'Error': 'State machine error, check logs...'})
else:
return jsonify({'Error': 'Not JSON'})
@app.route('/get_gps', methods=['POST', 'GET'])
def get_gps():
if request.is_json:
lat, lon = FORKLIFT.get_gps()
logger.info(f"GPS coords are: lat: {lat} lon: {lon}")
return jsonify({'Success': True, 'lat': lat, 'lon': lon})
else:
return jsonify({'Error': 'Not JSON'})
@app.route('/estop', methods=['POST'])
def estop():
FORKLIFT.emergency_stop()
return jsonify({'Success': True, 'estop': 'engaged'})
@app.route('/get_estop', methods=['POST', 'GET'])
def get_estop():
estop = FORKLIFT.estop_engaged
return jsonify({'Success': True, 'estop': estop})
@app.route('/get_arm', methods=['POST', 'GET'])
def get_arm():
armed = FORKLIFT.get_arm()
return jsonify({'Success': True, 'armed': armed})
@app.route('/get_speed', methods=['POST', 'GET'])
def get_speed():
speed = FORKLIFT.get_speed()
remapped_speed = remap(speed, 2000, 1000, 100, -100)
return jsonify({'Success': True, 'speed': remapped_speed})
@app.route('/get_steering', methods=['POST', 'GET'])
def get_steering():
steering = FORKLIFT.get_steering()
remapped_steering = remap(steering, 2000, 1000, 100, -100)
return jsonify({'Success': True, 'steering': remapped_steering})
@app.route('/approach_pallet', methods=['POST'])
def approach_pallet():
FORKLIFT.approach_pallet()
return jsonify({'Success': True})
@app.route('/reverse_from_pallet', methods=['POST'])
def reverse_from_pallet():
FORKLIFT.reverse_from_pallet()
return jsonify({'Success': True})
# @app.route('/close', methods=['POST'])
# def close_forklift():
# logger.info(f"Closing forklift")
# FORKLIFT.vehicle.close()
# return jsonify({'Success': True})
app.run(host='0.0.0.0', port=5000)