You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							429 lines
						
					
					
						
							16 KiB
						
					
					
				
			
		
		
	
	
							429 lines
						
					
					
						
							16 KiB
						
					
					
				| 
 | |
| """
 | |
| Python module implementing the TDW (This Do What #!?) serial message format.
 | |
| 
 | |
| 
 | |
| Currently only supports responses to sent messages, rather than generic RX.
 | |
| 
 | |
| Intended use:
 | |
| 
 | |
| >>> msg_handler = tdw.MessageHandler("/dev/ttyAMA0", 57600)
 | |
| >>> rqst = msg_handler.send_request("BATV")
 | |
| >>> if rqst.wait_for_response()
 | |
| >>>     print(rqst.response)
 | |
| 
 | |
| """
 | |
| 
 | |
| 
 | |
| import sys
 | |
| import queue
 | |
| import threading
 | |
| import serial
 | |
| import re
 | |
| import time
 | |
| from enum import Enum
 | |
| 
 | |
| DEFAULT = object()
 | |
| 
 | |
| MAX_MSG_LEN = 128
 | |
| MAX_MSG_ARGS = 8
 | |
| 
 | |
| 
 | |
| # Return a Message object, allowing the the caller to check a "sent" flag to see if it's gone through
 | |
| # yet. If needs_response was true, enable a "wait_for_response()" function that internally blocks until
 | |
| # response is recieved or times out. Returns either None or a Response object.
 | |
| # Note that unless a class overrides __bool__(), it will always evaluate to True, allowing "if wait_for_response():"
 | |
| # to check if there's a returned message. A ".response" property gets filled after returning too, allowing
 | |
| # use of that rather than the caller having to get a reference.
 | |
| # Once "wait_for_response()"" returns, the message handler removes it's reference to the message and
 | |
| # the response, so the caller can be sure that it won't be changed underneath it.
 | |
| 
 | |
| 
 | |
| # Currently this is all designed only for comms initiated by Python, and doesn't handle responding
 | |
| # to communication initiated by the device yet. For that, perhaps supply callback attachments
 | |
| # for specific message names as well as a generic one, but by default queue them up and wait
 | |
| # for a call to a "process_messages()" function or something - to allow callbacks to be handled
 | |
| # in the same main thread. Optionally have a flag when creating the MessageHandler for it to
 | |
| # dispatch callbacks in new threads asynchronously (or have individual flags when attaching callbacks
 | |
| # perhaps?)
 | |
| 
 | |
| 
 | |
| class MessageType(Enum):
 | |
|     COMMENT = "#"
 | |
|     COMMAND = "!"
 | |
|     REQUEST = "?"
 | |
|     def __str__(self):
 | |
|         return str(self.value)
 | |
| 
 | |
| 
 | |
| class Message():
 | |
|     """
 | |
|     Representation of a message to be sent from this device.
 | |
|     Also used to track the corresponding response from the other device.
 | |
|     """
 | |
| 
 | |
|     def __init__(self, msg_type, msg_name, arguments, multipart_args=None):
 | |
|         if not isinstance(msg_type, MessageType):
 | |
|             raise TypeError("Argument message_type must be a MessageType")
 | |
|         self._msg_type = msg_type
 | |
|         self._msg_name = str(msg_name)
 | |
|         self._arguments = arguments
 | |
|         self._multipart_args = multipart_args
 | |
| 
 | |
|     @property
 | |
|     def msg_type(self):
 | |
|         return self._msg_type
 | |
| 
 | |
|     @property
 | |
|     def msg_name(self):
 | |
|         return self._msg_name
 | |
| 
 | |
|     @property
 | |
|     def arguments(self):
 | |
|         return self._arguments
 | |
| 
 | |
|     @property
 | |
|     def multipart_args(self):
 | |
|         return self._multipart_args
 | |
| 
 | |
| 
 | |
| class TXMessage(Message):
 | |
