You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
shepherd-agent/shepherd/agent/core.py

347 lines
14 KiB

"""
Core shepherd module, tying together main service functionality. Provides main CLI.
"""
import os
import sys
from pathlib import Path
import glob
from datetime import datetime
import logging
import chromalog
import pkg_resources
import click
from configspec import *
from . import scheduler
from . import plugin
from . import control
chromalog.basicConfig(level=os.environ.get("LOGLEVEL", "INFO"))
log = logging.getLogger("shepherd.agent")
@click.group(invoke_without_command=True)
@click.option('-c', '--config', 'default_config_path', type=click.Path(),
help="Shepherd config TOML file to be used as default config layer."
" Overrides default './shepherd*.toml' search")
@click.option('-l', '--local', 'local_operation', is_flag=True,
help="Only use the local config layers (default and custom), and disable all"
" Shepherd Control remote features")
@click.option('-d', '--default-config-only', 'only_default_layer', is_flag=True,
help="Ignore the custom config layer (still uses the Control config above that)")
@click.pass_context
def cli(ctx, default_config_path, local_operation, only_default_layer):
"""
Core service. If default config file is not provided with '-c' option, the first filename
in the current working directory beginning with "shepherd" and
ending with ".toml" will be used.
"""
version_text = pkg_resources.get_distribution("shepherd")
log.info(F"Shepherd Agent [{version_text}]")
if ctx.invoked_subcommand == "template":
return
if not default_config_path:
default_config_path = sorted(glob.glob("./shepherd*.toml"))[:1]
if default_config_path:
default_config_path = default_config_path[0]
log.info(F"No default config file provided, using {default_config_path}")
else:
log.error("No default config file provided, and no 'shepherd*.toml' could be"
" found in the current directory")
sys.exit(1)
layers_disabled = []
if local_operation:
layers_disabled.append("control")
log.info("Running in local only mode")
if only_default_layer:
layers_disabled.append("custom")
confman = ConfigManager()
compile_config(confman, default_config_path, layers_disabled)
plugin_configs = confman.get_config_bundles()
del plugin_configs["shepherd"]
core_config = confman.get_config_bundle("shepherd")
# control.init_control(core_config, plugin_configs)
scheduler.init_scheduler(core_config)
plugin.init_plugins(plugin_configs, core_config)
# scheduler.restore_jobs()
print(str(datetime.now()))
if ctx.invoked_subcommand is not None:
return
print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))
try:
scheduler.start()
except (KeyboardInterrupt, SystemExit):
pass
@cli.command()
@click.option('-d', '--plugin-dir', type=click.Path(),
help="Override the configured directory to search for plugin modules, in addition to"
" built in Shepherd plugins and the global import path."
" Supplying the option empty will use the current directory.")
def test():
print("test!")
@cli.command()
@click.argument('plugin_name', required=False)
@click.option('-a', '--include-all', is_flag=True,
help="Include all optional fields in the template")
@click.option('-c', '--config', 'config_path', type=click.Path(),
help="Path to append or create config tempalate")
@click.option('-d', '--plugin-dir', type=click.Path(),
help="Directory to search for plugin modules, in addition to built in Shepherd"
" plugins and the global import path. Defaults to current directory.")
@click.pass_context
def template(ctx, plugin_name, include_all, config_path, plugin_dir):
"""
Generate a template config TOML file for PLUGIN_NAME, or for the Shepherd core if
PLUGIN_NAME is not provided.
If config path is provided ("-c"), append to that file (if it exists) or write to
a new file (if it doesn't yet exist).
"""
if not plugin_dir:
plugin_dir = Path.cwd()
confspec = ConfigSpecification()
if (not plugin_name) or (plugin_name == "shepherd"):
plugin_name = "shepherd"
confspec = core_confspec()
else:
try:
plugin_interface = plugin.load_plugin(plugin_name, plugin_dir)
except plugin.PluginLoadError as e:
log.error(e.args[0])
sys.exit(1)
confspec = plugin_interface.confspec
template_dict = confspec.get_template(include_all)
template_toml = toml.dumps({plugin_name: template_dict})
log.info(F"Config template for [{plugin_name}]: \n\n"+template_toml)
if not config_path:
# reuse parent "-c" for convenience
config_path = ctx.parent.params["default_config_path"]
if not config_path:
return
if Path(config_path).is_file():
try:
existing_config = toml.load(config_path)
except Exception:
click.confirm(
F"File {config_path} already exists and is not a valid TOML file. Overwrite?",
default=True, abort=True)
click.echo(F"Writing [{plugin_name}] template to {config_path}")
with open(config_path, 'w+') as f:
f.write(template_toml)
else:
if plugin_name in existing_config:
click.confirm(F"Overwrite [{plugin_name}] section in {config_path}?",
default=True, abort=True)
click.echo(F"Overwriting [{plugin_name}] section in {config_path}")
else:
click.confirm(F"Add [{plugin_name}] section to {config_path}?",
default=True, abort=True)
click.echo(F"Adding [{plugin_name}] section to {config_path}")
existing_config[plugin_name] = template_dict
with open(config_path, 'w+') as f:
f.write(toml.dumps(existing_config))
else:
click.echo(F"Writing [{plugin_name}] template to {config_path}")
with open(config_path, 'w+') as f:
f.write(template_toml)
def compile_config(confman, default_config_path, layers_disabled):
"""
Run through the process of assembling the various config layers, falling back to working
ones where necessary. As part of this, loads in the required plugins.
"""
confman.add_confspec("shepherd", core_confspec())
# ====Default Local Config Layer====
# This must validate to continue.
default_config_path = Path(default_config_path).expanduser()
try:
load_config_layer_and_plugins(confman, default_config_path)
log.info(F"Loaded default config layer from {default_config_path}")
except Exception as e:
if isinstance(e, InvalidConfigError):
log.error(F"Failed to load default config from {default_config_path}. {e.args[0]}")
else:
log.error(F"Failed to load default config from {default_config_path}", exc_info=True)
sys.exit(1)
# Resolve and freeze local install paths that shouldn't be changed from default config
core_conf = confman.get_config_bundle("shepherd")
resolve_core_conf_paths(core_conf)
confman.freeze_value("shepherd", "root_dir")
confman.freeze_value("shepherd", "plugin_dir")
confman.freeze_value("shepherd", "custom_config_path")
confman.freeze_value("shepherd", "generated_config_path")
# Pull out custom config path and save current good config
custom_config_path = core_conf["custom_config_path"]
confman.save_fallback()
if not core_conf["plugin_dir"]:
log.warn("Custom plugin path is empty, won't load custom plugins")
# ====Custom Local Config Layer====
# If this fails, maintain default config but continue on to Control layer
if "custom" in layers_disabled:
log.info("Custom config layer disabled")
elif not custom_config_path:
log.warning("Custom config path is empty, skipping custom config layer")
else:
try:
load_config_layer_and_plugins(confman, custom_config_path)
log.info(F"Loaded custom config layer from {custom_config_path}")
except Exception as e:
if isinstance(e, InvalidConfigError):
log.error(F"Failed to load custom config layer from {custom_config_path}."
F" {e.args[0]}")
else:
log.error(F"Failed to load custom config layer from {custom_config_path}.",
exc_info=True)
log.warning("Falling back to default config.")
confman.fallback()
# Freeze Shepherd Control related config.
core_conf = confman.get_config_bundle("shepherd")
resolve_core_conf_paths(core_conf)
confman.freeze_value("shepherd", "control_server")
confman.freeze_value("shepherd", "control_api_key")
# Save current good config
confman.save_fallback()
# ====Control Remote Config Layer====
# If this fails, maintain current local config.
if "control" not in layers_disabled:
try:
control_config = control.get_config(core_conf["root_dir"])
try:
load_config_layer_and_plugins(confman, control_config)
log.info(F"Loaded cached Shepherd Control config layer")
except Exception as e:
if isinstance(e, InvalidConfigError):
log.error(F"Failed to load cached Shepherd Control config layer."
F" {e.args[0]}")
else:
log.error(F"Failed to load cached Shepherd Control config layer.",
exc_info=True)
log.warning("Falling back to local config.")
confman.fallback()
except Exception:
log.warning("No cached Shepherd Control config layer available.")
else:
log.info("Shepherd Control config layer disabled")
log.debug("Compiled config: %s", confman.root_config)
confman.dump_to_file(core_conf["generated_config_path"])
# Relative pathnames here are all relative to "root_dir"
def core_confspec():
"""
Returns the core config specification
"""
confspec = ConfigSpecification()
confspec.add_specs([
("name", StringSpec(helptext="Identifying name for this device")),
("hostname", StringSpec(default="", optional=True, helptext="If set, changes the system"
" hostname")),
("plugin_dir", StringSpec(default="./shepherd-plugins", optional=True,
helptext="Optional directory for Shepherd to look for"
" plugins in.")),
("root_dir", StringSpec(default="./shepherd", optional=False,
helptext="Operating directory for shepherd to place"
" working files.")),
("custom_config_path", StringSpec(optional=True, helptext="Path to custom config"
" layer TOML file.")),
("generated_config_path", StringSpec(default="shepherd-generated.toml", optional=True,
helptext="Path to custom file Shepherd will generate"
" to show compiled config that was used and any"
" errors in validation."))
])
confspec.add_spec("control_server", StringSpec())
confspec.add_spec("control_api_key", StringSpec())
return confspec
def resolve_core_conf_paths(core_conf):
"""
Set the cwd to ``root_dir`` and resolve other core config paths relative to that.
``root_dir`` itself will resolve relative to ``relative_dir``, intended to be the config
file directory.
Also expands out any "~" user characters. If paths are empty, leaves them as is, rather than
the default pathlib behaviour of resolving to the current directory
"""
os.chdir(relative_dir)
core_conf["root_dir"] = str(Path(core_conf["root_dir"]).expanduser().resolve())
try:
os.chdir(core_conf["root_dir"])
except FileNotFoundError:
raise FileNotFoundError(F"Shepherd root operating directory '{core_conf['root_dir']}'"
F" does not exist")
if core_conf["plugin_dir"]:
core_conf["plugin_dir"] = str(Path(core_conf["plugin_dir"]).expanduser().resolve())
if core_conf["custom_config_path"]:
core_conf["custom_config_path"] = str(
Path(core_conf["custom_config_path"]).expanduser().resolve())
core_conf["generated_config_path"] = str(
Path(core_conf["generated_config_path"]).expanduser().resolve())
def load_config_layer_and_plugins(confman: ConfigManager, config_source):
"""
Load a config layer, find the necessary plugin classes, then validate it.
If this succeeds, the returned dict of plugin classes will directly match
the bundle names in the config manager.
"""
# Load in config layer
confman.load_overlay(config_source)
# Get the core config so we can find the plugin directory
core_config = confman.get_config_bundle("shepherd")
plugin_dir = core_config["plugin_dir"]
# List other bundle names to get plugins we need to load
plugin_names = confman.get_bundle_names()
plugin_names.remove("shepherd")
# Load plugins to get their config specifications
plugin_interfaces = {name: plugin.load_plugin(name, plugin_dir) for name in plugin_names}
for plugin_name, plugin_interface in plugin_interfaces.items():
confman.add_confspec(plugin_name, plugin_interface.confspec)
# Validate all plugin configs
confman.validate_bundles()