You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
shepherd-agent/shepherd/agent/core.py

364 lines
14 KiB

"""
Core shepherd module, tying together main service functionality. Provides main CLI.
"""
from pathlib import Path
from copy import deepcopy
from datetime import datetime
import logging
import os
from configspec import *
from . import plugin
from . import control
from . import tasks
# Logger shared by the agent core.
log = logging.getLogger("shepherd.agent")

# Plugin interface representing the Shepherd core itself, and the config spec it exposes.
core_interface = plugin.PluginInterface()
confspec = ConfigSpecification()

# Settings that must be supplied by the config.
confspec.add_specs({
    "name": StringSpec(helptext="Identifying name for this device"),
})

# Optional settings, given as (spec, default) pairs or as a bare spec.
# Relative pathnames here are all relative to "root_dir". `root_dir` itself is relative to
# the directory the default config is loaded from.
_optional_core_specs = {
    "root_dir": (
        StringSpec(helptext="Operating directory for shepherd to place working files."
                            " Relative to the directory containing the default config file."),
        "./",
    ),
    "custom_config_path": StringSpec(helptext="Path to custom config layer TOML file."),
    "compiled_config_path": (
        StringSpec(helptext="Path to custom file Shepherd will generate to show compiled"
                            " config that was used and any errors in validation."),
        "compiled-config.toml",
    ),
    "plugin_dir": (
        StringSpec(helptext="Optional directory for Shepherd to look for plugins in."),
        "./shepherd-plugins",
    ),
}
confspec.add_specs(optional=True, spec_dict=_optional_core_specs)

core_interface.register_confspec(confspec)

# Hook letting plugins add a delay for system time to stabilise.
core_interface.register_hook("wait_for_stable_time")

