From 3b499a8f5332431cf27afd83f5a18aca776d2eaa Mon Sep 17 00:00:00 2001 From: novirium Date: Tue, 24 Mar 2020 15:43:38 +0800 Subject: [PATCH] Add command runner to control --- shepherd/agent/control.py | 78 +++++++++++++++++++++++++++++++++------ tests/test_control.py | 71 +++++++++++++++++++++++++++++++---- 2 files changed, 131 insertions(+), 18 deletions(-) diff --git a/shepherd/agent/control.py b/shepherd/agent/control.py index 680b233..83e6c19 100644 --- a/shepherd/agent/control.py +++ b/shepherd/agent/control.py @@ -1,5 +1,6 @@ import threading import secrets +from types import SimpleNamespace from pathlib import Path from urllib.parse import urlparse, urlunparse, urljoin from hashlib import blake2b @@ -10,6 +11,15 @@ import requests from configspec import * import statesman + +# Namespace of types intended for server-side use. +def get_export(): + from . import plugin + export = SimpleNamespace() + export.InterfaceCall = plugin.InterfaceCall + return export + + log = logging.getLogger("shepherd.agent.control") _control_update_required = threading.Condition() @@ -24,7 +34,7 @@ def control_confspec(): """ Returns the control config specification """ - confspec = ConfigSpecification(optional=True) + confspec = ConfigSpecification() confspec.add_spec("server", StringSpec()) confspec.add_spec("intro_key", StringSpec()) @@ -32,31 +42,73 @@ def control_confspec(): class CoreUpdateState(): - def __init__(self, local_config, applied_config): - self.topic_bundle = statesman.TopicBundle() - - self.topic_bundle.add_writer('status', statesman.StateWriter()) - self.topic_bundle.add_writer('config-spec', statesman.StateWriter()) - self.topic_bundle.add_writer('device-config', statesman.StateWriter()) - self.topic_bundle.add_writer('applied-config', statesman.StateWriter()) + def __init__(self, cmd_reader, cmd_result_writer): + """ + Control update handler for the `/update` core endpoint. + """ + self.topic_bundle = statesman.TopicBundle({ + 'status': statesman.StateWriter(), + 'config-spec': statesman.StateWriter(), + 'device-config': statesman.StateWriter(), + 'applied-config': statesman.StateWriter(), + 'control-commands': cmd_reader, + 'command-results': cmd_result_writer}) self.topic_bundle.set_update_required_callback(_update_required_callback) + def set_static_state(self, local_config, applied_config, confspec): # These should all effectively be static self.topic_bundle['device-config'].set_state(local_config) self.topic_bundle['applied-config'].set_state(applied_config) + self.topic_bundle['config-spec'].set_state(confspec) def set_status(self, status_dict): self.topic_bundle['status'].set_state(status_dict) +class CommandRunner(): + def __init__(self, interface_functions): + self.cmd_reader = statesman.SequenceReader( + new_message_callback=self.on_new_command_message) + self.cmd_result_writer = statesman.SequenceWriter() + self._functions = interface_functions + self.current_commands = {} + + def on_new_command_message(self, message): + # This should be a single list, where the first value is the command ID and the second + # value is a plugin.FunctionCall + commandID = message[0] + command_call = message[1] + + command_thread = threading.Thread(target=self._process_command, + args=(commandID, command_call)) + command_thread.start() + + def _process_command(self, commandID, command_call): + if commandID in self.current_commands: + raise ValueError(F"Already running a command with ID {commandID}") + self.current_commands[commandID] = threading.current_thread() + + try: + command_call.resolve(self._functions) + result = command_call.call() + + self.cmd_result_writer.add_message([commandID, result]) + finally: + self.current_commands.pop(commandID) + + class PluginUpdateState(): def __init__(self): self.topic_bundle = statesman.TopicBundle() # config-spec should be static, but isn't known yet when this is created - self.topic_bundle.add_writer('status', statesman.StateWriter()) - self.topic_bundle.add_writer('config-spec', statesman.StateWriter()) + self.topic_bundle.add('status', statesman.StateWriter()) + self.topic_bundle.add('config-spec', statesman.StateWriter()) + self.topic_bundle.add('command-spec', statesman.StateWriter()) + # Why is config split out into plugins? Just like the device config and applied config, + # it's only loaded once at the start. Is this purely because it's easy to get at from the + # PluginInterface where this object is created? self.topic_bundle.set_update_required_callback(_update_required_callback) @@ -66,6 +118,9 @@ class PluginUpdateState(): def set_confspec(self, config_spec): self.topic_bundle['config-spec'].set_state(config_spec) + def set_commandspec(self, command_spec): + self.topic_bundle['command-spec'].set_state(command_spec) + def clean_https_url(dirty_url): """ @@ -143,7 +198,7 @@ def stop(): log.info("Control thread stop requested.") -def init_control(config, root_dir, core_update_state, plugin_update_states): +def start_control(config, root_dir, core_update_state, plugin_update_states): """ Start the Control update thread and initialise the Shepherd Control systems. """ @@ -217,6 +272,7 @@ def _control_update_loop(config, root_dir, core_update_state, plugin_update_stat # Breaking here is a clean way of killing any delay and allowing a final update before # the thread ends. log.warning("Control thread stopping...") + _stop_event.clear() break delay = update_rate_limiter.new_event(time.monotonic()) diff --git a/tests/test_control.py b/tests/test_control.py index 5cd9c6e..dc73748 100644 --- a/tests/test_control.py +++ b/tests/test_control.py @@ -3,11 +3,14 @@ import secrets from base64 import b64encode import json import logging +import time import pytest import responses import statesman +from collections import namedtuple from shepherd.agent import control +from shepherd.agent import plugin def test_device_id(monkeypatch, tmpdir): @@ -54,11 +57,13 @@ def test_control_thread(control_config, tmpdir, caplog): responses.add(responses.POST, 'https://api.shepherd.test/agent/pluginupdate/plugin_B', json={}) core_update_state = control.CoreUpdateState( - {'the_local_config': 'val'}, {'the_applied_config': 'val'}) + statesman.SequenceReader(), statesman.SequenceWriter()) + core_update_state.set_static_state({'the_local_config': 'val'}, { + 'the_applied_config': 'val'}, {}) plugin_update_states = {'plugin_A': control.PluginUpdateState(), 'plugin_B': control.PluginUpdateState()} - control_thread = control.init_control( + control_thread = control.start_control( control_config, tmpdir, core_update_state, plugin_update_states) control.stop() control_thread.join() @@ -83,10 +88,12 @@ def test_control(control_config, tmpdir, caplog, monkeypatch): core_topic_bundle = statesman.TopicBundle() - core_topic_bundle.add_reader('status', statesman.StateReader()) - core_topic_bundle.add_reader('config-spec', statesman.StateReader()) - core_topic_bundle.add_reader('device-config', statesman.StateReader()) - core_topic_bundle.add_reader('applied-config', statesman.StateReader()) + core_topic_bundle.add('status', statesman.StateReader()) + core_topic_bundle.add('config-spec', statesman.StateReader()) + core_topic_bundle.add('device-config', statesman.StateReader()) + core_topic_bundle.add('applied-config', statesman.StateReader()) + core_topic_bundle.add('control-commands', statesman.SequenceWriter()) + core_topic_bundle.add('command-results', statesman.SequenceReader()) core_callback_count = 0 @@ -115,7 +122,9 @@ def test_control(control_config, tmpdir, caplog, monkeypatch): responses.add(responses.POST, 'https://api.shepherd.test/agent/pluginupdate/plugin_B', json={}) core_update_state = control.CoreUpdateState( - {'the_local_config': 'val'}, {'the_applied_config': 'val'}) + statesman.SequenceReader(), statesman.SequenceWriter()) + core_update_state.set_static_state({'the_local_config': 'val'}, { + 'the_applied_config': 'val'}, {}) plugin_update_states = {'plugin_A': control.PluginUpdateState(), 'plugin_B': control.PluginUpdateState()} plugin_update_states['plugin_A'].set_status({"status1": '1'}) @@ -133,3 +142,51 @@ def test_control(control_config, tmpdir, caplog, monkeypatch): # Check there were no connection exceptions for record in caplog.records: assert record.levelno <= logging.WARNING + + +def test_command_runner(): + func_a_was_called = False + + def func_a(): + nonlocal func_a_was_called + func_a_was_called = True + test_function_a = plugin.InterfaceFunction(func_a, 'function_a') + + func_b_was_called = False + + def func_b(arg1): + nonlocal func_b_was_called + func_b_was_called = True + return arg1+1 + test_function_b = plugin.InterfaceFunction(func_b, 'function_b') + + func_tuple = namedtuple('test_functions', ('function_a', 'function_b') + )(test_function_a, test_function_b) + if_functions = {'test_plugin': func_tuple} + cmd_runner = control.CommandRunner(if_functions) + + assert not func_a_was_called + cmd_runner._process_command(10, plugin.InterfaceCall('test_plugin', 'function_a', None)) + assert func_a_was_called + + assert not func_b_was_called + cmd_runner._process_command(12, plugin.InterfaceCall('test_plugin', 'function_b', {'arg1': 5})) + assert func_b_was_called + # Get most recent writer message + wr_msg = list(cmd_runner.cmd_result_writer._messages.values())[-1] + assert wr_msg == [12, 6] + + func_b_was_called = False + cmd_runner.on_new_command_message( + [15, plugin.InterfaceCall('test_plugin', 'function_b', {'arg1': 8})]) + while 15 in cmd_runner.current_commands: + time.sleep(0.01) + + assert func_b_was_called + wr_msg = list(cmd_runner.cmd_result_writer._messages.values())[-1] + assert wr_msg == [15, 9] + + + # Control/Plugin integration tests + + # Test command_runner with actual plugin