""" Core shepherd module, tying together main service functionality. Provides main CLI. """ from pathlib import Path from copy import deepcopy from datetime import datetime import logging import os from configspec import * from . import plugin from . import control from . import tasks log = logging.getLogger("shepherd.agent") core_interface = plugin.PluginInterface() confspec = ConfigSpecification() # Relative pathnames here are all relative to "root_dir". `root_dir` itself is relative to # the directory the default config is loaded from confspec.add_specs({ "name": StringSpec(helptext="Identifying name for this device"), }) confspec.add_specs(optional=True, spec_dict={ "root_dir": (StringSpec(helptext="Operating directory for shepherd to place working files." " Relative to the directory containing the default config file."), "./"), "custom_config_path": StringSpec(helptext="Path to custom config layer TOML file."), "compiled_config_path": (StringSpec(helptext="Path to custom file Shepherd will generate to show compiled" " config that was used and any errors in validation."), "compiled-config.toml"), "plugin_dir": (StringSpec(helptext="Optional directory for Shepherd to look for plugins in."), "./shepherd-plugins") }) core_interface.register_confspec(confspec) # Allows plugins to add delay for system time to stabilise core_interface.register_hook("wait_for_stable_time") # Allow other modules to add to the core interface (confspec, hooks, interface functions) # Having modules modify a confspec after it's registered here is a bit of a hack. tasks.register_on(core_interface) control.register_on(core_interface) @ plugin.plugin_class class Agent(): """ Holds the main state required to run Shepherd Agent """ def __init__(self): # Make sure the plugin system uses this instance rather making its own core_interface._plugin_obj = self # The config defined by the device (everything before the Control layer) self.local_config = None # The config actually being used self.applied_config = None # Split the applied_config up into core and plugins self.core_config = None self.interface_functions = None self.control_enabled = None self.plugin_interfaces = None self.restart_args = None @ plugin.plugin_function def root_dir(self): return self.core_config["root_dir"] @ plugin.plugin_function def device_name(self): return self.core_config["name"] def load(self, default_config_path, use_custom_config=True, control_enabled=False, new_device_mode=False): """ Load in the Shepherd Agent config and associated plugins. Args: default_config_path: The path to the default config file use_custom_config: Set False to disable the local custom config layer control_enabled: Set False to disable Shepherd Control remote management (including any cached Control config layer) new_device_mode: Set True to clear out any cached state and trigger new generation of ID, as if it were being run on a fresh system. """ self.restart_args = [default_config_path, use_custom_config, control_enabled, new_device_mode] self.control_enabled = control_enabled # Compile the config layers confman = ConfigManager() # Pre-seed confman with core confspec to bootstrap 'plugin_dir'. This is required even # though 'load_config_layer_and_plugins()' will get the core confspec from the cached # interface, as it needs 'plugin_dir' to list the plugins to load first. confman.add_confspec("shepherd", core_interface.confspec) compile_local_config(confman, default_config_path, use_custom_config) self.local_config = deepcopy(confman.get_config_bundles()) local_core_conf = confman.get_config_bundle('shepherd') # Check for new device mode if new_device_mode or check_new_device_file(local_core_conf["custom_config_path"]): log.info("'new device' mode enabled, clearing old state...") control.generate_device_identity(local_core_conf["root_dir"]) control.clear_cached_config(local_core_conf["root_dir"]) if local_core_conf["control"] is None: self.control_enabled = False log.warning("Shepherd control config section not present. Will not attempt to" " connect to Shepherd Control server.") if self.control_enabled: compile_remote_config(confman) else: log.info("Shepherd Control config layer disabled") self.applied_config = confman.get_config_bundles() self.core_config = confman.get_config_bundle('shepherd') loaded_plugin_names = list(self.applied_config.keys()) loaded_plugin_names.remove('shepherd') if len(loaded_plugin_names) == 0: loaded_plugin_names.append("--none--") log.info(F"Loaded plugins: {', '.join(loaded_plugin_names)}") log.debug("Compiled config: %s", confman.root_config) if self.core_config["compiled_config_path"]: message = F"Compiled Shepherd config at {datetime.now()}" confman.dump_to_file(self.core_config["compiled_config_path"], message=message) log.info(F"Saved compiled config to {self.core_config['compiled_config_path']}") def restart(self): pass def start(self): # We don't worry about the plugin dir here, or 'shepherd' being included, as they should # already all be loaded and cached. self.plugin_interfaces = plugin.init_plugins(self.applied_config) # After this point, plugins may already have their own threads running if they created # them during init self.interface_functions = core_interface.plugins cmd_runner = control.CommandRunner(self.interface_functions) core_update_state = control.CoreUpdateState(cmd_runner.cmd_reader, cmd_runner.cmd_result_writer) core_update_state.set_static_state( self.local_config, self.applied_config, core_interface.confspec) plugin_update_states = {name: iface._update_state for name, iface in self.plugin_interfaces.items()} if self.control_enabled: control.start_control(self.core_config["control"], self.root_dir(), core_update_state, plugin_update_states) # Need somewhere to eventually pass in the hooks Tasks will need for the lowpower stuff, # probably just another init_plugins arg. # TODO Collect plugin tasks task_session = tasks.init_tasks(self.core_config['session'], self.root_dir(), [], self.applied_config, self.interface_functions) # TODO Any time stabilisation or waiting for Control tasks.start_tasks(core_interface, task_session) # tasks.init_tasks(self.core_config) # seperate tasks.start? # plugin.start() # Run the plugin `.run` hooks in seperate threads # scheduler.restore_jobs() def check_new_device_file(custom_config_path): if not custom_config_path: return False trigger_path = Path(Path(custom_config_path).parent, 'shepherd.new') if trigger_path.exists(): trigger_path.unlink() log.info("'shepherd.new' file detected, removing file and" " triggering 'new device' mode") return True return False def compile_local_config(confman, default_config_path, use_custom_config): """ Load the default config and optionally try to overlay the custom config layer. As part of this, load the required plugins into cache (required to validate their config). """ # ====Default Local Config Layer==== # This must validate to continue. default_config_path = Path(default_config_path).expanduser().resolve() try: load_config_layer_and_plugins(confman, default_config_path) log.info(F"Loaded default config layer from {default_config_path}") except Exception as e: if isinstance(e, InvalidConfigError): log.error(F"Failed to load default config from {default_config_path}." F" {chr(10).join(e.args)}") else: log.error(F"Failed to load default config from {default_config_path}", exc_info=True) raise # Resolve and freeze local install paths that shouldn't be changed from default config core_conf = confman.get_config_bundle("shepherd") resolve_core_conf_paths(core_conf, default_config_path.parent) confman.freeze_value("shepherd", "root_dir") confman.freeze_value("shepherd", "plugin_dir") confman.freeze_value("shepherd", "custom_config_path") confman.freeze_value("shepherd", "compiled_config_path") # Pull out custom config path and save current good config custom_config_path = core_conf["custom_config_path"] confman.save_fallback() if not core_conf["plugin_dir"]: log.warning("Custom plugin path is empty, won't load custom plugins") # ====Custom Local Config Layer==== # If this fails, fallback to default config if not use_custom_config: log.info("Custom config layer disabled") return if not custom_config_path: log.warning("Custom config path is empty, skipping custom config layer") return try: load_config_layer_and_plugins(confman, custom_config_path) log.info(F"Loaded custom config layer from {custom_config_path}") except Exception as e: if isinstance(e, InvalidConfigError): log.error(F"Failed to load custom config layer from {custom_config_path}." F" {e.args[0]}") else: log.error(F"Failed to load custom config layer from {custom_config_path}.", exc_info=True) log.warning("Falling back to default config.") confman.fallback() def compile_remote_config(confman): """ Attempt to load and apply the Shepherd Control config layer (cached from prior communication, Control hasn't actually started up yet). Falls back to previous config if it fails. As part of this, load the required plugins into cache (required to validate their config). """ # ====Control Remote Config Layer==== # Freeze Shepherd Control related config. confman.freeze_value("shepherd", "control", "server") confman.freeze_value("shepherd", "control", "intro_key") # Save current good local config confman.save_fallback() core_conf = confman.get_config_bundle("shepherd") try: control_config = control.get_cached_config(core_conf["root_dir"]) try: load_config_layer_and_plugins(confman, control_config) log.info("Loaded cached Shepherd Control config layer") except Exception as e: if isinstance(e, InvalidConfigError): log.error("Failed to load cached Shepherd Control config layer." F" {e.args[0]}") else: log.error("Failed to load cached Shepherd Control config layer.", exc_info=True) log.warning("Falling back to local config.") confman.fallback() except Exception: log.warning("No cached Shepherd Control config layer available.") def resolve_core_conf_paths(core_conf, relative_dir): """ Set the cwd to ``root_dir`` and resolve other core config paths relative to that. ``root_dir`` itself will resolve relative to ``relative_dir``, intended to be the config file directory. Also expands out any "~" user characters. If paths are empty, leaves them as is, rather than the default pathlib behaviour of resolving to the current directory """ os.chdir(relative_dir) core_conf["root_dir"] = str(Path(core_conf["root_dir"]).expanduser().resolve()) try: os.chdir(core_conf["root_dir"]) except FileNotFoundError: raise FileNotFoundError(F"Shepherd root operating directory '{core_conf['root_dir']}'" F" does not exist") if core_conf["plugin_dir"]: core_conf["plugin_dir"] = str(Path(core_conf["plugin_dir"]).expanduser().resolve()) if core_conf["custom_config_path"]: core_conf["custom_config_path"] = str( Path(core_conf["custom_config_path"]).expanduser().resolve()) if core_conf["compiled_config_path"]: core_conf["compiled_config_path"] = str( Path(core_conf["compiled_config_path"]).expanduser().resolve()) def load_config_layer_and_plugins(confman: ConfigManager, config_source): """ Load a config layer, find the necessary plugin classes, then validate it. If this succeeds, the returned dict of plugin classes will directly match the bundle names in the config manager. """ # Load in config layer confman.load_overlay(config_source) # Get the core config so we can find the plugin directory core_config = confman.get_config_bundle("shepherd") plugin_dir = core_config["plugin_dir"] # List other bundle names to get plugins we need to load plugin_names = confman.get_bundle_names() # Load plugins to get their config specifications plugin_interfaces = {name: plugin.load_plugin(name, plugin_dir) for name in plugin_names} for plugin_name, plugin_interface in plugin_interfaces.items(): confman.add_confspec(plugin_name, plugin_interface.confspec) # Validate all plugin configs confman.validate_bundles() """ Shim to allow the Agent to restart itself without involving the actual CLI """ if __name__ == '__main__': import argparse parser = argparse.ArgumentParser(description="Shepherd core restart shim. For general use," " use the main Shepherd CLI instead.") parser.add_argument('default_config_path', type=str) parser.add_argument('use_custom_config', type=bool) parser.add_argument('control_enabled', type=bool) parser.add_argument('new_device_mode', type=bool) args = parser.parse_args() agent = Agent() agent.load(args.default_config_path, args.use_custom_config, args.control_enabled, args.new_device_mode) agent.start()