You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
183 lines
6.2 KiB
183 lines
6.2 KiB
#!/usr/bin/env python3
|
|
|
|
"""
|
|
Plugin to interface with the shepherd scout pcb modules. Also semi-compatible
|
|
with the older TrapCtrl style boards and Pi HATs based on the SleepyPi2, provided
|
|
they are running the Shepherd Scout firmware.
|
|
|
|
The TDW serial message format is used to pull data from the companion board and
|
|
interact with its RTC and set alarms. This library uses a seperate thread
|
|
to handle the comms with the supervising microcontroller. Interface functions
|
|
add a request to the queue, and some may wait for a state to be updated before
|
|
returning (with a timeout).
|
|
"""
|
|
|
|
import shepherd.config as shconf
|
|
import shepherd.plugin
|
|
from . import tdw
|
|
|
|
import queue
|
|
import threading
|
|
import re
|
|
import serial
|
|
import time
|
|
from datetime import datetime
|
|
|
|
from enum import Enum, auto
|
|
|
|
from collections import namedtuple
|
|
|
|
|
|
class MsgName(Enum):
|
|
BATV = "batv"
|
|
BATI = "bati"
|
|
TIME = "time"
|
|
ALARM = "alarm"
|
|
AUX5V = "aux5v"
|
|
PWM1 = "pwm1"
|
|
PWM2 = "pwm2"
|
|
OUT1 = "out1"
|
|
OUT2 = "out2"
|
|
LOG = "log"
|
|
MEASUREMENT = "meas"
|
|
def __str__(self):
|
|
return str(self.value)
|
|
|
|
logmsgs = ["LE_NONE - Empty log",
|
|
"LE_POWERUP - Scout startup",
|
|
"LE_PI_BOOT_TIMEOUT - Tried to turn on Pi but did not recieve succesful boot signal",
|
|
"LE_PI_ON - Pi has booted",
|
|
"LE_LOW_VOLT_START_SHUTDOWN - Initiated Pi shutdown due to low supply voltage",
|
|
"LE_PI_SIGNAL_START_SHUTDOWN - Pi started to shut itself down",
|
|
"LE_PI_SHUTDOWN_TIMEOUT - Pi did not signal successful shutdown, so killed power",
|
|
"LE_MAIN5V_DISABLE - Main Pi power turned off",
|
|
"LE_VOLT_GOOD_MAIN5V_ENABLE - Turned Pi power on after voltage raised enough",
|
|
"LE_ALARM_MAIN5V_ENABLE - Turned Pi power on after wakeup alarm was hit"]
|
|
|
|
class ScoutPlugin(shepherd.plugin.Plugin):
|
|
@staticmethod
|
|
def define_config(confdef):
|
|
confdef.add_def('boardver', shconf.StringDef())
|
|
confdef.add_def('serialport', shconf.StringDef())
|
|
|
|
def __init__(self, pluginInterface, config):
|
|
super().__init__(pluginInterface, config)
|
|
self.config = config
|
|
self.interface = pluginInterface
|
|
self.plugins = pluginInterface.other_plugins
|
|
self.hooks = pluginInterface.hooks
|
|
|
|
self.msg_handler = tdw.MessageHandler(config["serialport"], 57600)
|
|
|
|
self.interface.register_function(self.get_batv)
|
|
self.interface.register_function(self.get_bati)
|
|
self.interface.register_function(self.get_time)
|
|
self.interface.register_function(self.set_alarm)
|
|
self.interface.register_function(self.set_aux5v)
|
|
self.interface.register_function(self.set_pwm1)
|
|
self.interface.register_function(self.set_pwm2)
|
|
self.interface.register_function(self.set_out1)
|
|
self.interface.register_function(self.set_out2)
|
|
self.interface.register_function(self.test_logs)
|
|
|
|
self.interface.register_function(self.test)
|
|
|
|
def get_batv(self):
|
|
rqst = self.msg_handler.send_request(MsgName.BATV.value)
|
|
if rqst.wait_for_response():
|
|
return rqst.response.arguments[0]
|
|
return None
|
|
|
|
def get_bati(self):
|
|
rqst = self.msg_handler.send_request(MsgName.BATI)
|
|
if rqst.wait_for_response():
|
|
return rqst.response.arguments[0]
|
|
return None
|
|
|
|
def set_aux5v(self, enabled):
|
|
cmd = self.msg_handler.send_command(MsgName.AUX5V, [str(enabled).lower()])
|
|
if cmd.wait_for_response():
|
|
return cmd.response.arguments[0]
|
|
return None
|
|
|
|
def set_pwm1(self, enabled, pulse_length):
|
|
cmd = self.msg_handler.send_command(MsgName.PWM1, [str(enabled).lower(), str(pulse_length)])
|
|
if cmd.wait_for_response():
|
|
return cmd.response.arguments[0]
|
|
return None
|
|
|
|
def set_pwm2(self, enabled, pulse_length):
|
|
cmd = self.msg_handler.send_command(MsgName.PWM2, [str(enabled).lower(), str(pulse_length)])
|
|
if cmd.wait_for_response():
|
|
return cmd.response.arguments[0]
|
|
return None
|
|
|
|
def set_out1(self, enabled):
|
|
cmd = self.msg_handler.send_command(MsgName.OUT1, [str(enabled).lower()])
|
|
if cmd.wait_for_response():
|
|
return cmd.response.arguments[0]
|
|
return None
|
|
|
|
def set_out2(self, enabled):
|
|
cmd = self.msg_handler.send_command(MsgName.OUT2, [str(enabled).lower()])
|
|
if cmd.wait_for_response():
|
|
return cmd.response.arguments[0]
|
|
return None
|
|
|
|
def get_time(self):
|
|
rqst = self.msg_handler.send_request(MsgName.TIME)
|
|
if rqst.wait_for_response():
|
|
return rqst.response.arguments[0]
|
|
return None
|
|
|
|
def set_alarm(self, unix_time):
|
|
cmd = self.msg_handler.send_command(MsgName.ALARM, [unix_time])
|
|
if cmd.wait_for_response():
|
|
return cmd.response.arguments[0]
|
|
return None
|
|
|
|
def test_logs(self):
|
|
rqst = self.msg_handler.send_request(MsgName.LOG)
|
|
if rqst.wait_for_response():
|
|
for logline in reversed(rqst.response.multipart_args):
|
|
logdate = datetime.fromtimestamp(int(logline[0]))
|
|
batv = float(logline[1])/100.0
|
|
logmessage = logmsgs[int(logline[2])]
|
|
print(F"::{logdate:%Y-%m-%d %H:%M:%S}:: {batv:.2f}V :: {logmessage}")
|
|
return None
|
|
|
|
def test(self):
|
|
print("Testing companion board...")
|
|
print(F"Current RTC time is {self.get_time()}")
|
|
print(F"Current BatV is {self.get_batv()}")
|
|
print(F"Current BatI is {self.get_bati()}")
|
|
print("Turning on Out1 for 1 second")
|
|
self.set_out1(True)
|
|
time.sleep(1)
|
|
self.set_out1(False)
|
|
print("Turning on Out2 for 1 second")
|
|
self.set_out2(True)
|
|
time.sleep(1)
|
|
self.set_out2(False)
|
|
|
|
print("Enabling auxilliary 5V")
|
|
self.set_aux5v(True)
|
|
|
|
print("Sweeping PWM1 from 1000us to 2000us")
|
|
self.set_pwm1(True, 1000)
|
|
time.sleep(1)
|
|
self.set_pwm1(True, 2000)
|
|
time.sleep(1)
|
|
self.set_pwm1(False, 1000)
|
|
|
|
print("Sweeping PWM2 from 1000us to 2000us")
|
|
self.set_pwm2(True, 1000)
|
|
time.sleep(1)
|
|
self.set_pwm2(True, 2000)
|
|
time.sleep(1)
|
|
self.set_pwm2(False, 1000)
|
|
self.set_aux5v(False)
|
|
print("Test finished")
|
|
|
|
return None
|