211 lines
7.6 KiB
Python
211 lines
7.6 KiB
Python
import math
|
|
import time
|
|
from dataclasses import dataclass
|
|
from threading import Thread
|
|
import logging
|
|
|
|
# import matplotlib.pyplot as plt
|
|
import numpy as np
|
|
import serial
|
|
|
|
logging.basicConfig()
|
|
logging.root.setLevel(logging.NOTSET)
|
|
logging.basicConfig(level=logging.NOTSET)
|
|
|
|
@dataclass
|
|
class CompassOffsets:
|
|
min_x: float
|
|
max_x: float
|
|
min_y: float
|
|
max_y: float
|
|
hard_offset_x: float
|
|
hard_offset_y: float
|
|
|
|
|
|
pre_calibrated_values = CompassOffsets(0, 0, 0, 0, 0, 0) #-17.25, 16.95, -43.8, -13.95, 0.15000000000000036, 28.875 fl outside
|
|
|
|
|
|
class ICM20948DataStream:
|
|
def __init__(self, com_port="/dev/cu.usbmodem1462201", baudrate=115200) -> None:
|
|
"""
|
|
Other com port: /dev/cu.SLAB_USBtoUART
|
|
"""
|
|
self.serial = serial.Serial(com_port, baudrate=baudrate)
|
|
init_message = str(self.serial.readline())
|
|
assert 'I2C begin Failed' not in init_message, "No I2C sensor detected!"
|
|
self.mag = {"x": 0, "y": 0, "z": 0}
|
|
self.running = False
|
|
self.log = logging.getLogger("COMPASS DATA STREAM")
|
|
self.log.setLevel(logging.DEBUG)
|
|
time.sleep(1)
|
|
|
|
def start(self):
|
|
t = Thread(target=self.update, args=())
|
|
t.daemon = True
|
|
self.running = True
|
|
t.start()
|
|
return self
|
|
|
|
def update(self):
|
|
while self.running:
|
|
try:
|
|
ser_bytes = str(self.serial.readline())
|
|
useful = ser_bytes.split('\\')[0].split('\'')[1]
|
|
mag_data_raw = useful.split(';')
|
|
mag_raw_x = float(mag_data_raw[0])
|
|
mag_raw_y = float(mag_data_raw[1])
|
|
mag_raw_z = float(mag_data_raw[2])
|
|
self.mag = {"x": mag_raw_x, "y": mag_raw_y, "z": mag_raw_z}
|
|
except Exception as e:
|
|
self.log.error(f"Error: {e}")
|
|
self.log.error(f"Got bytes: {ser_bytes}")
|
|
|
|
def read(self):
|
|
return self.mag
|
|
|
|
def stop(self):
|
|
self.running = False
|
|
|
|
|
|
class ICM20948:
|
|
def __init__(self, sensor, pre_calibreted=None):
|
|
self.sensor = sensor
|
|
self.offsets = CompassOffsets(0, 0, 0, 0, 0, 0)
|
|
self.calibrated = False
|
|
if pre_calibreted:
|
|
self.calibrated = True
|
|
self.offsets = pre_calibreted
|
|
self.__permanent_offset = 0
|
|
self.history = np.zeros(15)
|
|
self.current_heading = 0
|
|
self.previous_heading = 0
|
|
self.running = False
|
|
self.log = logging.getLogger("MAIN COMPASS CLASS")
|
|
self.log.setLevel(logging.DEBUG)
|
|
|
|
def record(self, rec_time=30):
|
|
end_time = time.time() + rec_time
|
|
readings_x = []
|
|
readings_y = []
|
|
self.log.info("Start recording")
|
|
while time.time() < end_time:
|
|
time.sleep(0.01)
|
|
new_reading = self.sensor.read()
|
|
if new_reading['x'] == 0 or new_reading['y'] == 0:
|
|
self.log.warning("Sensor warning!")
|
|
else:
|
|
readings_x.append(new_reading['x'])
|
|
readings_y.append(new_reading['y'])
|
|
self.log.info("Done")
|
|
return readings_x, readings_y
|
|
|
|
def set_offset(self, offset):
|
|
self.__permanent_offset = offset
|
|
|
|
def calibrate(self, calib_time=30):
|
|
raw_x, raw_y = self.record(calib_time)
|
|
min_x, max_x, min_y, max_y = self.clean_calib_data(raw_x, raw_y)
|
|
hard_offset_x = -(min_x + max_x) / 2
|
|
hard_offset_y = -(min_y + max_y) / 2
|
|
self.offsets.min_x = min_x
|
|
self.offsets.max_x = max_x
|
|
self.offsets.min_y = min_y
|
|
self.offsets.max_y = max_y
|
|
self.offsets.hard_offset_x = hard_offset_x
|
|
self.offsets.hard_offset_y = hard_offset_y
|
|
self.log.info(f"Calibration complete, Values: min_x: {min_x}, max_x: {max_x}, min_y: {min_y}, max_y: {max_y}, hard_offset_x: {hard_offset_x}, hard_offset_y: {hard_offset_y}")
|
|
self.log.info(f"Copyable calib params: {min_x}, {max_x}, {min_y}, {max_y}, {hard_offset_x}, {hard_offset_y}")
|
|
self.calibrated = True
|
|
self.previous_heading = 0
|
|
|
|
def clean_calib_data(self, x, y, max_deviations=2):
|
|
x_np = np.asarray(x)
|
|
y_np = np.asarray(y)
|
|
# Clean x axis outlier pairs
|
|
x_std = np.std(x_np)
|
|
x_mean = np.mean(x_np)
|
|
x_distance_from_mean = abs(x_np - x_mean)
|
|
x_not_outlier = x_distance_from_mean < max_deviations * x_std
|
|
x_outlier = x_distance_from_mean > max_deviations * x_std
|
|
x_np_clean = x_np[x_not_outlier]
|
|
y_np_clean = y_np[x_not_outlier]
|
|
# Clean y axis outlier pairs
|
|
y_std = np.std(y_np_clean)
|
|
y_mean = np.mean(y_np_clean)
|
|
y_distance_from_mean = abs(y_np_clean - y_mean)
|
|
y_not_outlier = y_distance_from_mean < max_deviations * y_std
|
|
y_outlier = y_distance_from_mean > max_deviations * y_std
|
|
y_np_clean = y_np_clean[y_not_outlier]
|
|
x_np_clean = x_np_clean[y_not_outlier]
|
|
|
|
assert len(x_np_clean) == len(y_np_clean)
|
|
self.log.info(f"Total outliers: {len(x_np) - len(x_np_clean)}")
|
|
self.log.info(f"Original Min x: {min(x)}, Max x: {max(x)}, Min y: {min(y)}, Max y:{max(y)}")
|
|
self.log.info(f"Clean Min x: {min(x_np_clean)}, Max x: {max(x_np_clean)}, Min y: {min(y_np_clean)}, Max y: {max(y_np_clean)}")
|
|
return min(x_np_clean), max(x_np_clean), min(y_np_clean), max(y_np_clean)
|
|
|
|
def raw_2_correct(self, x, y):
|
|
assert self.calibrated
|
|
# hard_corrected_x = x + self.offsets.hard_offset_x
|
|
# hard_corrected_y = y + self.offsets.hard_offset_y
|
|
|
|
fully_corrected_x = (x - self.offsets.min_x) / (self.offsets.max_x - self.offsets.min_x) * 2 - 1
|
|
fully_corrected_y = (y - self.offsets.min_y) / (self.offsets.max_y - self.offsets.min_y) * 2 - 1
|
|
return fully_corrected_x, fully_corrected_y
|
|
# return hard_corrected_x, hard_corrected_y
|
|
|
|
def calculate_immediate_heading(self):
|
|
heading, _ = self.get_heading_and_data()
|
|
return heading
|
|
|
|
def get_heading_and_data(self):
|
|
new_reading = self.sensor.read()
|
|
if new_reading['x'] == 0 or new_reading['y'] == 0:
|
|
self.log.warning("Sensor warning!")
|
|
return self.previous_heading, (0, 0)
|
|
else:
|
|
corrected_x, corrected_y = self.raw_2_correct(new_reading['x'], new_reading['y'])
|
|
heading = -(180 * math.atan2(corrected_y, corrected_x) / math.pi)
|
|
heading += self.__permanent_offset
|
|
if heading < 0:
|
|
heading += 360
|
|
if heading > 360:
|
|
heading -= 360
|
|
self.previous_heading = heading
|
|
return heading, (corrected_x, corrected_y)
|
|
|
|
def get_smooth_heading(self):
|
|
new_heading = self.calculate_immediate_heading()
|
|
self.history = np.roll(self.history, -1)
|
|
self.history[-1] = new_heading
|
|
x = y = 0
|
|
for angle in self.history:
|
|
x += math.cos(math.radians(angle))
|
|
y += math.sin(math.radians(angle))
|
|
average_angle_rad = math.atan2(y, x)
|
|
average_angle = 180 * average_angle_rad / math.pi
|
|
if average_angle < 0:
|
|
average_angle += 360
|
|
if average_angle > 360:
|
|
average_angle -= 360
|
|
# print(f"Avg: {average_angle}, Hist: {self.history}, x: {x}, y: {y}")
|
|
return average_angle
|
|
|
|
def update(self):
|
|
while self.running:
|
|
self.current_heading = self.get_smooth_heading()
|
|
time.sleep(0.05)
|
|
|
|
def read(self):
|
|
return self.current_heading
|
|
|
|
def stop(self):
|
|
self.running = False
|
|
|
|
def start(self):
|
|
t2 = Thread(target=self.update, args=())
|
|
t2.daemon = True
|
|
self.running = True
|
|
t2.start()
|
|
return self
|