new_thea/Control/Pi/ICM20948.py
2021-09-21 12:11:46 +01:00

211 lines
7.6 KiB
Python

import math
import time
from dataclasses import dataclass
from threading import Thread
import logging
# import matplotlib.pyplot as plt
import numpy as np
import serial
logging.basicConfig()
logging.root.setLevel(logging.NOTSET)
logging.basicConfig(level=logging.NOTSET)
@dataclass
class CompassOffsets:
min_x: float
max_x: float
min_y: float
max_y: float
hard_offset_x: float
hard_offset_y: float
pre_calibrated_values = CompassOffsets(0, 0, 0, 0, 0, 0) #-17.25, 16.95, -43.8, -13.95, 0.15000000000000036, 28.875 fl outside
class ICM20948DataStream:
def __init__(self, com_port="/dev/cu.usbmodem1462201", baudrate=115200) -> None:
"""
Other com port: /dev/cu.SLAB_USBtoUART
"""
self.serial = serial.Serial(com_port, baudrate=baudrate)
init_message = str(self.serial.readline())
assert 'I2C begin Failed' not in init_message, "No I2C sensor detected!"
self.mag = {"x": 0, "y": 0, "z": 0}
self.running = False
self.log = logging.getLogger("COMPASS DATA STREAM")
self.log.setLevel(logging.DEBUG)
time.sleep(1)
def start(self):
t = Thread(target=self.update, args=())
t.daemon = True
self.running = True
t.start()
return self
def update(self):
while self.running:
try:
ser_bytes = str(self.serial.readline())
useful = ser_bytes.split('\\')[0].split('\'')[1]
mag_data_raw = useful.split(';')
mag_raw_x = float(mag_data_raw[0])
mag_raw_y = float(mag_data_raw[1])
mag_raw_z = float(mag_data_raw[2])
self.mag = {"x": mag_raw_x, "y": mag_raw_y, "z": mag_raw_z}
except Exception as e:
self.log.error(f"Error: {e}")
self.log.error(f"Got bytes: {ser_bytes}")
def read(self):
return self.mag
def stop(self):
self.running = False
class ICM20948:
def __init__(self, sensor, pre_calibreted=None):
self.sensor = sensor
self.offsets = CompassOffsets(0, 0, 0, 0, 0, 0)
self.calibrated = False
if pre_calibreted:
self.calibrated = True
self.offsets = pre_calibreted
self.__permanent_offset = 0
self.history = np.zeros(15)
self.current_heading = 0
self.previous_heading = 0
self.running = False
self.log = logging.getLogger("MAIN COMPASS CLASS")
self.log.setLevel(logging.DEBUG)
def record(self, rec_time=30):
end_time = time.time() + rec_time
readings_x = []
readings_y = []
self.log.info("Start recording")
while time.time() < end_time:
time.sleep(0.01)
new_reading = self.sensor.read()
if new_reading['x'] == 0 or new_reading['y'] == 0:
self.log.warning("Sensor warning!")
else:
readings_x.append(new_reading['x'])
readings_y.append(new_reading['y'])
self.log.info("Done")
return readings_x, readings_y
def set_offset(self, offset):
self.__permanent_offset = offset
def calibrate(self, calib_time=30):
raw_x, raw_y = self.record(calib_time)
min_x, max_x, min_y, max_y = self.clean_calib_data(raw_x, raw_y)
hard_offset_x = -(min_x + max_x) / 2
hard_offset_y = -(min_y + max_y) / 2
self.offsets.min_x = min_x
self.offsets.max_x = max_x
self.offsets.min_y = min_y
self.offsets.max_y = max_y
self.offsets.hard_offset_x = hard_offset_x
self.offsets.hard_offset_y = hard_offset_y
self.log.info(f"Calibration complete, Values: min_x: {min_x}, max_x: {max_x}, min_y: {min_y}, max_y: {max_y}, hard_offset_x: {hard_offset_x}, hard_offset_y: {hard_offset_y}")
self.log.info(f"Copyable calib params: {min_x}, {max_x}, {min_y}, {max_y}, {hard_offset_x}, {hard_offset_y}")
self.calibrated = True
self.previous_heading = 0
def clean_calib_data(self, x, y, max_deviations=2):
x_np = np.asarray(x)
y_np = np.asarray(y)
# Clean x axis outlier pairs
x_std = np.std(x_np)
x_mean = np.mean(x_np)
x_distance_from_mean = abs(x_np - x_mean)
x_not_outlier = x_distance_from_mean < max_deviations * x_std
x_outlier = x_distance_from_mean > max_deviations * x_std
x_np_clean = x_np[x_not_outlier]
y_np_clean = y_np[x_not_outlier]
# Clean y axis outlier pairs
y_std = np.std(y_np_clean)
y_mean = np.mean(y_np_clean)
y_distance_from_mean = abs(y_np_clean - y_mean)
y_not_outlier = y_distance_from_mean < max_deviations * y_std
y_outlier = y_distance_from_mean > max_deviations * y_std
y_np_clean = y_np_clean[y_not_outlier]
x_np_clean = x_np_clean[y_not_outlier]
assert len(x_np_clean) == len(y_np_clean)
self.log.info(f"Total outliers: {len(x_np) - len(x_np_clean)}")
self.log.info(f"Original Min x: {min(x)}, Max x: {max(x)}, Min y: {min(y)}, Max y:{max(y)}")
self.log.info(f"Clean Min x: {min(x_np_clean)}, Max x: {max(x_np_clean)}, Min y: {min(y_np_clean)}, Max y: {max(y_np_clean)}")
return min(x_np_clean), max(x_np_clean), min(y_np_clean), max(y_np_clean)
def raw_2_correct(self, x, y):
assert self.calibrated
# hard_corrected_x = x + self.offsets.hard_offset_x
# hard_corrected_y = y + self.offsets.hard_offset_y
fully_corrected_x = (x - self.offsets.min_x) / (self.offsets.max_x - self.offsets.min_x) * 2 - 1
fully_corrected_y = (y - self.offsets.min_y) / (self.offsets.max_y - self.offsets.min_y) * 2 - 1
return fully_corrected_x, fully_corrected_y
# return hard_corrected_x, hard_corrected_y
def calculate_immediate_heading(self):
heading, _ = self.get_heading_and_data()
return heading
def get_heading_and_data(self):
new_reading = self.sensor.read()
if new_reading['x'] == 0 or new_reading['y'] == 0:
self.log.warning("Sensor warning!")
return self.previous_heading, (0, 0)
else:
corrected_x, corrected_y = self.raw_2_correct(new_reading['x'], new_reading['y'])
heading = -(180 * math.atan2(corrected_y, corrected_x) / math.pi)
heading += self.__permanent_offset
if heading < 0:
heading += 360
if heading > 360:
heading -= 360
self.previous_heading = heading
return heading, (corrected_x, corrected_y)
def get_smooth_heading(self):
new_heading = self.calculate_immediate_heading()
self.history = np.roll(self.history, -1)
self.history[-1] = new_heading
x = y = 0
for angle in self.history:
x += math.cos(math.radians(angle))
y += math.sin(math.radians(angle))
average_angle_rad = math.atan2(y, x)
average_angle = 180 * average_angle_rad / math.pi
if average_angle < 0:
average_angle += 360
if average_angle > 360:
average_angle -= 360
# print(f"Avg: {average_angle}, Hist: {self.history}, x: {x}, y: {y}")
return average_angle
def update(self):
while self.running:
self.current_heading = self.get_smooth_heading()
time.sleep(0.05)
def read(self):
return self.current_heading
def stop(self):
self.running = False
def start(self):
t2 = Thread(target=self.update, args=())
t2.daemon = True
self.running = True
t2.start()
return self