import logging import board from dataclasses import dataclass, field from typing import List import zmq import threading from apscheduler.schedulers.background import BackgroundScheduler import RPi.GPIO as IO from ProximitySensor import ProximitySensor from IMU import IMU from GPS import GPSPimori from CAN import CANBus from Power import Battery import inspect import ctypes import time logging.basicConfig(level='WARNING') example_task_list = { 1: { "start": True, "waypoints": [(1, 1), (2, 2), (3, 3)], "handoff": True, "waypoints": [(3, 3), (2, 2), (1, 1)], "end": True }, 2: { "start": True, "waypoints": [(1.5, 2.5), (5, 5), (11.3, 10.1)], "handoff": True, "waypoints": [(11.3, 10.1), (5, 5), (1.5, 2.5)], "end": True } } @dataclass class Gps_telemetry: lat: float = None lon: float = None alt: float = None sats: int = None has_fix: bool = False speed_over_ground: float = None @dataclass class Imu_telemetry: quat: List[float] = field(default_factory=list) # [0, 0, 0, 0] grav: List[float] = field(default_factory=list) # [0, 0, 0] magnetic: List[float] = field(default_factory=list) # [0, 0, 0] accel: List[float] = field(default_factory=list) # [0, 0, 0] euler: List[float] = field(default_factory=list) # [0, 0, 0] @dataclass class Trajectory: velocity: float = 0.0 heading: float = None @dataclass class Position: lat: float = None lon: float = None heading: float = None class Vehicle: __PROXIMTY_THRESHOLD = 30 def __init__(self) -> None: self.name = 'Vehicle' self.can = CANBus(channel='can0') self.gps = GPSPimori() self.imu = IMU() self.proximity_sensors = [ ProximitySensor(echo_pin=board.D21, trigger_pin=board.D20), ProximitySensor(echo_pin=board.D16, trigger_pin=board.D12), ] self.battery = Battery() self.ALLOW_REMOTE_CONTROL = False self.CONTROL_SERVER_RUNNING = True self.status_gps = Gps_telemetry() self.status_imu = Imu_telemetry() self.status_imu.quat = [None, None, None, None] self.status_imu.grav = [None, None, None] self.status_imu.magnetic = [None, None, None] self.status_imu.accel = [None, None, None] self.status_imu.euler = [None, None, None] self.battery_low = None self.status_job = None #TODO: this self.distance_proximity = [None, None] self.status_proximity = [None, None] # Turns true if we too close to object self.trajectory = Trajectory() self.position = Position() context = zmq.Context() self.socket = context.socket(zmq.REP) self.socket.bind("tcp://*:5555") self.update_all_telemetry() self.setup_scheduler() def setup_scheduler(self) -> None: self.update_scheduler = BackgroundScheduler() self.update_scheduler.add_job(self.update_imu, 'interval', seconds=0.5, id="imu_updater") self.update_scheduler.add_job(self.update_proximity, 'interval', seconds=0.5, id="proximity_updater") self.update_scheduler.add_job(self.update_gps, 'interval', seconds=3, id="gps_updater") self.update_scheduler.add_job(self.update_battery, 'interval', seconds=5, id="battery_updater") def start_scheduler(self) -> None: self.update_scheduler.start() def stop_all_telemetry_updates(self) -> None: self.update_scheduler.remove_all_jobs() def stop_telemetry_updates(self, id: str) -> None: self.update_scheduler.remove_job(id) def shutdown_telemetry_updator(self) -> None: self.update_scheduler.shutdown() def auto_telemetry_updater(self) -> None: pass def update_all_telemetry(self) -> None: #NOTE: threaded subprocesses should take care of updating stuff (at different freq), we only take the local vars when passing them to client self.update_gps() self.update_imu() self.update_proximity() self.update_battery() def update_gps(self) -> None: gps_data = self.gps.get_gps_data() self.status_gps.lat = gps_data['location']['lat'] self.status_gps.lon = gps_data['location']['lon'] self.status_gps.alt = gps_data['location']['alt'] self.status_gps.sats = gps_data['diags']['satellites'] self.status_gps.speed_over_ground = gps_data['location']['velocity'] self.position.lon = self.status_gps.lon self.position.lat = self.status_gps.lat self.trajectory.velocity = self.status_gps.speed_over_ground def update_imu(self) -> None: try: quaternions = self.imu.get_quaternion() for i in range(4): self.status_imu.quat[i] = quaternions[i] gravity = self.imu.get_gravity() magnetic = self.imu.get_magnetometer() lin_acc = self.imu.get_linear_acceleration() euler = self.imu.get_euler() for i in range(3): self.status_imu.grav[i] = gravity[i] self.status_imu.magnetic[i] = magnetic[i] self.status_imu.accel[i] = lin_acc[i] self.status_imu.euler[i] = euler[i] self.position.heading = self.status_imu.euler[0] self.trajectory.heading = self.status_imu.euler[0] except Exception as e: logging.error(f'IMU Error: {e}') def update_proximity(self) -> None: for ix, sensor in enumerate(self.proximity_sensors): self.distance_proximity[ix] = sensor.get_distance() self.status_proximity[ix] = (self.distance_proximity[ix] > self.__PROXIMTY_THRESHOLD) def update_battery(self) -> None: self.battery_low = self.battery.check_battery() if self.battery_low: logging.warning("Battery is low!") def task_processor(self): pass #TODO: add a func that will iterate over a task list and perform waypoint following and teleop switching def drive(self, velocity: float) -> None: self.trajectory.velocity = velocity pass #TODO: add CAN command to start driving forward, and stop if v = 0 def turn(self, amount: float) -> None: pass #TODO: add CAN command to move steering, and go straight if a = 0 def teleop_communicator_loop(self) -> None: ''' Example messages: { 'intent': 'control|update', 'domain': 'telemetry|drive|forklift', 'command': 'fwd|back|turn|update|TBD', 'amount': '' } ''' while True: # if not stop_signal(): # logging.warning("Killing com server") # return try: new_message = self.socket.recv_json() except Exception as e: logging.error(f"Encountered internal socket error: {e}") self.socket.send_json({"error": None}) new_message = self.socket.recv_json() logging.info(f"Got message: {new_message}") message_intent = new_message.get('intent', 'nothing') if message_intent == 'nothing': logging.critical(f"Got an invalid message: {new_message}") elif message_intent == 'control': pass # Control subroutine, TODO elif message_intent == 'update': response_msg = { 'error': False, 'intent': 'update', 'telemetry': { 'gps': { 'lat': self.status_gps.lat, 'lon': self.status_gps.lon, 'alt': self.status_gps.alt, 'sats': self.status_gps.sats, 'has_fix': self.status_gps.has_fix }, 'traj': { 'vel': self.trajectory.velocity, 'heading': self.trajectory.heading }, 'prox_1': self.distance_proximity[0], 'prox_2': self.distance_proximity[1], 'prox_3': '?', 'prox_4': '?', 'job_status': None, 'battery_low': self.battery_low } } self.socket.send_json(response_msg) else: logging.critical(f"Got an unexpected intent: {message_intent} | {new_message}") def start_teleop_com_server(self) -> None: self.teleop_server = threading.Thread(target=self.teleop_communicator_loop, args=(), name="Com_server") self.teleop_server.start() def execute_safe_shutdown(self) -> None: self.shutdown_telemetry_updator() # self.teleop_server.terminate() # NOTE: I have xero bloody idea as to how we kill the thread while it is blocking with zmq recv command, and we cannot use miltiprocessing bc that seems to break everythong :( # self.socket.c # TODO: cleanup sockets IO.cleanup() def stop(self) -> None: self.execute_safe_shutdown() logging.info("Clean shutsodn complete, exiting script.") exit(0) if __name__ == "__main__": vehicle = Vehicle() vehicle.start_scheduler() vehicle.start_teleop_com_server() s = vehicle.stop