""" Core shepherd module, tying together main service functionality. Provides main CLI. """ import os import sys from pathlib import Path import glob from datetime import datetime import logging import chromalog import pkg_resources import click from configspec import * from . import scheduler from . import plugin from . import control chromalog.basicConfig(level=os.environ.get("LOGLEVEL", "INFO")) log = logging.getLogger("shepherd-agent") @click.group(invoke_without_command=True) @click.option('-c', '--config', 'default_config_path', type=click.Path(), help="Shepherd config TOML file to be used as default config layer." " Overrides default './shepherd*.toml' search") @click.option('-l', '--local', 'local_operation', is_flag=True, help="Only use the local config layers (default and custom), and disable all" " Shepherd Control remote features") @click.option('-d', '--default-config-only', 'only_default_layer', is_flag=True, help="Ignore the custom config layer (still uses the Control config above that)") @click.pass_context def cli(ctx, default_config_path, local_operation, only_default_layer): """ Core service. If default config file is not provided with '-c' option, the first filename in the current working directory beginning with "shepherd" and ending with ".toml" will be used. """ version_text = pkg_resources.get_distribution("shepherd") log.info(F"Shepherd Agent {version_text}") if ctx.invoked_subcommand == "template": return if not default_config_path: default_config_path = sorted(glob.glob("./shepherd*.toml"))[:1] if default_config_path: default_config_path = default_config_path[0] log.info(F"No default config file provided, using {default_config_path}") else: log.error("No default config file provided, and no 'shepherd*.toml' could be" " found in the current directory") sys.exit(1) layers_disabled = [] if local_operation: layers_disabled.append("control") log.info("Running in local only mode") if only_default_layer: layers_disabled.append("custom") confman = ConfigManager() compile_config(confman, default_config_path, layers_disabled) plugin_configs = confman.get_config_bundles() del plugin_configs["shepherd"] core_config = confman.get_config_bundle("shepherd") # control.init_control(core_config, plugin_configs) scheduler.init_scheduler(core_config) plugin.init_plugins(plugin_configs, core_config) # scheduler.restore_jobs() print(str(datetime.now())) if ctx.invoked_subcommand is not None: return print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C')) try: scheduler.start() except (KeyboardInterrupt, SystemExit): pass @cli.command() def test(): print("test!") @cli.command() @click.argument('plugin_name', required=False) @click.option('-a', '--include-all', is_flag=True, help="Include all optional fields in the template") @click.option('-c', '--config', 'config_path', type=click.Path(), help="Path to append or create config tempalate") @click.option('-d', '--plugin-dir', type=click.Path(), help="Directory to search for plugin modules, in addition to built in Shepherd" " plugins and the global import path. Defaults to current directory.") @click.pass_context def template(ctx, plugin_name, include_all, config_path, plugin_dir): """ Generate a template config TOML file for PLUGIN, or for the Shepherd core if PLUGIN is not provided. If config path is provided ("-c"), append to that file (if it exists) or write to a new file (if it doesn't yet exist). """ if not plugin_dir: plugin_dir = Path.cwd() confspec = ConfigSpecification() if (not plugin_name) or (plugin_name == "shepherd"): plugin_name = "shepherd" confspec = core_confspec() else: try: plugin_interface = plugin.load_plugin(plugin_name, plugin_dir) except plugin.PluginLoadError as e: log.error(e.args[0]) sys.exit(1) confspec = plugin_interface.confspec template_dict = confspec.get_template(include_all) template_toml = toml.dumps({plugin_name: template_dict}) log.info(F"Config template for [{plugin_name}]: \n\n"+template_toml) if not config_path: # reuse parent "-c" for convenience config_path = ctx.parent.params["default_config_path"] if not config_path: return if Path(config_path).is_file(): try: existing_config = toml.load(config_path) except Exception: click.confirm( F"File {config_path} already exists and is not a valid TOML file. Overwrite?", default=True, abort=True) click.echo(F"Writing [{plugin_name}] template to {config_path}") with open(config_path, 'w+') as f: f.write(template_toml) else: if plugin_name in existing_config: click.confirm(F"Overwrite [{plugin_name}] section in {config_path}?", default=True, abort=True) click.echo(F"Overwriting [{plugin_name}] section in {config_path}") else: click.confirm(F"Add [{plugin_name}] section to {config_path}?", default=True, abort=True) click.echo(F"Adding [{plugin_name}] section to {config_path}") existing_config[plugin_name] = template_dict with open(config_path, 'w+') as f: f.write(toml.dumps(existing_config)) else: click.echo(F"Writing [{plugin_name}] template to {config_path}") with open(config_path, 'w+') as f: f.write(template_toml) def compile_config(confman, default_config_path, layers_disabled): """ Run through the process of assembling the various config layers, falling back to working ones where necessary. As part of this, loads in the required plugins. """ confman.add_confspec("shepherd", core_confspec()) # ====Default Local Config Layer==== # This must validate to continue. default_config_path = Path(default_config_path).expanduser() try: load_config_layer_and_plugins(confman, default_config_path) log.info(F"Loaded default config layer from {default_config_path}") except Exception as e: if isinstance(e, InvalidConfigError): log.error(F"Failed to load default config from {default_config_path}. {e.args[0]}") else: log.error(F"Failed to load default config from {default_config_path}", exc_info=True) sys.exit(1) # Resolve and freeze local install paths that shouldn't be changed from default config core_conf = confman.get_config_bundle("shepherd") resolve_core_conf_paths(core_conf) confman.freeze_value("shepherd", "root_dir") confman.freeze_value("shepherd", "plugin_dir") confman.freeze_value("shepherd", "custom_config_path") confman.freeze_value("shepherd", "generated_config_path") # Pull out custom config path and save current good config custom_config_path = core_conf["custom_config_path"] confman.save_fallback() # ====Custom Local Config Layer==== # If this fails, maintain default config but continue on to Control layer if "custom" not in layers_disabled: try: load_config_layer_and_plugins(confman, custom_config_path) log.info(F"Loaded custom config layer from {custom_config_path}") except Exception as e: if isinstance(e, InvalidConfigError): log.error(F"Failed to load custom config layer from {custom_config_path}." F" {e.args[0]}") else: log.error(F"Failed to load custom config layer from {custom_config_path}.", exc_info=True) log.warning("Falling back to default config.") confman.fallback() else: log.info("Custom config layer disabled") # Freeze Shepherd Control related config. core_conf = confman.get_config_bundle("shepherd") resolve_core_conf_paths(core_conf) confman.freeze_value("shepherd", "control_server") confman.freeze_value("shepherd", "control_api_key") # Save current good config confman.save_fallback() # ====Control Remote Config Layer==== # If this fails, maintain current local config. if "control" not in layers_disabled: try: control_config = control.get_config(core_conf["root_dir"]) try: load_config_layer_and_plugins(confman, control_config) log.info(F"Loaded cached Shepherd Control config layer") except Exception as e: if isinstance(e, InvalidConfigError): log.error(F"Failed to load cached Shepherd Control config layer." F" {e.args[0]}") else: log.error(F"Failed to load cached Shepherd Control config layer.", exc_info=True) log.warning("Falling back to local config.") confman.fallback() except Exception: log.warning("No cached Shepherd Control config layer available.") else: log.info("Shepherd Control config layer disabled") log.debug("Compiled config: %s", confman.root_config) confman.dump_to_file(core_conf["generated_config_path"]) # Relative pathnames here are all relative to "root_dir" def core_confspec(): """ Returns the core config specification """ confspec = ConfigSpecification() confspec.add_specs([ ("name", StringSpec(helptext="Identifying name for this device")), ("hostname", StringSpec(default="", optional=True, helptext="If set, changes the system" " hostname")), ("plugin_dir", StringSpec(default="~/shepherd-plugins", optional=True, helptext="Optional directory for Shepherd to look for" " plugins in.")), ("root_dir", StringSpec(default="~/shepherd", optional=True, helptext="Operating directory for shepherd to place" " working files.")), ("custom_config_path", StringSpec(optional=True, helptext="Path to custom config" " layer TOML file.")), ("generated_config_path", StringSpec(default="shepherd-generated.toml", optional=True, helptext="Path to custom file Shepherd will generate" " to show compiled config that was used and any" " errors in validation.")) ]) confspec.add_spec("control_server", StringSpec()) confspec.add_spec("control_api_key", StringSpec()) return confspec def resolve_core_conf_paths(core_conf): """ Set the cwd to ``root_dir`` and resolve other core config paths relative to that. Also expands out any "~" user characters. """ core_conf["root_dir"] = str(Path(core_conf["root_dir"]).expanduser().resolve()) os.chdir(core_conf["root_dir"]) core_conf["plugin_dir"] = str(Path(core_conf["plugin_dir"]).expanduser().resolve()) core_conf["custom_config_path"] = str( Path(core_conf["custom_config_path"]).expanduser().resolve()) core_conf["generated_config_path"] = str( Path(core_conf["generated_config_path"]).expanduser().resolve()) def load_config_layer_and_plugins(confman: ConfigManager, config_source): """ Load a config layer, find the necessary plugin classes, then validate it. If this succeeds, the returned dict of plugin classes will directly match the bundle names in the config manager. """ # Load in config layer confman.load_overlay(config_source) # Get the core config so we can find the plugin directory core_config = confman.get_config_bundle("shepherd") plugin_dir = core_config["plugin_dir"] # List other bundle names to get plugins we need to load plugin_names = confman.get_bundle_names() plugin_names.remove("shepherd") # Load plugins to get their config specifications plugin_interfaces = plugin.load_plugins(plugin_names, plugin_dir) for plugin_name, plugin_interface in plugin_interfaces.items(): confman.add_confspec(plugin_name, plugin_interface.confspec) # Validate all plugin configs confman.validate_bundles()