You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
shepherd-agent/shepherd/plugins/scout/scout.py

160 lines
5.0 KiB

#!/usr/bin/env python3
"""
Plugin to interface with the shepherd scout pcb modules. Also semi-compatible
with the older TrapCtrl style boards and Pi HATs based on the SleepyPi2, provided
they are running the Shepherd Scout firmware.
The TDW serial message format is used to pull data from the companion board and
interact with its RTC and set alarms. This library uses a seperate thread
to handle the comms with the supervising microcontroller. Interface functions
add a request to the queue, and some may wait for a state to be updated before
returning (with a timeout).
"""
import shepherd.config as shconf
import shepherd.plugin
import tdw
import queue
import threading
import re
import serial
import time
from enum import Enum, auto
from collections import namedtuple
class MsgName(Enum):
BATV = "batv"
BATI = "bati"
TIME = "time"
ALARM = "alarm"
AUX5V = "aux5v"
PWM1 = "pwm1"
PWM2 = "pwm2"
OUT1 = "out1"
OUT2 = "out2"
LOG = "log"
MEASUREMENT = "meas"
def __str__(self):
return str(self.value)
class ScoutPlugin(shepherd.plugin.Plugin):
@staticmethod
def define_config(confdef):
confdef.add_def('boardver', shconf.StringDef())
confdef.add_def('serialport', shconf.StringDef())
def __init__(self, pluginInterface, config):
super().__init__(pluginInterface, config)
self.config = config
self.interface = pluginInterface
self.plugins = pluginInterface.other_plugins
self.hooks = pluginInterface.hooks
self.msg_handler = tdw.MessageHandler(config.serial_port, 57600)
self.interface.register_function(self.get_batv)
self.interface.register_function(self.get_bati)
self.interface.register_function(self.get_time)
self.interface.register_function(self.set_alarm)
self.interface.register_function(self.set_aux5v)
self.interface.register_function(self.set_pwm1)
self.interface.register_function(self.set_pwm2)
self.interface.register_function(self.set_out1)
self.interface.register_function(self.set_out2)
self.interface.register_function(self.test)
def get_batv(self):
rqst = self.msg_handler.send_request(MsgName.BATV)
if rqst.wait_for_response():
return rqst.response.arguments[0]
return None
def get_bati(self):
rqst = self.msg_handler.send_request(MsgName.BATI)
if rqst.wait_for_response():
return rqst.response.arguments[0]
return None
def set_aux5v(self, enabled):
cmd = self.msg_handler.send_command(MsgName.AUX5V, [str(enabled).lower()])
if cmd.wait_for_response():
return cmd.response.arguments[0]
return None
def set_pwm1(self, enabled, pulse_length):
cmd = self.msg_handler.send_command(MsgName.PWM1, [str(enabled).lower(), str(pulse_length)])
if cmd.wait_for_response():
return cmd.response.arguments[0]
return None
def set_pwm2(self, enabled, pulse_length):
cmd = self.msg_handler.send_command(MsgName.PWM2, [str(enabled).lower(), str(pulse_length)])
if cmd.wait_for_response():
return cmd.response.arguments[0]
return None
def set_out1(self, enabled):
cmd = self.msg_handler.send_command(MsgName.OUT1, [str(enabled).lower()])
if cmd.wait_for_response():
return cmd.response.arguments[0]
return None
def set_out2(self, enabled):
cmd = self.msg_handler.send_command(MsgName.OUT2, [str(enabled).lower()])
if cmd.wait_for_response():
return cmd.response.arguments[0]
return None
def get_time(self):
rqst = self.msg_handler.send_request(MsgName.TIME)
if rqst.wait_for_response():
return rqst.response.arguments[0]
return None
def set_alarm(self, unix_time):
cmd = self.msg_handler.send_command(MsgName.ALARM, [unix_time])
if cmd.wait_for_response():
return cmd.response.arguments[0]
return None
def test(self):
print("Testing companion board...")
print(F"Current RTC time is {self.get_time()}")
print(F"Current BatV is {self.get_batv()}")
print(F"Current BatI is {self.get_bati()}")
print("Turning on Out1 for 1 second")
self.set_out1(True)
time.sleep(1)
self.set_out1(False)
print("Turning on Out2 for 1 second")
self.set_out2(True)
time.sleep(1)
self.set_out2(False)
print("Enabling auxilliary 5V")
self.set_aux5v(True)
print("Sweeping PWM1 from 1000us to 2000us")
self.set_pwm1(True, 1000)
time.sleep(1)
self.set_pwm1(True, 2000)
time.sleep(1)
self.set_pwm1(False, 1000)
print("Sweeping PWM2 from 1000us to 2000us")
self.set_pwm2(True, 1000)
time.sleep(1)
self.set_pwm2(True, 2000)
time.sleep(1)
self.set_pwm2(False, 1000)
self.set_aux5v(False)
print("Test finished")
return None