# Let the sibling modules extend the core interface (confspec, hooks, interface functions).
# Having modules modify a confspec after it's registered here is a bit of a hack.
tasks.register_on(core_interface)
control.register_on(core_interface)
@plugin.plugin_class
class Agent():
    """
    Holds the main state required to run Shepherd Agent.

    Constructing an Agent compiles the layered config (default layer, optional custom layer,
    optional cached Shepherd Control layer). Calling `start` then initialises the plugins,
    Shepherd Control (if enabled) and the task system.
    """

    def __init__(self, default_config_path, use_custom_config=True, control_enabled=False,
                 new_device_mode=False):
        """
        Load in the Shepherd Agent config and associated plugins.

        Args:
            default_config_path: The path to the default config file
            use_custom_config: Set False to disable the local custom config layer
            control_enabled: Set True to enable Shepherd Control remote management
              (including any cached Control config layer). Disabled by default, and forced
              off when the local config has no "control" section.
            new_device_mode: Set True to clear out any cached state and trigger new generation
              of ID, as if it were being run on a fresh system. The same mode is triggered by
              a 'shepherd.new' file sitting beside the custom config file.
        """
        # Make sure the plugin system uses this instance rather making its own
        core_interface._plugin_obj = self

        # The config defined by the device (everything before the Control layer)
        self.local_config = None
        # The config actually being used
        self.applied_config = None
        # Split the applied_config up into core and plugins
        self.core_config = None
        self.interface_functions = None
        self.plugin_interfaces = None

        # Remember the constructor arguments so the agent can be re-created on restart
        self.restart_args = [default_config_path,
                             use_custom_config, control_enabled, new_device_mode]
        self.control_enabled = control_enabled

        # Compile the config layers
        confman = ConfigManager()
        # Pre-seed confman with core confspec to bootstrap 'plugin_dir'.
        # The plugin load system will get it from the 'shepherd' plugin interface later, but we
        # need the 'plugin_dir' before that.
        confman.add_confspec("shepherd", core_interface.confspec)
        compile_local_config(confman, default_config_path, use_custom_config)
        self.local_config = deepcopy(confman.get_config_bundles())
        local_core_conf = confman.get_config_bundle('shepherd')

        # Check for new device mode. The trigger-file check always runs, even when the mode
        # was requested explicitly, so the one-shot 'shepherd.new' file is consumed instead of
        # being left behind to wipe the device state again on the next start.
        trigger_file_found = check_new_device_file(local_core_conf["custom_config_path"])
        if new_device_mode or trigger_file_found:
            log.info("'new device' mode enabled, clearing old state...")
            control.generate_device_identity(local_core_conf["root_dir"])
            control.clear_cached_config(local_core_conf["root_dir"])

        if local_core_conf["control"] is None:
            self.control_enabled = False
            log.warning("Shepherd control config section not present. Will not attempt to"
                        " connect to Shepherd Control server.")

        if self.control_enabled:
            compile_remote_config(confman)
        else:
            log.info("Shepherd Control config layer disabled")

        self.applied_config = confman.get_config_bundles()
        self.core_config = confman.get_config_bundle('shepherd')

        # Every bundle other than the core 'shepherd' one corresponds to a plugin
        loaded_plugin_names = list(self.applied_config.keys())
        loaded_plugin_names.remove('shepherd')
        if not loaded_plugin_names:
            loaded_plugin_names.append("--none--")
        log.info("Loaded plugins: %s", ', '.join(loaded_plugin_names))
        log.debug("Compiled config: %s", confman.root_config)

        # Optionally write out the compiled config for inspection
        compiled_config_path = self.core_config["compiled_config_path"]
        if compiled_config_path:
            message = F"Compiled Shepherd config at {datetime.now()}"
            confman.dump_to_file(compiled_config_path, message=message)
            log.info("Saved compiled config to %s", compiled_config_path)

    @plugin.plugin_function
    def root_dir(self):
        """Return the "root_dir" value from the applied core config."""
        return self.core_config["root_dir"]

    @plugin.plugin_function
    def device_name(self):
        """Return the "name" value from the applied core config."""
        return self.core_config["name"]

    def restart(self):
        """Placeholder for restarting the agent; currently does nothing."""
        pass

    def start(self):
        """
        Initialise the plugins, then bring up Shepherd Control (if enabled) and the tasks.
        """
        # We don't worry about the plugin dir here, or 'shepherd' being included, as they should
        # already all be loaded and cached.
        self.plugin_interfaces = plugin.init_plugins(self.applied_config)
        # After this point, plugins may already have their own threads running if they created
        # them during init

        self.interface_functions = core_interface.plugins

        cmd_runner = control.CommandRunner(self.interface_functions)
        core_update_state = control.CoreUpdateState(cmd_runner.cmd_reader,
                                                    cmd_runner.cmd_result_writer)
        core_update_state.set_static_state(
            self.local_config, self.applied_config, core_interface.confspec)

        plugin_update_states = {name: iface._update_state
                                for name, iface in self.plugin_interfaces.items()}

        if self.control_enabled:
            control.start_control(self.core_config["control"], self.root_dir(),
                                  core_update_state, plugin_update_states)

        # Need somewhere to eventually pass in the hooks Tasks will need for the lowpower stuff,
        # probably just another init_plugins arg.

        # TODO Collect plugin tasks
        task_session = tasks.init_tasks(self.core_config['session'], self.root_dir(),
                                        [], self.applied_config, self.interface_functions)

        # TODO Any time stabilisation or waiting for Control

        tasks.start_tasks(core_interface, task_session)

        # tasks.init_tasks(self.core_config) # seperate tasks.start?
        # plugin.start() # Run the plugin `.run` hooks in seperate threads
        # scheduler.restore_jobs()
def check_new_device_file(custom_config_path):
    """
    Look for, and consume, the 'shepherd.new' trigger file beside the custom config file.

    Args:
        custom_config_path: Path to the custom config file. The trigger file is looked for in
            the same directory. May be empty or None, in which case nothing is checked.

    Returns:
        True if the trigger file was present and has been removed, otherwise False.
    """
    if not custom_config_path:
        return False

    trigger_path = Path(custom_config_path).parent / 'shepherd.new'
    # Attempt the removal directly and treat "not there" as the normal case, instead of
    # checking exists() first and racing against anything else that removes the file.
    try:
        trigger_path.unlink()
    except (FileNotFoundError, NotADirectoryError):
        return False

    log.info("'shepherd.new' file detected, removing file and"
             " triggering 'new device' mode")
    return True
