parent
999e991298
commit
b4e46362ab
@ -1,285 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import shepherd.config as shconf
|
||||
import shepherd.plugin
|
||||
|
||||
import queue
|
||||
import threading
|
||||
import re
|
||||
import serial
|
||||
import time
|
||||
|
||||
from enum import Enum, auto
|
||||
|
||||
from collections import namedtuple
|
||||
|
||||
|
||||
class Command(Enum):
|
||||
BATV = "batv"
|
||||
BATI = "bati"
|
||||
TIME = "time"
|
||||
ALARM = "alarm"
|
||||
AUX5V = "aux5v"
|
||||
PWM1 = "pwm1"
|
||||
PWM2 = "pwm2"
|
||||
OUT1 = "out1"
|
||||
OUT2 = "out2"
|
||||
|
||||
|
||||
|
||||
|
||||
class CommandRequest():
|
||||
def __init__(self, command, arglist):
|
||||
self.command = command
|
||||
self.arglist = arglist
|
||||
self.response_args = []
|
||||
# event that is triggered when a matching command response is
|
||||
# recieved and the data parsed into ScoutState. ScoutState should
|
||||
# only be then read via the lock on it.
|
||||
self.responded = threading.Event()
|
||||
|
||||
|
||||
ScoutState = namedtuple(
|
||||
'ScoutState', ['batv',
|
||||
'bati',
|
||||
'out1',
|
||||
'out2',
|
||||
'pwm1',
|
||||
'pwm2', ])
|
||||
|
||||
# Plugin to interface with the shepherd scout pcb modules.
|
||||
|
||||
# A seperate thread is started to handle the comms with the
|
||||
# supervising microcontroller. Interface functions add a request
|
||||
# to the queue, and some may wait for a state to be updated before
|
||||
# returning (with a timeout).
|
||||
|
||||
# Command Requests (things to be added to the queue) should only be generated
|
||||
# internally from this plugin - interface functions are provided to wrap
|
||||
# them and optionally return some changed state
|
||||
# The request object (namedTuple?) contains the request string,
|
||||
|
||||
|
||||
class CommandHandler():
|
||||
def __init__(self, config):
|
||||
self._command_queue = queue.Queue()
|
||||
self.port = serial.Serial()
|
||||
self.current_request = None
|
||||
self.curr_request_sent_time = None
|
||||
self.request_timeout = 2
|
||||
|
||||
self.config = config
|
||||
|
||||
self._rx_string = ""
|
||||
|
||||
self.port.baudrate = 57600
|
||||
self.port.timeout = 0
|
||||
self.port.port = self.config["serialport"]
|
||||
# pull in serial config
|
||||
# start thread
|
||||
self.thread = threading.Thread(target=self._serial_comm_thread, daemon=True)
|
||||
self.thread.start()
|
||||
|
||||
def request(self, newRequest):
|
||||
self._command_queue.put(newRequest)
|
||||
|
||||
def _send_request(self):
|
||||
argstr = ""
|
||||
if len(self.current_request.arglist) > 0:
|
||||
argstr = ':'+','.join(self.current_request.arglist)
|
||||
send_str = '?'+self.current_request.command.value+argstr+'\n'
|
||||
self.port.write(send_str.encode('utf-8'))
|
||||
self.curr_request_sent_time = time.time()
|
||||
|
||||
def _process_bytes(self, new_bytes):
|
||||
self._rx_string = self._rx_string + new_bytes.decode('utf-8')
|
||||
#print(self._rx_string)
|
||||
if ('!' in self._rx_string) or ('?' in self._rx_string):
|
||||
match = re.search(r"([!\?])(.+?)[\r\n]", self._rx_string)
|
||||
while match is not None:
|
||||
self._parse_command(match[2])
|
||||
# only keep part of string after our match and look for another
|
||||
# command
|
||||
self._rx_string = self._rx_string[match.end():]
|
||||
match = re.search(r"[!\?].+?\n", self._rx_string)
|
||||
else:
|
||||
# No command start characters anywhere in the string, so ditch it
|
||||
self._rx_string = ""
|
||||
|
||||
def _parse_command(self, command_str):
|
||||
command_name, _, command_args = command_str.partition(':')
|
||||
if self.current_request is not None:
|
||||
if command_name == self.current_request.command.value:
|
||||
self.current_request.response_args = command_args.split(',')
|
||||
self.current_request.responded.set()
|
||||
self.current_request = None
|
||||
|
||||
def _handle_serial_port(self):
|
||||
while True:
|
||||
if self.port.in_waiting > 0:
|
||||
self._process_bytes(self.port.read(self.port.in_waiting))
|
||||
elif self.current_request is None:
|
||||
try:
|
||||
self.current_request = self._command_queue.get(
|
||||
block=True, timeout=0.01)
|
||||
self._send_request()
|
||||
except queue.Empty:
|
||||
pass
|
||||
else:
|
||||
if (time.time()-self.curr_request_sent_time) > self.request_timeout:
|
||||
#Timeout the request
|
||||
self.current_request = None
|
||||
else:
|
||||
pass
|
||||
#Ideally have a short spin loop delay here, but could cause serial buffer to back up
|
||||
#pyserial buffer is apparently 1024 or 4096 bytes, and at 57600 baud 10ms delay would only
|
||||
# be 72 bytes.
|
||||
time.sleep(0.01)
|
||||
|
||||
def _serial_comm_thread(self):
|
||||
while True:
|
||||
# Producers-consumer pattern
|
||||
# If there's a CommandRequest to on the queue, pop it and set as current
|
||||
# Send the command, and wait for a matching response
|
||||
# timeout the response wait if necessary
|
||||
# Parse any response and update the state
|
||||
# notify the queue responded event
|
||||
|
||||
# Actual wait is on either the non-empty queue or a serial character to parse
|
||||
# Serial comms is not synchronous, so need to be available to recieve characters
|
||||
# at any point
|
||||
try:
|
||||
self.port.open()
|
||||
self._handle_serial_port()
|
||||
except serial.SerialException:
|
||||
time.sleep(1)
|
||||
|
||||
# If there's bytes, read them and deal with them. If there's
|
||||
# a serialexception, try to reopen the port
|
||||
# If no bytes,
|
||||
|
||||
|
||||
class ScoutPlugin(shepherd.plugin.Plugin):
|
||||
@staticmethod
|
||||
def define_config(confdef):
|
||||
confdef.add_def('boardver', shconf.StringDef())
|
||||
confdef.add_def('serialport', shconf.StringDef())
|
||||
|
||||
def __init__(self, pluginInterface, config):
|
||||
super().__init__(pluginInterface, config)
|
||||
self.config = config
|
||||
self.interface = pluginInterface
|
||||
self.plugins = pluginInterface.other_plugins
|
||||
self.hooks = pluginInterface.hooks
|
||||
|
||||
self.cmd_handler = CommandHandler(config)
|
||||
|
||||
self.interface.register_function(self.get_batv)
|
||||
self.interface.register_function(self.get_bati)
|
||||
self.interface.register_function(self.get_time)
|
||||
self.interface.register_function(self.set_alarm)
|
||||
self.interface.register_function(self.set_aux5v)
|
||||
self.interface.register_function(self.set_pwm1)
|
||||
self.interface.register_function(self.set_pwm2)
|
||||
self.interface.register_function(self.set_out1)
|
||||
self.interface.register_function(self.set_out2)
|
||||
|
||||
self.interface.register_function(self.test)
|
||||
|
||||
def get_batv(self):
|
||||
req = CommandRequest(Command.BATV, [])
|
||||
self.cmd_handler.request(req)
|
||||
if req.responded.wait(2):
|
||||
return req.response_args[0]
|
||||
return None
|
||||
|
||||
def get_bati(self):
|
||||
req = CommandRequest(Command.BATI, [])
|
||||
self.cmd_handler.request(req)
|
||||
if req.responded.wait(2):
|
||||
return req.response_args[0]
|
||||
return None
|
||||
|
||||
def set_aux5v(self, enabled):
|
||||
req = CommandRequest(Command.AUX5V, [str(enabled).lower()])
|
||||
self.cmd_handler.request(req)
|
||||
if req.responded.wait(2):
|
||||
return req.response_args[0]
|
||||
return None
|
||||
|
||||
def set_pwm1(self, enabled, pulse_length):
|
||||
req = CommandRequest(Command.PWM1, [str(enabled).lower(), str(pulse_length)])
|
||||
self.cmd_handler.request(req)
|
||||
if req.responded.wait(2):
|
||||
return req.response_args[0]
|
||||
return None
|
||||
|
||||
def set_pwm2(self, enabled, pulse_length):
|
||||
req = CommandRequest(Command.PWM2, [str(enabled).lower(), str(pulse_length)])
|
||||
self.cmd_handler.request(req)
|
||||
if req.responded.wait(2):
|
||||
return req.response_args[0]
|
||||
return None
|
||||
|
||||
def set_out1(self, enabled):
|
||||
req = CommandRequest(Command.OUT1, [str(enabled).lower()])
|
||||
self.cmd_handler.request(req)
|
||||
if req.responded.wait(2):
|
||||
return req.response_args[0]
|
||||
return None
|
||||
|
||||
def set_out2(self, enabled):
|
||||
req = CommandRequest(Command.OUT2, [str(enabled).lower()])
|
||||
self.cmd_handler.request(req)
|
||||
if req.responded.wait(2):
|
||||
return req.response_args[0]
|
||||
return None
|
||||
|
||||
def get_time(self):
|
||||
req = CommandRequest(Command.TIME, [])
|
||||
self.cmd_handler.request(req)
|
||||
if req.responded.wait(2):
|
||||
return req.response_args[0]
|
||||
return None
|
||||
|
||||
def set_alarm(self, unix_time):
|
||||
req = CommandRequest(Command.ALARM, [unix_time])
|
||||
self.cmd_handler.request(req)
|
||||
if req.responded.wait(2):
|
||||
return req.response_args[0]
|
||||
return None
|
||||
|
||||
def test(self):
|
||||
print("Testing companion board...")
|
||||
print(F"Current RTC time is {self.get_time()}")
|
||||
print(F"Current BatV is {self.get_batv()}")
|
||||
print(F"Current BatI is {self.get_bati()}")
|
||||
print("Turning on Out1 for 1 second")
|
||||
self.set_out1(True)
|
||||
time.sleep(1)
|
||||
self.set_out1(False)
|
||||
print("Turning on Out2 for 1 second")
|
||||
self.set_out2(True)
|
||||
time.sleep(1)
|
||||
self.set_out2(False)
|
||||
|
||||
print("Enabling auxilliary 5V")
|
||||
self.set_aux5v(True)
|
||||
|
||||
print("Sweeping PWM1 from 1000us to 2000us")
|
||||
self.set_pwm1(True, 1000)
|
||||
time.sleep(1)
|
||||
self.set_pwm1(True, 2000)
|
||||
time.sleep(1)
|
||||
self.set_pwm1(False, 1000)
|
||||
|
||||
print("Sweeping PWM2 from 1000us to 2000us")
|
||||
self.set_pwm2(True, 1000)
|
||||
time.sleep(1)
|
||||
self.set_pwm2(True, 2000)
|
||||
time.sleep(1)
|
||||
self.set_pwm2(False, 1000)
|
||||
self.set_aux5v(False)
|
||||
print("Test finished")
|
||||
|
||||
return None
|
||||
@ -0,0 +1,157 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
"""
|
||||
Plugin to interface with the shepherd scout pcb modules. Also semi-compatible
|
||||
with the older TrapCtrl style boards and Pi HATs based on the SleepyPi2, provided
|
||||
they are running the Shepherd Scout firmware.
|
||||
|
||||
The TDW serial message format is used to pull data from the companion board and
|
||||
interact with its RTC and set alarms. This library uses a seperate thread
|
||||
to handle the comms with the supervising microcontroller. Interface functions
|
||||
add a request to the queue, and some may wait for a state to be updated before
|
||||
returning (with a timeout).
|
||||
"""
|
||||
|
||||
import shepherd.config as shconf
|
||||
import shepherd.plugin
|
||||
import tdw
|
||||
|
||||
import queue
|
||||
import threading
|
||||
import re
|
||||
import serial
|
||||
import time
|
||||
|
||||
from enum import Enum, auto
|
||||
|
||||
from collections import namedtuple
|
||||
|
||||
|
||||
class MsgName(Enum):
|
||||
BATV = "batv"
|
||||
BATI = "bati"
|
||||
TIME = "time"
|
||||
ALARM = "alarm"
|
||||
AUX5V = "aux5v"
|
||||
PWM1 = "pwm1"
|
||||
PWM2 = "pwm2"
|
||||
OUT1 = "out1"
|
||||
OUT2 = "out2"
|
||||
LOG = "log"
|
||||
MEASUREMENT = "meas"
|
||||
|
||||
class ScoutPlugin(shepherd.plugin.Plugin):
|
||||
@staticmethod
|
||||
def define_config(confdef):
|
||||
confdef.add_def('boardver', shconf.StringDef())
|
||||
confdef.add_def('serialport', shconf.StringDef())
|
||||
|
||||
def __init__(self, pluginInterface, config):
|
||||
super().__init__(pluginInterface, config)
|
||||
self.config = config
|
||||
self.interface = pluginInterface
|
||||
self.plugins = pluginInterface.other_plugins
|
||||
self.hooks = pluginInterface.hooks
|
||||
|
||||
self.msg_handler = tdw.MessageHandler(config.serial_port, 57600)
|
||||
|
||||
self.interface.register_function(self.get_batv)
|
||||
self.interface.register_function(self.get_bati)
|
||||
self.interface.register_function(self.get_time)
|
||||
self.interface.register_function(self.set_alarm)
|
||||
self.interface.register_function(self.set_aux5v)
|
||||
self.interface.register_function(self.set_pwm1)
|
||||
self.interface.register_function(self.set_pwm2)
|
||||
self.interface.register_function(self.set_out1)
|
||||
self.interface.register_function(self.set_out2)
|
||||
|
||||
self.interface.register_function(self.test)
|
||||
|
||||
def get_batv(self):
|
||||
rqst = self.msg_handler.send_request(MsgName.BATV)
|
||||
if rqst.wait_for_response():
|
||||
return rqst.response.arguments[0]
|
||||
return None
|
||||
|
||||
def get_bati(self):
|
||||
rqst = self.msg_handler.send_request(MsgName.BATI)
|
||||
if rqst.wait_for_response():
|
||||
return rqst.response.arguments[0]
|
||||
return None
|
||||
|
||||
def set_aux5v(self, enabled):
|
||||
cmd = self.msg_handler.send_command(MsgName.AUX5V, [str(enabled).lower()])
|
||||
if cmd.wait_for_response():
|
||||
return cmd.response.arguments[0]
|
||||
return None
|
||||
|
||||
def set_pwm1(self, enabled, pulse_length):
|
||||
cmd = self.msg_handler.send_command(MsgName.PWM1, [str(enabled).lower(), str(pulse_length)])
|
||||
if cmd.wait_for_response():
|
||||
return cmd.response.arguments[0]
|
||||
return None
|
||||
|
||||
def set_pwm2(self, enabled, pulse_length):
|
||||
cmd = self.msg_handler.send_command(MsgName.PWM2, [str(enabled).lower(), str(pulse_length)])
|
||||
if cmd.wait_for_response():
|
||||
return cmd.response.arguments[0]
|
||||
return None
|
||||
|
||||
def set_out1(self, enabled):
|
||||
cmd = self.msg_handler.send_command(MsgName.OUT1, [str(enabled).lower()])
|
||||
if cmd.wait_for_response():
|
||||
return cmd.response.arguments[0]
|
||||
return None
|
||||
|
||||
def set_out2(self, enabled):
|
||||
cmd = self.msg_handler.send_command(MsgName.OUT2, [str(enabled).lower()])
|
||||
if cmd.wait_for_response():
|
||||
return cmd.response.arguments[0]
|
||||
return None
|
||||
|
||||
def get_time(self):
|
||||
rqst = self.msg_handler.send_request(MsgName.TIME)
|
||||
if rqst.wait_for_response():
|
||||
return rqst.response.arguments[0]
|
||||
return None
|
||||
|
||||
def set_alarm(self, unix_time):
|
||||
cmd = self.msg_handler.send_command(MsgName.ALARM, [unix_time])
|
||||
if cmd.wait_for_response():
|
||||
return cmd.response.arguments[0]
|
||||
return None
|
||||
|
||||
def test(self):
|
||||
print("Testing companion board...")
|
||||
print(F"Current RTC time is {self.get_time()}")
|
||||
print(F"Current BatV is {self.get_batv()}")
|
||||
print(F"Current BatI is {self.get_bati()}")
|
||||
print("Turning on Out1 for 1 second")
|
||||
self.set_out1(True)
|
||||
time.sleep(1)
|
||||
self.set_out1(False)
|
||||
print("Turning on Out2 for 1 second")
|
||||
self.set_out2(True)
|
||||
time.sleep(1)
|
||||
self.set_out2(False)
|
||||
|
||||
print("Enabling auxilliary 5V")
|
||||
self.set_aux5v(True)
|
||||
|
||||
print("Sweeping PWM1 from 1000us to 2000us")
|
||||
self.set_pwm1(True, 1000)
|
||||
time.sleep(1)
|
||||
self.set_pwm1(True, 2000)
|
||||
time.sleep(1)
|
||||
self.set_pwm1(False, 1000)
|
||||
|
||||
print("Sweeping PWM2 from 1000us to 2000us")
|
||||
self.set_pwm2(True, 1000)
|
||||
time.sleep(1)
|
||||
self.set_pwm2(True, 2000)
|
||||
time.sleep(1)
|
||||
self.set_pwm2(False, 1000)
|
||||
self.set_aux5v(False)
|
||||
print("Test finished")
|
||||
|
||||
return None
|
||||
@ -0,0 +1,422 @@
|
||||
|
||||
"""
|
||||
Python module implementing the TDW (This Do What #!?) serial message format.
|
||||
|
||||
|
||||
Currently only supports responses to sent messages, rather than generic RX.
|
||||
|
||||
Intended use:
|
||||
|
||||
>>> msg_handler = tdw.MessageHandler("/dev/ttyAMA0", 57600)
|
||||
>>> rqst = msg_handler.send_request("BATV")
|
||||
>>> if rqst.wait_for_response()
|
||||
>>> print(rqst.response)
|
||||
|
||||
"""
|
||||
|
||||
|
||||
import sys
|
||||
import queue
|
||||
import threading
|
||||
import serial
|
||||
import re
|
||||
import time
|
||||
from enum import Enum
|
||||
|
||||
DEFAULT = object()
|
||||
|
||||
MAX_MSG_LEN = 128
|
||||
MAX_MSG_ARGS = 8
|
||||
|
||||
|
||||
# Return a Message object, allowing the the caller to check a "sent" flag to see if it's gone through
|
||||
# yet. If needs_response was true, enable a "wait_for_response()" function that internally blocks until
|
||||
# response is recieved or times out. Returns either None or a Response object.
|
||||
# Note that unless a class overrides __bool__(), it will always evaluate to True, allowing "if wait_for_response():"
|
||||
# to check if there's a returned message. A ".response" property gets filled after returning too, allowing
|
||||
# use of that rather than the caller having to get a reference.
|
||||
# Once "wait_for_response()"" returns, the message handler removes it's reference to the message and
|
||||
# the response, so the caller can be sure that it won't be changed underneath it.
|
||||
|
||||
|
||||
# Currently this is all designed only for comms initiated by Python, and doesn't handle responding
|
||||
# to communication initiated by the device yet. For that, perhaps supply callback attachments
|
||||
# for specific message names as well as a generic one, but by default queue them up and wait
|
||||
# for a call to a "process_messages()" function or something - to allow callbacks to be handled
|
||||
# in the same main thread. Optionally have a flag when creating the MessageHandler for it to
|
||||
# dispatch callbacks in new threads asynchronously (or have individual flags when attaching callbacks
|
||||
# perhaps?)
|
||||
|
||||
|
||||
class MessageType(Enum):
|
||||
COMMENT = "#"
|
||||
COMMAND = "!"
|
||||
REQUEST = "?"
|
||||
|
||||
|
||||
class Message():
|
||||
"""
|
||||
Representation of a message to be sent from this device.
|
||||
Also used to track the corresponding response from the other device.
|
||||
"""
|
||||
|
||||
def __init__(self, msg_type, msg_name, arguments, multipart_args=None):
|
||||
if not isinstance(msg_type, MessageType):
|
||||
raise TypeError("Argument message_type must be a MessageType")
|
||||
self._msg_type = msg_type
|
||||
self._msg_name = msg_name
|
||||
self._arguments = arguments
|
||||
self._multipart_args = multipart_args
|
||||
|
||||
@property
|
||||
def msg_type(self):
|
||||
return self._msg_type
|
||||
|
||||
@property
|
||||
def msg_name(self):
|
||||
return self._msg_name
|
||||
|
||||
@property
|
||||
def arguments(self):
|
||||
return self._arguments
|
||||
|
||||
@property
|
||||
def multipart_args(self):
|
||||
return self._multipart_args
|
||||
|
||||
|
||||
class TXMessage(Message):
|
||||
"""
|
||||
Locks down and stringifies the arguments and multipart arg list into tuples on creation, avoid
|
||||
them being changed underneath it once the message is added to the send queue.
|
||||
"""
|
||||
|
||||
def __init__(self, msg_type, msg_name, arguments, needs_response, response_timeout, multipart_args=None):
|
||||
|
||||
# Stringify args and put in immutable tuples
|
||||
immutable_args = tuple([str(arg) for arg in arguments])
|
||||
|
||||
immutable_multipart_args = None
|
||||
if multipart_args is not None:
|
||||
immutable_multipart_args = []
|
||||
for arglist in multipart_args:
|
||||
immutable_multipart_args.append(tuple([str(arg) for arg in arglist]))
|
||||
immutable_multipart_args = tuple(immutable_multipart_args)
|
||||
|
||||
super().__init__(msg_type, msg_name, immutable_args, immutable_multipart_args)
|
||||
|
||||
self._needs_response = needs_response
|
||||
self._response_timeout = response_timeout
|
||||
self._response = None
|
||||
|
||||
# event that is triggered when a matching command response is
|
||||
# recieved and the data parsed into ScoutState. ScoutState should
|
||||
# only be then read via the lock on it.
|
||||
self._responded = threading.Event()
|
||||
|
||||
@property
|
||||
def needs_response(self):
|
||||
return self._needs_response
|
||||
|
||||
@property
|
||||
def response(self):
|
||||
return self._response
|
||||
|
||||
def wait_for_response(self):
|
||||
if not self._needs_response:
|
||||
raise Exception(
|
||||
"Can't wait for response on a message that has not set 'needs_response=True'")
|
||||
if self._responded.wait(self._response_timeout):
|
||||
# Serial thread should have populated self._response by now
|
||||
return self.response
|
||||
else:
|
||||
return False
|
||||
|
||||
|
||||
class RXMessage(Message):
|
||||
def __init__(self, msg_type, msg_name, arguments, is_multipart=False, multipart_count=0):
|
||||
|
||||
super().__init__(msg_type, msg_name, arguments)
|
||||
|
||||
self._multipart_count = multipart_count
|
||||
self._is_multipart = is_multipart
|
||||
if is_multipart:
|
||||
self._multipart_args = []
|
||||
|
||||
@property
|
||||
def multipart_count(self):
|
||||
"""
|
||||
The declared number of message parts.
|
||||
//Not// necessarily the number or parts that were actually received.
|
||||
For multipart messages that don't declare a count, this is None.
|
||||
"""
|
||||
return self._multipart_count
|
||||
|
||||
@property
|
||||
def is_multipart(self):
|
||||
"""
|
||||
Returns true if the message was a multi-part message
|
||||
"""
|
||||
return self._is_multipart
|
||||
|
||||
|
||||
class MessageHandler():
|
||||
def __init__(self, serial_port, baud_rate, response_timeout=0.5, multipart_timeout=0.2, loop_delay=0.01):
|
||||
self._tx_message_queue = queue.Queue()
|
||||
|
||||
self._tx_message = None
|
||||
self._tx_sent_time = None
|
||||
|
||||
self._rx_message = None
|
||||
self._last_rx_time = None
|
||||
|
||||
self._rx_multipart_timeout = multipart_timeout
|
||||
|
||||
# Default, can be overridden by individual messages
|
||||
self.response_timeout = response_timeout
|
||||
|
||||
# delay used in serial processing thread between iterations.
|
||||
# If zero, the handler will just spin constantly asking if more serial bytes are available
|
||||
# (a fcntl.ioctl call). If too large, you lose responsiveness when a new message comes in (when
|
||||
# not waiting for a response the loop delay has an event trigger, so new messages get sent immediately)
|
||||
# Very large values risk filling the serial buffer before data can be processed.
|
||||
|
||||
# pyserial buffer is apparently 1024 or 4096 bytes, and at 57600 baud 10ms delay would only
|
||||
# be 72 bytes.
|
||||
self.loop_delay = loop_delay
|
||||
|
||||
self._rx_string = ""
|
||||
|
||||
self.port = serial.Serial()
|
||||
self.port.baudrate = baud_rate # 57600
|
||||
# Set port to be non-blocking
|
||||
self.port.timeout = 0
|
||||
self.port.port = serial_port
|
||||
|
||||
# self._re_message_frame = re.compile(r"([#!?])(.+?)[\r\n]")
|
||||
self._re_msg_start = re.compile(r"[#!?]")
|
||||
self._frame_end_chars = "\r\n"
|
||||
self._re_msg_bounds = re.compile(r"[#!?\r\n]")
|
||||
|
||||
# start thread
|
||||
self.thread = threading.Thread(target=self._serial_comm_thread, daemon=True)
|
||||
self.thread.start()
|
||||
|
||||
def send_message(self, message):
|
||||
"""
|
||||
Add a message to the queue to be sent
|
||||
"""
|
||||
if not isinstance(message, TXMessage):
|
||||
raise TypeError("'message' argument must be of type TXMessage")
|
||||
self._tx_message_queue.put(message)
|
||||
|
||||
def send_comment(self, message_name, arguments=[], needs_response=False, response_timeout=DEFAULT):
|
||||
if response_timeout is DEFAULT:
|
||||
response_timeout = self.response_timeout
|
||||
msg = TXMessage(MessageType.COMMENT, message_name,
|
||||
arguments, needs_response, response_timeout)
|
||||
self.send_message(msg)
|
||||
return msg
|
||||
|
||||
def send_command(self, message_name, arguments=[], needs_response=True, response_timeout=DEFAULT):
|
||||
if response_timeout is DEFAULT:
|
||||
response_timeout = self.response_timeout
|
||||
msg = TXMessage(MessageType.COMMAND, message_name,
|
||||
arguments, needs_response, response_timeout)
|
||||
self.send_message(msg)
|
||||
return msg
|
||||
|
||||
def send_request(self, message_name, arguments=[], needs_response=True, response_timeout=DEFAULT):
|
||||
if response_timeout is DEFAULT:
|
||||
response_timeout = self.response_timeout
|
||||
msg = TXMessage(MessageType.REQUEST, message_name,
|
||||
arguments, needs_response, response_timeout)
|
||||
self.send_message(msg)
|
||||
return msg
|
||||
|
||||
def _send_message(self):
|
||||
"""
|
||||
Actually send a message pulled from the queue.
|
||||
Only called from the serial_comm thread
|
||||
"""
|
||||
|
||||
argstr = ""
|
||||
if len(self._tx_message.arguments) > 0:
|
||||
argstr = ':'+','.join(self._tx_message.arguments)
|
||||
|
||||
send_str = F"{self._tx_message.msg_type.value}{self._tx_message.msg_name}{argstr}\n"
|
||||
self.port.write(send_str.encode('utf-8'))
|
||||
self._tx_sent_time = time.time()
|
||||
|
||||
# Only keep the current message around if we need to track a response
|
||||
if not self._tx_message.needs_response:
|
||||
self._tx_message = None
|
||||
|
||||
def _find_msg_frame(self):
|
||||
"""
|
||||
Finds next frame in the _rx_string, trimming any excess.
|
||||
Returns true if it found something and needs to be called again,
|
||||
intended to be called in a loop.
|
||||
|
||||
"""
|
||||
match = self._re_msg_start.search(self._rx_string)
|
||||
if match is None:
|
||||
# No command start characters anywhere in the string, so ditch it
|
||||
self._rx_string = ""
|
||||
return False
|
||||
|
||||
# trim anything before start
|
||||
if match.start() > 0:
|
||||
self._rx_string = self._rx_string[match.start():]
|
||||
|
||||
# Search for next start or end character
|
||||
match = self._re_msg_bounds.search(self._rx_string, 1)
|
||||
if match is None:
|
||||
# No end of frame found
|
||||
if len(self._rx_string) > MAX_MSG_LEN:
|
||||
# We have a message start, but too many characters after it without an end
|
||||
# of frame, so ditch it.
|
||||
self._rx_string = ""
|
||||
return False
|
||||
|
||||
if match[0] in self._frame_end_chars:
|
||||
# Found our frame end
|
||||
self._parse_message_text(self._rx_string[:match.start()])
|
||||
# Trim out the message we just found and start again
|
||||
self._rx_string = self._rx_string[match.end():]
|
||||
return True
|
||||
else:
|
||||
# Found another start character, trim and start again
|
||||
self._rx_string = self._rx_string[match.start():]
|
||||
return True
|
||||
|
||||
def _parse_message_text(self, msg_str):
|
||||
"""
|
||||
Parse a recieved message string.
|
||||
'msg_str' contains the start character and everything after that,
|
||||
not including the terminating linefeed.
|
||||
Only called from the serial_comm thread
|
||||
"""
|
||||
|
||||
self._last_rx_time = time.time()
|
||||
|
||||
msg_name, _, msg_args_text = msg_str.strip(' \t').partition(':')
|
||||
|
||||
# Convert the character to our enum for later comparison
|
||||
msg_type = MessageType(msg_name[0])
|
||||
|
||||
msg_name = msg_name[1:]
|
||||
msg_args = msg_args_text.split(',')
|
||||
|
||||
# Trim whitespace around name and args
|
||||
msg_name = msg_name.strip(' \t')
|
||||
msg_args = [arg.strip(' \t') for arg in msg_args]
|
||||
|
||||
multipart_count = None
|
||||
multipart_start = False
|
||||
multipart_end = False
|
||||
if msg_args[-1][0] == '<':
|
||||
multipart_start = True
|
||||
# Message is indicating a multipart start
|
||||
count_txt = msg_args[-1][1:].strip(" \t")
|
||||
try:
|
||||
multipart_count = int(count_txt)
|
||||
except ValueError:
|
||||
# If no count supplied or can't understand it, treat it as open
|
||||
multipart_count = None
|
||||
|
||||
# Remove the multipart start indicator from the args
|
||||
msg_args = msg_args[:-1]
|
||||
elif msg_args[-1][0] == '>':
|
||||
multipart_end = True
|
||||
# Remove the multipart end indicator from the args
|
||||
msg_args = msg_args[:-1]
|
||||
|
||||
if self._rx_message is not None:
|
||||
# We have a multipart message in progress
|
||||
if (self._rx_message.msg_name == msg_name) and (self._rx_message.msg_type == msg_type) and (not multipart_start):
|
||||
# Only skip adding args to list if it's a blank mutipart end message
|
||||
if not (multipart_end and (len(msg_args) == 0)):
|
||||
self._rx_message._multipart_args.append(msg_args)
|
||||
|
||||
if multipart_end or (len(self._rx_message.multipart_args) >= self._rx_message.multipart_count):
|
||||
self._process_rx_message()
|
||||
# We're done here
|
||||
return
|
||||
else:
|
||||
# We've got a new message interrupting the in-progress one
|
||||
# Close off existing rx message and continue
|
||||
self._process_rx_message()
|
||||
|
||||
self._rx_message = RXMessage(msg_type, msg_name, msg_args,
|
||||
is_multipart=multipart_start, multipart_count=multipart_count)
|
||||
if not self._rx_message.is_multipart:
|
||||
self._process_rx_message()
|
||||
|
||||
def _process_rx_message(self):
|
||||
"""
|
||||
Process a received and parsed message in self._rx_message,
|
||||
and dispatches any necessary actions.
|
||||
Clears self._rx_message after it's done.
|
||||
"""
|
||||
|
||||
if (self._tx_message is not None) and (self._tx_message.msg_name == self._rx_message.msg_name) and (self._rx_message.msg_type == MessageType.COMMENT):
|
||||
# _tx_message only hangs around if it's waiting for response.
|
||||
# can't just assume next received message is a response, as the other
|
||||
# device might be sending something else in the meantime, so check for name.
|
||||
# Responses are always comments.
|
||||
self._tx_message._response = self._rx_message
|
||||
self._tx_message._responded.set()
|
||||
self._tx_message = None
|
||||
else:
|
||||
pass
|
||||
# TODO - Handle other type of received message and dispatch actions here - callbacks?
|
||||
self._rx_message = None
|
||||
|
||||
def _handle_serial_port(self):
|
||||
# If there's bytes, read them and deal with them. The underlying port read locks
|
||||
# the GIL, so use non-blocking mode.
|
||||
if self.port.in_waiting > 0:
|
||||
new_bytes = self.port.read(self.port.in_waiting)
|
||||
self._rx_string = self._rx_string + new_bytes.decode('utf-8')
|
||||
# Find and process message frames
|
||||
while self._find_msg_frame():
|
||||
pass
|
||||
# Loop back to check for more RX characters (the priority)
|
||||
return
|
||||
|
||||
if self._rx_message is not None:
|
||||
if (time.time()-self._last_rx_time) >= self._rx_multipart_timeout:
|
||||
# Been too long waiting for a multipart message part, timeout and process
|
||||
# so the main program can use the message
|
||||
self._process_rx_message()
|
||||
|
||||
if self._tx_message is not None:
|
||||
# Wait for current tx_message to time out
|
||||
if (time.time()-self._tx_sent_time) >= self._tx_message._response_timeout:
|
||||
# Timeout the request
|
||||
self._tx_message = None
|
||||
else:
|
||||
time.sleep(self.loop_delay)
|
||||
else:
|
||||
# Try and get a new message to send
|
||||
try:
|
||||
self._tx_message = self._tx_message_queue.get(
|
||||
block=True, timeout=self.loop_delay)
|
||||
# Only gets here if current_tx_message is actually set
|
||||
self._send_message()
|
||||
except queue.Empty:
|
||||
pass
|
||||
|
||||
def _serial_comm_thread(self):
|
||||
while True:
|
||||
# Actual wait is on either the non-empty queue or a serial character to parse
|
||||
# Serial comms is not synchronous, so need to be available to recieve characters
|
||||
# at any point
|
||||
try:
|
||||
self.port.open()
|
||||
while True:
|
||||
self._handle_serial_port()
|
||||
except serial.SerialException:
|
||||
time.sleep(1)
|
||||
# If there's a serialexception, try to reopen the port
|
||||
Loading…
Reference in new issue