#!/usr/local/bin/python
"""Sensor node: soil probe, watering pump and packet radio command loop.

The node listens for text commands over the radio, of the form
``name`` or ``name|argument``, and answers each one with a short text reply.
"""
import logging
import time

import adafruit_rfm69
import board
import busio
import digitalio
from adafruit_seesaw.seesaw import Seesaw


class LoRa:
    """Thin wrapper around the ``adafruit_rfm69.RFM69`` radio driver."""

    __SIGNAL_FREQUENCY = 915.0
    __ENCRYPTION_KEY = b"\x01\x01\x01\x01\x01\x01\x01\x01\x02\x02\x02\x02\x02\x02\x02\x02"
    __PINS = {
        'miso': board.MISO,
        'mosi': board.MOSI,
        'sck': board.SCK,
        'cs': board.D22,
        'rst': board.D27,
    }
    # Passed as ``timeout`` to every ``receive`` call.
    __PACKET_WAIT = 2

    def __init__(self):
        """Set up the SPI bus and the radio, then enable encryption."""
        spi = busio.SPI(
            self.__PINS['sck'],
            MOSI=self.__PINS['mosi'],
            MISO=self.__PINS['miso'],
        )
        cs = digitalio.DigitalInOut(self.__PINS['cs'])
        rst = digitalio.DigitalInOut(self.__PINS['rst'])
        self.lora = adafruit_rfm69.RFM69(spi, cs, rst, self.__SIGNAL_FREQUENCY)
        self.lora.encryption_key = self.__ENCRYPTION_KEY
        # The encryption key is deliberately NOT logged: writing key material
        # to a log file defeats the purpose of encrypting the link.
        logging.debug(
            f"Init'd LoRa board with freq: {self.lora.frequency_mhz}, "
            f"bitrate: {self.lora.bitrate / 1000} kbit/s, "
            f"f. deviation: {self.lora.frequency_deviation / 1000} khz"
        )

    def receive_message(self):
        """Wait for one packet.

        Returns:
            The received payload as returned by the driver, or ``False`` when
            nothing (or an empty payload) arrived within the timeout.
        """
        packets = self.lora.receive(timeout=self.__PACKET_WAIT)
        if not packets:
            return False
        logging.debug(f'Got packets: {packets}')
        return packets

    def send_message(self, message):
        """Encode ``message`` (a ``str``) as UTF-8 and transmit it."""
        logging.debug(f'Sending message: {message}')
        self.lora.send(bytes(message, 'utf-8'))


class WateringPump:
    """Pump switched through a single digital output pin."""

    __PINS = {'pwm': board.D20}
    # Flow rate in ml/min (130 ml/min at 5V, per the original author's note).
    __FLOW_RATE = 130

    def __init__(self):
        """Configure the control pin as an output and make sure it is off."""
        self.ctrl_pin = self.__PINS['pwm']
        self.pump = digitalio.DigitalInOut(self.ctrl_pin)
        self.pump.direction = digitalio.Direction.OUTPUT
        self.pump.value = False
        logging.debug(
            f"Init'd pump at pin {self.ctrl_pin}, "
            f"Power state is {self.pump.value}"
        )

    def dispense(self, volume: int):
        """Run the pump long enough to deliver ``volume`` millilitres.

        The run time in seconds is ``volume * 60 / flow_rate`` with the flow
        rate expressed in ml/min.  Zero or negative volumes are ignored so
        the pump is never pulsed for a request that asks for no water.

        Args:
            volume: Amount of water to dispense, in millilitres.
        """
        if volume <= 0:
            logging.warning(f'Ignoring non-positive dispense volume: {volume}')
            return
        run_duration_seconds = volume * 60 / self.__FLOW_RATE
        self.__run_for(run_duration_seconds)

    def __run_for(self, duration: float):
        """Switch the pump on for ``duration`` seconds, then off again.

        The call blocks for the whole duration.  The output is driven low in
        a ``finally`` clause so an interrupt or error while waiting cannot
        leave the pump running.
        """
        if duration <= 0:
            return
        try:
            self.pump.value = True
            logging.info('Starting pump')
            # Sleep instead of spinning on the wall clock: no CPU burn and
            # no sensitivity to system clock adjustments.
            time.sleep(duration)
        finally:
            self.pump.value = False
            logging.info('Stopping pump')