def compile_local_config(confman, default_config_path, use_custom_config):
    """
    Load the default config and optionally try to overlay the custom config layer.
    As part of this, load the required plugins into cache (required to validate their config).

    Args:
        confman: Config manager to load the layers into.
        default_config_path: Path to the default config file. This layer must load and
            validate; any failure is logged and re-raised.
        use_custom_config: Set False to skip the custom config layer entirely.

    Raises:
        Exception: Whatever loading or validating the default layer raised, and any error
            from resolving the core config paths.
    """
    # ====Default Local Config Layer====
    # This must validate to continue.
    default_config_path = Path(default_config_path).expanduser().resolve()
    try:
        load_config_layer_and_plugins(confman, default_config_path)
        log.info("Loaded default config layer from %s", default_config_path)
    except InvalidConfigError as e:
        # Stringify each arg so building the message can't itself fail and hide the real error
        details = "\n".join(str(arg) for arg in e.args)
        log.error("Failed to load default config from %s. %s", default_config_path, details)
        raise
    except Exception:
        log.error("Failed to load default config from %s", default_config_path, exc_info=True)
        raise

    # Resolve and freeze local install paths that shouldn't be changed from default config
    core_conf = confman.get_config_bundle("shepherd")
    resolve_core_conf_paths(core_conf, default_config_path.parent)
    for frozen_key in ("root_dir", "plugin_dir", "custom_config_path", "compiled_config_path"):
        confman.freeze_value("shepherd", frozen_key)

    # Pull out custom config path and save current good config
    custom_config_path = core_conf["custom_config_path"]
    confman.save_fallback()

    if not core_conf["plugin_dir"]:
        log.warning("Custom plugin path is empty, won't load custom plugins")

    # ====Custom Local Config Layer====
    # If this fails, fallback to default config
    if not use_custom_config:
        log.info("Custom config layer disabled")
        return
    if not custom_config_path:
        log.warning("Custom config path is empty, skipping custom config layer")
        return

    try:
        load_config_layer_and_plugins(confman, custom_config_path)
        log.info("Loaded custom config layer from %s", custom_config_path)
        return
    except InvalidConfigError as e:
        # Guard against an argument-less error: indexing args[0] would raise from inside this
        # handler and skip the fallback below.
        reason = e.args[0] if e.args else e
        log.error("Failed to load custom config layer from %s. %s", custom_config_path, reason)
    except Exception:
        log.error("Failed to load custom config layer from %s.", custom_config_path,
                  exc_info=True)

    log.warning("Falling back to default config.")
    confman.fallback()
def compile_remote_config(confman):
    """
    Attempt to load and apply the Shepherd Control config layer (cached from prior communication,
    Control hasn't actually started up yet). Falls back to previous config if it fails.
    As part of this, load the required plugins into cache (required to validate their config).

    Args:
        confman: Config manager already holding the compiled local config.
    """
    # ====Control Remote Config Layer====
    # Freeze Shepherd Control related config.
    confman.freeze_value("shepherd", "control", "server")
    confman.freeze_value("shepherd", "control", "intro_key")

    # Save current good local config
    confman.save_fallback()
    core_conf = confman.get_config_bundle("shepherd")

    # Only the cache lookup is guarded by this handler. Keeping the load and fallback outside
    # it means a failure while falling back is not swallowed and misreported as a missing cache.
    try:
        control_config = control.get_cached_config(core_conf["root_dir"])
    except Exception:
        log.warning("No cached Shepherd Control config layer available.")
        return

    try:
        load_config_layer_and_plugins(confman, control_config)
        log.info("Loaded cached Shepherd Control config layer")
        return
    except InvalidConfigError as e:
        # Guard against an argument-less error so the fallback below always runs
        reason = e.args[0] if e.args else e
        log.error("Failed to load cached Shepherd Control config layer. %s", reason)
    except Exception:
        log.error("Failed to load cached Shepherd Control config layer.",
                  exc_info=True)

    log.warning("Falling back to local config.")
    confman.fallback()