|     """
 | |
|     Locks down and stringifies the arguments and multipart arg list into tuples on creation, avoid
 | |
|     them being changed underneath it once the message is added to the send queue.
 | |
|     """
 | |
| 
 | |
|     def __init__(self, msg_type, msg_name, arguments, needs_response, response_timeout, multipart_args=None):
 | |
| 
 | |
|         # Stringify args and put in immutable tuples
 | |
|         immutable_args = tuple([str(arg) for arg in arguments])
 | |
| 
 | |
|         immutable_multipart_args = None
 | |
|         if multipart_args is not None:
 | |
|             immutable_multipart_args = []
 | |
|             for arglist in multipart_args:
 | |
|                 immutable_multipart_args.append(tuple([str(arg) for arg in arglist]))
 | |
|             immutable_multipart_args = tuple(immutable_multipart_args)
 | |
| 
 | |
|         super().__init__(msg_type, msg_name, immutable_args, immutable_multipart_args)
 | |
| 
 | |
|         self._needs_response = needs_response
 | |
|         self._response_timeout = response_timeout
 | |
|         self._response = None
 | |
| 
 | |
|         # event that is triggered when a matching command response is
 | |
|         # recieved and the data parsed into ScoutState. ScoutState should
 | |
|         # only be then read via the lock on it.
 | |
|         self._responded = threading.Event()
 | |
| 
 | |
|     @property
 | |
|     def needs_response(self):
 | |
|         return self._needs_response
 | |
| 
 | |
|     @property
 | |
|     def response(self):
 | |
|         return self._response
 | |
| 
 | |
|     def wait_for_response(self):
 | |
|         if not self._needs_response:
 | |
|             raise Exception(
 | |
|                 "Can't wait for response on a message that has not set 'needs_response=True'")
 | |
|         if self._responded.wait(self._response_timeout):
 | |
|             # Serial thread should have populated self._response by now
 | |
|             return self.response
 | |
|         else:
 | |
|             return False
 | |
| 
 | |
| 
 | |
| class RXMessage(Message):
 | |
|     def __init__(self, msg_type, msg_name, arguments, is_multipart=False, multipart_count=0):
 | |
| 
 | |
|         super().__init__(msg_type, msg_name, arguments)
 | |
| 
 | |
|         self._multipart_count = multipart_count
 | |
|         self._is_multipart = is_multipart
 | |
|         if is_multipart:
 | |
|             self._multipart_args = []
 | |
| 
 | |
|     @property
 | |
|     def multipart_count(self):
 | |
|         """
 | |
|         The declared number of message parts.
 | |
|         //Not// necessarily the number or parts that were actually received.
 | |
|         For multipart messages that don't declare a count, this is None.
 | |
|         """
 | |
|         return self._multipart_count
 | |
| 
 | |
|     @property
 | |
|     def is_multipart(self):
 | |
|         """
 | |
|         Returns true if the message was a multi-part message
 | |
|         """
 | |
|         return self._is_multipart
 | |
| 
 | |
| 
 | |
| class MessageHandler():
 | |
|     def __init__(self, serial_port, baud_rate, response_timeout=0.5, multipart_timeout=0.2, loop_delay=0.01):
 | |
|         self._tx_message_queue = queue.Queue()
 | |
| 
 | |
|         self._tx_message = None
 | |
|         self._tx_sent_time = None
 | |
| 
 | |
|         self._rx_message = None
 | |
|         self._last_rx_time = None
 | |
| 
 | |
|         self._rx_multipart_timeout = multipart_timeout
 | |
| 
 | |
|         # Default, can be overridden by individual messages
 | |
|         self.response_timeout = response_timeout
 | |
| 
 | |
|         # delay used in serial processing thread between iterations.
 | |
|         # If zero, the handler will just spin constantly asking if more serial bytes are available
 | |
|         # (a fcntl.ioctl call). If too large, you lose responsiveness when a new message comes in (when
 | |
