import os import logging class StateMachineHandler: def __init__(self, force=False) -> None: self.log = logging.getLogger("STATE MACHINE") self.log.setLevel("DEBUG") # self.state_machine_file = "/Users/max/Desktop/THEA_CODE/Control/Jetson/tests/MACHINE_STATE" self.state_machine_file = "/home/pi/MACHINE_STATE" self.__MACHINE_STATES = { "IDLE": 0, "NAVIGATING_TO_POINT_GPS": 1, "NAVIGATING_TO_POINT_VISION": 2, "MANIPULATING_CARGO": 3, "READY_FOR_NEXT_TASK": 4, "RC_CONTROL": 5, "PROXIMITY_STOP_FRONT": 6, "PROXIMITY_STOP_REAR": 7, "ESTOP": 8, "OTHER_ERROR": 9 } self.init_file(force=force) def get_state(self): if os.path.isfile(self.state_machine_file): with open(self.state_machine_file, 'r') as smfile: state = int(smfile.read()) return state, list(self.__MACHINE_STATES.keys())[list(self.__MACHINE_STATES.values()).index(int(state))] def init_file(self, force=False): if not os.path.isfile(self.state_machine_file) or force: self.log.info(f"Creating new state file, force flag is {force}") with open(self.state_machine_file, 'w+') as smfile: smfile.write(str(self.__MACHINE_STATES["IDLE"])) else: self.log.info("State file already exists and force flag is absent, leaving file as is") def set_state(self, state): if self.check_set_state_validity(): with open(self.state_machine_file, 'w') as smfile: smfile.write(str(self.__MACHINE_STATES.get(state, 8))) return True return False def check_set_state_validity(self): _, state = self.get_state() if state in ["ESTOP", "OTHER_ERROR"]: self.log.critical(f"Machine state is {state}! Cannot perform operation") return False return True