def resolve_core_conf_paths(core_conf, relative_dir):
    """
    Turn the core config path settings into absolute paths, in place.

    The working directory is first switched to ``relative_dir`` (intended to be the config file
    directory) so that ``root_dir`` resolves against it. It is then switched to ``root_dir``
    so that the remaining paths resolve against that. "~" user prefixes are expanded.
    Empty path values are left untouched, rather than following the default pathlib
    behaviour of resolving to the current directory.

    Raises:
        FileNotFoundError: If the resolved ``root_dir`` does not exist.
    """
    def to_absolute(raw_path):
        return str(Path(raw_path).expanduser().resolve())

    os.chdir(relative_dir)
    core_conf["root_dir"] = to_absolute(core_conf["root_dir"])

    try:
        os.chdir(core_conf["root_dir"])
    except FileNotFoundError:
        raise FileNotFoundError(F"Shepherd root operating directory '{core_conf['root_dir']}'"
                                F" does not exist")

    # The remaining paths are resolved relative to root_dir, now the working directory
    for path_key in ("plugin_dir", "custom_config_path", "compiled_config_path"):
        if core_conf[path_key]:
            core_conf[path_key] = to_absolute(core_conf[path_key])
def load_config_layer_and_plugins(confman: ConfigManager, config_source):
    """
    Load a config layer, find the necessary plugin classes, then validate it.

    Args:
        confman: Config manager to overlay the layer onto.
        config_source: The layer to load, passed straight to ``confman.load_overlay``.

    Returns:
        Dict mapping each name from ``confman.get_bundle_names()`` to the plugin interface
        loaded for it. If this call succeeds, its keys directly match those bundle names.
    """
    # Load in config layer
    confman.load_overlay(config_source)

    # Get the core config so we can find the plugin directory
    core_config = confman.get_config_bundle("shepherd")
    plugin_dir = core_config["plugin_dir"]

    # List other bundle names to get plugins we need to load
    plugin_names = confman.get_bundle_names()

    # Load plugins to get their config specifications. All plugins are loaded before any
    # confspec is registered, so a failed load leaves the config manager's specs untouched.
    plugin_interfaces = {name: plugin.load_plugin(name, plugin_dir) for name in plugin_names}
    for plugin_name, plugin_interface in plugin_interfaces.items():
        confman.add_confspec(plugin_name, plugin_interface.confspec)

    # Validate all plugin configs
    confman.validate_bundles()

    # The docstring always promised this dict, but nothing was returned. Existing callers
    # ignore the result, so returning it is backward-compatible.
    return plugin_interfaces
def parse_bool_arg(value):
    """
    Convert a command line string into a bool.

    ``argparse`` with ``type=bool`` treats every non-empty string (including "False") as True,
    so the flags are parsed explicitly instead. Matching is case-insensitive and ignores
    surrounding whitespace. An empty string is False, as it was with ``bool``.

    Raises:
        ValueError: If the text is not a recognised boolean. argparse reports this as an
            invalid argument value.
    """
    text = str(value).strip().lower()
    if text in ("true", "1", "yes", "y", "on"):
        return True
    if text in ("false", "0", "no", "n", "off", ""):
        return False
    raise ValueError(F"Expected a boolean value, got {value!r}")


# Shim to allow the Agent to restart itself without involving the actual CLI
if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser(description="Shepherd core restart shim. For general use,"
                                     " use the main Shepherd CLI instead.")
    parser.add_argument('default_config_path', type=str)
    parser.add_argument('use_custom_config', type=parse_bool_arg)
    parser.add_argument('control_enabled', type=parse_bool_arg)
    parser.add_argument('new_device_mode', type=parse_bool_arg)
    args = parser.parse_args()

    agent = Agent(args.default_config_path, args.use_custom_config,
                  args.control_enabled, args.new_device_mode)
    agent.start()