|         # not waiting for a response the loop delay has an event trigger, so new messages get sent immediately)
 | |
|         # Very large values risk filling the serial buffer before data can be processed.
 | |
| 
 | |
|         # pyserial buffer is apparently 1024 or 4096 bytes, and at 57600 baud 10ms delay would only
 | |
|         # be 72 bytes.
 | |
|         self.loop_delay = loop_delay
 | |
| 
 | |
|         self._rx_string = ""
 | |
| 
 | |
|         self.port = serial.Serial()
 | |
|         self.port.baudrate = baud_rate  # 57600
 | |
|         # Set port to be non-blocking
 | |
|         self.port.timeout = 0
 | |
|         self.port.port = serial_port
 | |
| 
 | |
|         # self._re_message_frame = re.compile(r"([#!?])(.+?)[\r\n]")
 | |
|         self._re_msg_start = re.compile(r"[#!?]")
 | |
|         self._frame_end_chars = "\r\n"
 | |
|         self._re_msg_bounds = re.compile(r"[#!?\r\n]")
 | |
| 
 | |
|         # start thread
 | |
|         self.thread = threading.Thread(target=self._serial_comm_thread, daemon=True)
 | |
|         self.thread.start()
 | |
| 
 | |
|     def send_message(self, message):
 | |
|         """
 | |
|         Add a message to the queue to be sent
 | |
|         """
 | |
|         if not isinstance(message, TXMessage):
 | |
|             raise TypeError("'message' argument must be of type TXMessage")
 | |
|         self._tx_message_queue.put(message)
 | |
| 
 | |
|     def send_comment(self, message_name, arguments=[], needs_response=False, response_timeout=DEFAULT):
 | |
|         if response_timeout is DEFAULT:
 | |
|             response_timeout = self.response_timeout
 | |
|         msg = TXMessage(MessageType.COMMENT, message_name,
 | |
|                         arguments, needs_response, response_timeout)
 | |
|         self.send_message(msg)
 | |
|         return msg
 | |
| 
 | |
|     def send_command(self, message_name, arguments=[], needs_response=True, response_timeout=DEFAULT):
 | |
|         if response_timeout is DEFAULT:
 | |
|             response_timeout = self.response_timeout
 | |
|         msg = TXMessage(MessageType.COMMAND, message_name,
 | |
|                         arguments, needs_response, response_timeout)
 | |
|         self.send_message(msg)
 | |
|         return msg
 | |
| 
 | |
|     def send_request(self, message_name, arguments=[], needs_response=True, response_timeout=DEFAULT):
 | |
|         if response_timeout is DEFAULT:
 | |
|             response_timeout = self.response_timeout
 | |
|         msg = TXMessage(MessageType.REQUEST, message_name,
 | |
|                         arguments, needs_response, response_timeout)
 | |
|         self.send_message(msg)
 | |
|         return msg
 | |
| 
 | |
|     def _send_message(self):
 | |
|         """
 | |
|         Actually send a message pulled from the queue.
 | |
|         Only called from the serial_comm thread
 | |
|         """
 | |
|         
 | |
| 
 | |
|         argstr = ""
 | |
|         if len(self._tx_message.arguments) > 0:
 | |
|             argstr = ':'+','.join(self._tx_message.arguments)
 | |
| 
 | |
|         send_str = F"{self._tx_message.msg_type}{self._tx_message.msg_name}{argstr}\n"
 | |
|         self.port.write(send_str.encode('utf-8'))
 | |
|         self._tx_sent_time = time.time()
 | |
|         #print(F"TX: {send_str}")
 | |
|         # Only keep the current message around if we need to track a response
 | |
|         if not self._tx_message.needs_response:
 | |
|             self._tx_message = None
 | |
| 
 | |
|     def _find_msg_frame(self):
 | |
|         """
 | |
|         Finds next frame in the _rx_string, trimming any excess.
 | |
|         Returns true if it found something and needs to be called again,
 | |
|         intended to be called in a loop.
 | |
| 
 | |
|         """
 | |
