import math import time from dataclasses import dataclass from threading import Thread # import matplotlib.pyplot as plt import numpy as np import serial @dataclass class CompassOffsets: min_x: float max_x: float min_y: float max_y: float hard_offset_x: float hard_offset_y: float class ICM20948DataStream: def __init__(self, com_port="/dev/cu.usbmodem1462201", baudrate=115200) -> None: """ Other com port: /dev/cu.SLAB_USBtoUART """ self.serial = serial.Serial(com_port, baudrate=baudrate) init_message = str(self.serial.readline()) assert 'I2C begin Failed' not in init_message, "No I2C sensor detected!" self.mag = {"x": 0, "y": 0, "z": 0} self.running = False time.sleep(1) def start(self): t = Thread(target=self.update, args=()) t.daemon = True self.running = True t.start() return self def update(self): while self.running: try: ser_bytes = str(self.serial.readline()) useful = ser_bytes.split('\\')[0].split('\'')[1] mag_data_raw = useful.split(';') mag_raw_x = float(mag_data_raw[0]) mag_raw_y = float(mag_data_raw[1]) mag_raw_z = float(mag_data_raw[2]) self.mag = {"x": mag_raw_x, "y": mag_raw_y, "z": mag_raw_z} except Exception as e: print(f"Error: {e}") def read(self): return self.mag def stop(self): self.running = False class ICM20948: def __init__(self, sensor): self.sensor = sensor self.offsets = CompassOffsets(0, 0, 0, 0, 0, 0) self.calibrated = False self.__permanent_offset = 0 self.history = np.zeros(10) def record(self, rec_time=30): end_time = time.time() + rec_time readings_x = [] readings_y = [] print("Start recording") while time.time() < end_time: time.sleep(0.01) new_reading = self.sensor.read() if new_reading['x'] == 0 or new_reading['y'] == 0: print("Sensor warning!") else: readings_x.append(new_reading['x']) readings_y.append(new_reading['y']) print("Done") return readings_x, readings_y def set_offset(self, offset): self.__permanent_offset = offset def calibrate(self): raw_x, raw_y = self.record(30) min_x, max_x, min_y, max_y = self.clean_calib_data(raw_x, raw_y) hard_offset_x = -(min_x + max_x) / 2 hard_offset_y = -(min_y + max_y) / 2 self.offsets.min_x = min_x self.offsets.max_x = max_x self.offsets.min_y = min_y self.offsets.max_y = max_y self.offsets.hard_offset_x = hard_offset_x self.offsets.hard_offset_y = hard_offset_y print(f"Calibration complete, Values: min_x: {min_x}, max_x: {max_x}, min_y: {min_y}, max_y: {max_y}, hard_offset_x: {hard_offset_x}, hard_offset_y: {hard_offset_y}") self.calibrated = True self.previous_heading = 0 def clean_calib_data(self, x, y, max_deviations=2): x_np = np.asarray(x) y_np = np.asarray(y) # Clean x axis outlier pairs x_std = np.std(x_np) x_mean = np.mean(x_np) x_distance_from_mean = abs(x_np - x_mean) x_not_outlier = x_distance_from_mean < max_deviations * x_std x_outlier = x_distance_from_mean > max_deviations * x_std x_np_clean = x_np[x_not_outlier] y_np_clean = y_np[x_not_outlier] # Clean y axis outlier pairs y_std = np.std(y_np_clean) y_mean = np.mean(y_np_clean) y_distance_from_mean = abs(y_np_clean - y_mean) y_not_outlier = y_distance_from_mean < max_deviations * y_std y_outlier = y_distance_from_mean > max_deviations * y_std y_np_clean = y_np_clean[y_not_outlier] x_np_clean = x_np_clean[y_not_outlier] assert len(x_np_clean) == len(y_np_clean) print(f"Total outliers: {len(x_np) - len(x_np_clean)}") print(f"Original Min x: {min(x)}, Max x: {max(x)}, Min y: {min(y)}, Max y:{max(y)}") print(f"Clean Min x: {min(x_np_clean)}, Max x: {max(x_np_clean)}, Min y: {min(y_np_clean)}, Max y: {max(y_np_clean)}") return min(x_np_clean), max(x_np_clean), min(y_np_clean), max(y_np_clean) def raw_2_correct(self, x, y): assert self.calibrated # hard_corrected_x = x + self.offsets.hard_offset_x # hard_corrected_y = y + self.offsets.hard_offset_y fully_corrected_x = (x - self.offsets.min_x) / (self.offsets.max_x - self.offsets.min_x) * 2 - 1 fully_corrected_y = (y - self.offsets.min_y) / (self.offsets.max_y - self.offsets.min_y) * 2 - 1 return fully_corrected_x, fully_corrected_y # return hard_corrected_x, hard_corrected_y def calculate_immediate_heading(self): heading, _ = self.get_heading_and_data() return heading def get_heading_and_data(self): new_reading = self.sensor.read() if new_reading['x'] == 0 or new_reading['y'] == 0: print("Sensor warning!") return self.previous_heading, (0, 0) else: corrected_x, corrected_y = self.raw_2_correct(new_reading['x'], new_reading['y']) heading = -(180 * math.atan2(corrected_y, corrected_x) / math.pi) heading += self.__permanent_offset if heading < 0: heading += 360 if heading > 360: heading -= 360 self.previous_heading = heading return heading, (corrected_x, corrected_y) def get_smooth_heading(self): new_heading = self.calculate_immediate_heading() self.history = np.roll(self.history, -1) self.history[-1] = new_heading x = y = 0 for angle in self.history: x += math.cos(math.radians(angle)) y += math.sin(math.radians(angle)) average_angle_rad = math.atan2(y, x) average_angle = 180 * average_angle_rad / math.pi if average_angle < 0: average_angle += 360 if average_angle > 360: average_angle -= 360 # print(f"Avg: {average_angle}, Hist: {self.history}, x: {x}, y: {y}") return average_angle