#!/usr/bin/env python3 """ Plugin to interface with the shepherd scout pcb modules. Also semi-compatible with the older TrapCtrl style boards and Pi HATs based on the SleepyPi2, provided they are running the Shepherd Scout firmware. The TDW serial message format is used to pull data from the companion board and interact with its RTC and set alarms. This library uses a seperate thread to handle the comms with the supervising microcontroller. Interface functions add a request to the queue, and some may wait for a state to be updated before returning (with a timeout). """ import shepherd.config as shconf import shepherd.plugin from . import tdw import queue import threading import re import serial import time from datetime import datetime from enum import Enum, auto from collections import namedtuple class MsgName(Enum): BATV = "batv" BATI = "bati" TIME = "time" ALARM = "alarm" AUX5V = "aux5v" PWM1 = "pwm1" PWM2 = "pwm2" OUT1 = "out1" OUT2 = "out2" VERSION = "version" LOG = "log" MEASUREMENT = "meas" def __str__(self): return str(self.value) logmsgs = ["LE_NONE - Empty log", "LE_POWERUP - Scout startup", "LE_PI_BOOT_TIMEOUT - Tried to turn on Pi but did not recieve succesful boot signal", "LE_PI_ON - Pi has booted", "LE_LOW_VOLT_START_SHUTDOWN - Initiated Pi shutdown due to low supply voltage", "LE_PI_SIGNAL_START_SHUTDOWN - Pi started to shut itself down", "LE_PI_SHUTDOWN_TIMEOUT - Pi did not signal successful shutdown, so killed power", "LE_MAIN5V_DISABLE - Main Pi power turned off", "LE_VOLT_GOOD_MAIN5V_ENABLE - Turned Pi power on after voltage raised enough", "LE_ALARM_MAIN5V_ENABLE - Turned Pi power on after wakeup alarm was hit"] class ScoutPlugin(shepherd.plugin.Plugin): @staticmethod def define_config(confdef): confdef.add_def('boardver', shconf.StringDef()) confdef.add_def('serialport', shconf.StringDef()) def __init__(self, pluginInterface, config): super().__init__(pluginInterface, config) self.config = config self.interface = pluginInterface self.plugins = pluginInterface.other_plugins self.hooks = pluginInterface.hooks self.msg_handler = tdw.MessageHandler(config["serialport"], 57600) self.interface.register_function(self.get_batv) self.interface.register_function(self.get_bati) self.interface.register_function(self.get_time) self.interface.register_function(self.set_alarm) self.interface.register_function(self.set_aux5v) self.interface.register_function(self.set_pwm1) self.interface.register_function(self.set_pwm2) self.interface.register_function(self.set_out1) self.interface.register_function(self.set_out2) self.interface.register_function(self.test_logs) self.interface.register_function(self.get_logs) self.interface.register_function(self.get_measurements) self.interface.register_function(self.test) def get_version(self): rqst = self.msg_handler.send_request(MsgName.VERSION.value) if rqst.wait_for_response(): return rqst.response.arguments[0:2] return None def get_batv(self): rqst = self.msg_handler.send_request(MsgName.BATV.value) if rqst.wait_for_response(): return rqst.response.arguments[0] return None def get_bati(self): rqst = self.msg_handler.send_request(MsgName.BATI) if rqst.wait_for_response(): return rqst.response.arguments[0] return None def set_aux5v(self, enabled): cmd = self.msg_handler.send_command(MsgName.AUX5V, [str(enabled).lower()]) if cmd.wait_for_response(): return cmd.response.arguments[0] return None def set_pwm1(self, enabled, pulse_length): cmd = self.msg_handler.send_command( MsgName.PWM1, [str(enabled).lower(), str(pulse_length)]) if cmd.wait_for_response(): return cmd.response.arguments[0] return None def set_pwm2(self, enabled, pulse_length): cmd = self.msg_handler.send_command( MsgName.PWM2, [str(enabled).lower(), str(pulse_length)]) if cmd.wait_for_response(): return cmd.response.arguments[0] return None def set_out1(self, enabled): cmd = self.msg_handler.send_command(MsgName.OUT1, [str(enabled).lower()]) if cmd.wait_for_response(): return cmd.response.arguments[0] return None def set_out2(self, enabled): cmd = self.msg_handler.send_command(MsgName.OUT2, [str(enabled).lower()]) if cmd.wait_for_response(): return cmd.response.arguments[0] return None def get_time(self): rqst = self.msg_handler.send_request(MsgName.TIME) if rqst.wait_for_response(): return rqst.response.arguments[0] return None def set_alarm(self, unix_time): cmd = self.msg_handler.send_command(MsgName.ALARM, [unix_time]) if cmd.wait_for_response(): return cmd.response.arguments[0] return None def get_logs(self): rqst = self.msg_handler.send_request(MsgName.LOG) if rqst.wait_for_response(): return rqst.response.multipart_args return None def get_measurements(self): rqst = self.msg_handler.send_request(MsgName.MEASUREMENT) if rqst.wait_for_response(): return rqst.response.multipart_args return None def test_logs(self): rqst = self.msg_handler.send_request(MsgName.LOG) if rqst.wait_for_response(): for logline in reversed(rqst.response.multipart_args): logdate = datetime.fromtimestamp(int(logline[0])) batv = float(logline[1])/100.0 logmessage = logmsgs[int(logline[2])] print(F"::{logdate:%Y-%m-%d %H:%M:%S}:: {batv:.2f}V :: {logmessage}") return None def test(self): print("Testing companion board...") print(F"Current RTC time is {self.get_time()}") print(F"Current BatV is {self.get_batv()}") print(F"Current BatI is {self.get_bati()}") print("Turning on Out1 for 1 second") self.set_out1(True) time.sleep(1) self.set_out1(False) print("Turning on Out2 for 1 second") self.set_out2(True) time.sleep(1) self.set_out2(False) print("Enabling auxilliary 5V") self.set_aux5v(True) print("Sweeping PWM1 from 1000us to 2000us") self.set_pwm1(True, 1000) time.sleep(1) self.set_pwm1(True, 2000) time.sleep(1) self.set_pwm1(False, 1000) print("Sweeping PWM2 from 1000us to 2000us") self.set_pwm2(True, 1000) time.sleep(1) self.set_pwm2(True, 2000) time.sleep(1) self.set_pwm2(False, 1000) self.set_aux5v(False) print("Test finished") return None