123 lines
3.5 KiB
Python
123 lines
3.5 KiB
Python
#!/usr/local/bin/python
|
|
import board
|
|
import busio
|
|
import digitalio
|
|
import time
|
|
import logging
|
|
|
|
import adafruit_rfm69
|
|
from adafruit_seesaw.seesaw import Seesaw
|
|
|
|
|
|
class LoRa:
|
|
__SIGNAL_FREQUENCY = 915.0
|
|
__ENCRYPTION_KEY = b"\x01\x01\x01\x01\x01\x01\x01\x01\x02\x02\x02\x02\x02\x02\x02\x02"
|
|
__PINS = {
|
|
'miso': board.MISO,
|
|
'mosi': board.MOSI,
|
|
'sck': board.SCK,
|
|
'cs': board.D22,
|
|
'rst': board.D27
|
|
}
|
|
__PACKET_WAIT = 2
|
|
|
|
def __init__(self):
|
|
spi = busio.SPI(self.__PINS['sck'],
|
|
MOSI=self.__PINS['mosi'],
|
|
MISO=self.__PINS['miso'])
|
|
cs = digitalio.DigitalInOut(self.__PINS['cs'])
|
|
rst = digitalio.DigitalInOut(self.__PINS['rst'])
|
|
self.lora = adafruit_rfm69.RFM69(spi, cs, rst, self.__SIGNAL_FREQUENCY)
|
|
self.lora.encryption_key = self.__ENCRYPTION_KEY
|
|
logging.debug(f'Init\'d LoRa board with freq: {self.lora.frequency_mhz}, bitrate: {self.lora.bitrate / 1000} kbit/s, f. deviation: {self.lora.frequency_deviation/1000} khz, and encryption key: {self.lora.encryption_key}')
|
|
|
|
def receive_message(self):
|
|
packets = self.lora.receive(timeout=self.__PACKET_WAIT)
|
|
if packets:
|
|
logging.debug(f'Got packets: {packets}')
|
|
return packets
|
|
return False
|
|
|
|
def send_message(self, message):
|
|
logging.debug(f'Sending message: {message}')
|
|
packets = bytes(message, 'utf-8')
|
|
self.lora.send(packets)
|
|
|
|
|
|
class WateringPump:
|
|
def __init__(self):
|
|
pass
|
|
|
|
|
|
class SoliSensor:
|
|
__PINS = {
|
|
'sda': board.SDA,
|
|
'scl': board.SCL
|
|
}
|
|
|
|
def __init__(self):
|
|
i2c = busio.I2C(self.__PINS['scl'], self.__PINS['sda'])
|
|
self.sensor = Seesaw(i2c, addr=0x36)
|
|
logging.debug(f'Init\'d soil sensor, humidity: {self.sensor.moisture_read()}, temp: {self.sensor.get_temp()}')
|
|
|
|
def get_temp(self):
|
|
temp = self.sensor.get_temp()
|
|
logging.debug(f'Temp: {temp}')
|
|
return temp
|
|
|
|
def get_hmdt(self):
|
|
hmdt = self.sensor.moisture_read()
|
|
logging.debug(f'Humidity: {hmdt}')
|
|
return hmdt
|
|
|
|
|
|
class Slave:
|
|
|
|
def __init__(self):
|
|
self.com = LoRa()
|
|
self.probe = SoliSensor()
|
|
self.pump = WateringPump()
|
|
|
|
def wait_for_instructions(self):
|
|
self.__instrucitons = {
|
|
'ping': self.ping,
|
|
'iot_g_temp': self.get_soil_temp,
|
|
'iot_g_hmdt': self.get_soil_hmdt,
|
|
'iot_pmp_ctrl': self.pump_control
|
|
}
|
|
while True:
|
|
command = self.com.receive_message()
|
|
if command:
|
|
command = command.decode()
|
|
logging.debug(f'Decoded command: {command}')
|
|
commands = str(command).split('|')
|
|
logging.debug(f'Processed commands: {commands}')
|
|
self.__instrucitons[commands[0]](commands[1])
|
|
|
|
def ping(self, *_):
|
|
time.sleep(0.5)
|
|
self.com.send_message('OK')
|
|
|
|
def get_soil_temp(self, *_):
|
|
soil_temp = self.probe.get_temp()
|
|
time.sleep(0.5)
|
|
self.com.send_message(str(soil_temp))
|
|
|
|
def get_soil_hmdt(self, *_):
|
|
soil_hmdt = self.probe.get_hmdt()
|
|
time.sleep(0.5)
|
|
self.com.send_message(str(soil_hmdt))
|
|
|
|
def pump_control(self, volume):
|
|
water_qty = int(volume)
|
|
print(f'Running pump for {water_qty} ml.')
|
|
time.sleep(0.5)
|
|
self.com.send_message('OK')
|
|
time.sleep(water_qty)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
logging.root.setLevel(logging.DEBUG)
|
|
slave = Slave()
|
|
slave.wait_for_instructions()
|