class SoliSensor:
    """Seesaw-based soil probe on the I2C bus.

    NOTE(review): the class name looks like a typo for "SoilSensor"; it is
    kept unchanged because it is part of the module's public interface.
    """

    __PINS = {
        'sda': board.SDA,
        'scl': board.SCL,
    }
    # Sentinel returned when a reading fails; callers receive it as-is.
    __READ_ERROR = -50

    def __init__(self):
        """Open the I2C bus and attach to the sensor at address 0x36."""
        i2c = busio.I2C(self.__PINS['scl'], self.__PINS['sda'])
        self.sensor = Seesaw(i2c, addr=0x36)
        logging.debug(
            f"Init'd soil sensor, humidity: {self.sensor.moisture_read()}, "
            f"temp: {self.sensor.get_temp()}"
        )

    def get_temp(self):
        """Return the sensor temperature, or ``-50`` if the read fails."""
        try:
            temp = self.sensor.get_temp()
        except Exception:
            # Best-effort read: keep the sentinel contract but leave a trace.
            logging.exception('Failed to read temperature')
            return self.__READ_ERROR
        logging.debug(f'Temp: {temp}')
        return temp

    def get_hmdt(self):
        """Return the moisture reading, or ``-50`` if the read fails."""
        try:
            hmdt = self.sensor.moisture_read()
        except Exception:
            # Best-effort read: keep the sentinel contract but leave a trace.
            logging.exception('Failed to read humidity')
            return self.__READ_ERROR
        logging.debug(f'Humidity: {hmdt}')
        return hmdt


class SensorNode:
    """Ties radio, soil probe and pump together behind a command loop."""

    def __init__(self):
        """Create the radio, the probe and the pump, in that order."""
        self.com = LoRa()
        self.probe = SoliSensor()
        self.pump = WateringPump()
        # Command name -> bound handler.
        self.__instructions = {
            'ping': self.ping,
            'iot_g_temp': self.get_soil_temp,
            'iot_g_hmdt': self.get_soil_hmdt,
            'iot_pmp_ctrl': self.pump_control,
        }

    def wait_for_instructions(self):
        """Receive and execute commands forever.

        A failure while handling one packet is logged and the loop carries
        on with the next packet.
        """
        while True:
            packet = self.com.receive_message()
            if packet:
                self._handle_packet(packet)

    def _handle_packet(self, packet):
        """Decode one ``name[|argument]`` packet and run its handler.

        Undecodable payloads, unknown commands and handler errors are all
        logged rather than raised, so a bad packet cannot stop the node.
        """
        try:
            command = packet.decode()
            logging.debug(f'Decoded command: {command}')
            commands = command.split('|')
            logging.debug(f'Processed commands: {commands}')
            name, args = commands[0], commands[1:2]
            handler = self.__instructions.get(name)
            if handler is None:
                logging.error(f'Unknown command: {name}')
                return
            # Pass the first argument only when one was actually sent, so
            # argument-less commands such as a bare "ping" work.
            handler(*args)
        except Exception as e:
            logging.error(f'Exception: {e}')

    def ping(self, *_):
        """Reply ``OK``."""
        time.sleep(0.5)
        self.com.send_message('OK')

    def get_soil_temp(self, *_):
        """Reply with the soil temperature reading as text."""
        soil_temp = self.probe.get_temp()
        time.sleep(0.5)
        self.com.send_message(str(soil_temp))

    def get_soil_hmdt(self, *_):
        """Reply with the soil moisture reading as text."""
        soil_hmdt = self.probe.get_hmdt()
        time.sleep(0.5)
        self.com.send_message(str(soil_hmdt))

    def pump_control(self, volume):
        """Acknowledge with ``OK``, then dispense ``volume`` millilitres.

        The acknowledgement is sent before the pump runs because dispensing
        blocks until the pump has finished.

        Raises:
            ValueError: If ``volume`` cannot be converted to ``int``.
        """
        water_qty = int(volume)
        logging.info(f'Dispensing {water_qty} ml.')
        time.sleep(0.5)
        self.com.send_message('OK')
        self.pump.dispense(water_qty)


if __name__ == "__main__":
    logging.root.setLevel(logging.DEBUG)
    sensor_node = SensorNode()
    sensor_node.wait_for_instructions()