|         match = self._re_msg_start.search(self._rx_string)
 | |
|         if match is None:
 | |
|             # No command start characters anywhere in the string, so ditch it
 | |
|             self._rx_string = ""
 | |
|             return False
 | |
| 
 | |
|         # trim anything before start
 | |
|         if match.start() > 0:
 | |
|             self._rx_string = self._rx_string[match.start():]
 | |
| 
 | |
|         # Search for next start or end character
 | |
|         match = self._re_msg_bounds.search(self._rx_string, 1)
 | |
|         if match is None:
 | |
|             # No end of frame found
 | |
|             if len(self._rx_string) > MAX_MSG_LEN:
 | |
|                 # We have a message start, but too many characters after it without an end
 | |
|                 # of frame, so ditch it.
 | |
|                 self._rx_string = ""
 | |
|             return False
 | |
| 
 | |
|         if match[0] in self._frame_end_chars:
 | |
|             # Found our frame end
 | |
|             self._parse_message_text(self._rx_string[:match.start()])
 | |
|             # Trim out the message we just found and start again
 | |
|             self._rx_string = self._rx_string[match.end():]
 | |
|             return True
 | |
|         else:
 | |
|             # Found another start character, trim and start again
 | |
|             self._rx_string = self._rx_string[match.start():]
 | |
|             return True
 | |
| 
 | |
|     def _parse_message_text(self, msg_str):
 | |
|         """
 | |
|         Parse a recieved message string.
 | |
|         'msg_str' contains the start character and everything after that, 
 | |
|         not including the terminating linefeed.
 | |
|         Only called from the serial_comm thread
 | |
|         """
 | |
| 
 | |
|         #print(F"RX: {msg_str}")
 | |
|         self._last_rx_time = time.time()
 | |
| 
 | |
|         msg_name, _, msg_args_text = msg_str.strip(' \t').partition(':')
 | |
| 
 | |
|         # Convert the character to our enum for later comparison
 | |
|         msg_type = MessageType(msg_name[0])
 | |
| 
 | |
|         msg_name = msg_name[1:]
 | |
|         msg_args = msg_args_text.split(',')
 | |
| 
 | |
|         # Trim whitespace around name and args
 | |
|         msg_name = msg_name.strip(' \t')
 | |
|         msg_args = [arg.strip(' \t') for arg in msg_args]
 | |
| 
 | |
|         multipart_count = None
 | |
|         multipart_start = False
 | |
|         multipart_end = False
 | |
|         if msg_args[-1][0] == '<':
 | |
|             multipart_start = True
 | |
|             # Message is indicating a multipart start
 | |
|             count_txt = msg_args[-1][1:].strip(" \t")
 | |
|             try:
 | |
|                 multipart_count = int(count_txt)
 | |
|             except ValueError:
 | |
|                 # If no count supplied or can't understand it, treat it as open
 | |
|                 multipart_count = None
 | |
| 
 | |
|             # Remove the multipart start indicator from the args
 | |
|             msg_args = msg_args[:-1]
 | |
|         elif msg_args[-1][0] == '>':
 | |
|             multipart_end = True
 | |
|             # Remove the multipart end indicator from the args
 | |
|             msg_args = msg_args[:-1]
 | |
| 
 | |
|         if self._rx_message is not None:
 | |
|             # We have a multipart message in progress
 | |
|             if (self._rx_message.msg_name == msg_name) and (self._rx_message.msg_type == msg_type) and (not multipart_start):
 | |
|                 # Only skip adding args to list if it's a blank mutipart end message
 | |
|                 if not (multipart_end and (len(msg_args) == 0)):
 | |
|                     self._rx_message._multipart_args.append(msg_args)
 | |
| 
 | |
|                 if multipart_end or (len(self._rx_message.multipart_args) >= self._rx_message.multipart_count):
 | |
|                     self._process_rx_message()
 | |
|                 # We're done here
 | |
|                 return
 | |
|             else:
 | |
|                 # We've got a new message interrupting the in-progress one
 | |
|                 # Close off existing rx message and continue
 | |
|                 self._process_rx_message()
 | |
| 
 | |
|         self._rx_message = RXMessage(msg_type, msg_name, msg_args,
 | |
|                                      is_multipart=multipart_start, multipart_count=multipart_count)
 | |
|         if not self._rx_message.is_multipart:
 | |
|             self._process_rx_message()
 | |
| 
 | |
|     def _process_rx_message(self):
 | |
|         """
 | |
|         Process a received and parsed message in self._rx_message, 
 | |
|         and dispatches any necessary actions.
 | |
|         Clears self._rx_message after it's done.
 | |
|         """
 | |
|         #print(F"Process rx_msg: {self._rx_message.msg_name}")
 | |
|         #print(F"tx message during processing is {self._tx_message.msg_name}")
 | |
| 
 | |
|         if (self._tx_message is not None) and (self._tx_message.msg_name == self._rx_message.msg_name) and (self._rx_message.msg_type == MessageType.COMMENT):
 | |
|             # _tx_message only hangs around if it's waiting for response.
 | |
|             # can't just assume next received message is a response, as the other
 | |
|             # device might be sending something else in the meantime, so check for name.
 | |
|             # Responses are always comments.
 | |
|             self._tx_message._response = self._rx_message
 | |
|             self._tx_message._responded.set()
 | |
|             self._tx_message = None
 | |
|         else:
 | |
|             pass
 | |
|             # TODO - Handle other type of received message and dispatch actions here - callbacks?
 | |
|         self._rx_message = None
 | |
| 
 | |
|     def _handle_serial_port(self):
 | |
|         # If there's bytes, read them and deal with them. The underlying port read locks
 | |
|         # the GIL, so use non-blocking mode.
 | |
|         if self.port.in_waiting > 0:
 | |
|             new_bytes = self.port.read(self.port.in_waiting)
 | |
|             self._rx_string = self._rx_string + new_bytes.decode('utf-8')
 | |
|             # Find and process message frames
 | |
|             while self._find_msg_frame():
 | |
|                 pass
 | |
|             # Loop back to check for more RX characters (the priority)
 | |
|             return
 | |
| 
 | |
|         if self._rx_message is not None:
 | |
|             if (time.time()-self._last_rx_time) >= self._rx_multipart_timeout:
 | |
|                 # Been too long waiting for a multipart message part, timeout and process
 | |
|                 # so the main program can use the message
 | |
|                 self._process_rx_message()
 | |
| 
 | |
|         if self._tx_message is not None:
 | |
|             # Wait for current tx_message to time out
 | |
|             if (time.time()-self._tx_sent_time) >= self._tx_message._response_timeout:
 | |
|                 # Timeout the request
 | |
|                 self._tx_message = None
 | |
|             else:
 | |
|                 time.sleep(self.loop_delay)
 | |
|         else:
 | |
|             # Try and get a new message to send
 | |
|             try:
 | |
|                 self._tx_message = self._tx_message_queue.get(
 | |
|                     block=True, timeout=self.loop_delay)
 | |
|                 # Only gets here if current_tx_message is actually set
 | |
|                 self._send_message()
 | |
|             except queue.Empty:
 | |
|                 pass
 | |
| 
 | |
|     def _serial_comm_thread(self):
 | |
|         while True:
 | |
|             # Actual wait is on either the non-empty queue or a serial character to parse
 | |
|             # Serial comms is not synchronous, so need to be available to recieve characters
 | |
|             # at any point
 | |
|             try:
 | |
|                 self.port.open()
 | |
|                 while True:
 | |
|                     self._handle_serial_port()
 | |
|             except serial.SerialException:
 | |
|                 time.sleep(1)
 | |
|                 # If there's a serialexception, try to reopen the port
 |