import logging
import os
import sys
import glob
from pathlib import Path
from types import SimpleNamespace
from datetime import datetime
from pprint import pprint

import pkg_resources
import chromalog
import click

from . import core

# NOTE(review): `plugin`, `toml`, `core_confspec` and `scheduler` are referenced
# further down but are neither imported nor defined in this file as seen here.
# Their import locations cannot be determined from this module alone, so they
# are left untouched - confirm and add the matching imports.

chromalog.basicConfig(level=os.environ.get("LOGLEVEL", "INFO"))

log = logging.getLogger("shepherd.agent")


def echo_heading(title, on_nl=True):
    """Print ``title`` as a bold heading wrapped in blue ``.:`` / ``:.`` markers.

    When ``on_nl`` is true an empty line is printed before the heading.
    """
    if on_nl:
        click.echo("")
    click.echo(click.style(".: ", fg='blue', bold=True) +
               click.style(title, fg='white', bold=True) +
               click.style(" :.", fg='blue', bold=True))


def echo_section(title, input_text=None, on_nl=True):
    """Print ``title`` as a bold section line prefixed with a blue ``::``.

    If ``input_text`` is truthy it is appended in bold green on the same line.
    When ``on_nl`` is true an empty line is printed before the section line.
    """
    if on_nl:
        click.echo("")
    click.secho(":: ", bold=True, fg='blue', nl=False)
    click.secho(title, bold=True, nl=False)
    if input_text:
        click.secho(F" {input_text}", fg='green', bold=True, nl=False)
    # Terminate the line that the nl=False calls above left open
    click.echo("")


def _find_default_config():
    """Return the first ``./shepherd*.toml`` (sorted by name), or exit with status 1.

    Logs a warning when the file that was found contains the text
    "Compiled Shepherd config".
    """
    matches = sorted(glob.glob("./shepherd*.toml"))
    if not matches:
        log.error("No default config file provided, and no 'shepherd*.toml' could be"
                  " found in the current directory")
        sys.exit(1)

    found_path = matches[0]
    log.info(F"No default config file provided, found {found_path}")
    # The file is only inspected, so open it read-only; the previous 'r+' mode
    # needlessly required write permission on the config file.
    with open(found_path, 'r') as f:
        content = f.read()
    if "Compiled Shepherd config" in content:
        log.warning("Default config file looks like it is full compiled config"
                    " file generated by Shepherd and picked up due to accidental"
                    " name match")
    return found_path


def _write_text(path, text):
    """Write ``text`` to ``path``, creating the file or replacing its contents."""
    with open(path, 'w') as f:
        f.write(text)


# The docstrings of the click commands below are shown to the user as help
# text, so developer-facing notes are kept in comments instead.
@click.group(invoke_without_command=True)
@click.option('-c', '--config', 'default_config_path', type=click.Path(),
              help="Shepherd config TOML file to be used as default config layer."
                   " Overrides default './shepherd*.toml' search")
@click.option('-l', '--local', 'local_operation', is_flag=True,
              help="Only use the local config layers (default and custom), and disable all"
                   " Shepherd Control remote features")
@click.option('-d', '--default-config-only', 'only_default_layer', is_flag=True,
              help="Ignore the custom config layer (still uses the Control config above that)")
@click.option('-n', '--new', 'new_run', is_flag=True,
              help="Clear existing device identity and cached Shepherd Control config layer."
                   " Also triggered by the presence of a shepherd.new file in the"
                   " same directory as the custom config layer file.")
@click.pass_context
def cli(ctx, default_config_path, local_operation, only_default_layer, new_run):
    """
    Core service.

    If default config file is not provided with '-c' option, the first filename
    in the current working directory beginning with "shepherd" and ending with
    ".toml" will be used.
    """
    ctx.ensure_object(SimpleNamespace)

    version_text = pkg_resources.get_distribution("shepherd")
    log.info(F"Shepherd Agent [{version_text}]")

    # Drop down to subcommand if it doesn't need default config file processing
    if ctx.invoked_subcommand == "template":
        return

    # Get a default config path to use
    if not default_config_path:
        default_config_path = _find_default_config()

    # Establish what config layers we're going to try and use.
    # These three names were previously passed to core.Agent without ever being
    # defined (NameError). They are derived from the CLI flags using the same
    # conditions the old, unused `layers_disabled` list was built from.
    # NOTE(review): the mapping onto the core.Agent parameters is inferred from
    # the argument names - confirm against core.Agent / Agent.load.
    control_enabled = not (local_operation or ctx.invoked_subcommand == "test")
    if not control_enabled:
        log.info("Running in local only mode")
    use_custom_config = not only_default_layer
    new_device_mode = new_run

    agent = core.Agent(control_enabled)
    agent.load(default_config_path, use_custom_config, new_device_mode)

    # Drop down to subcommands that needed a config compiled
    if ctx.invoked_subcommand == "test":
        ctx.obj.agent = agent
        return

    agent.start()

    print(str(datetime.now()))

    if ctx.invoked_subcommand is not None:
        return

    print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))

    try:
        # NOTE(review): `scheduler` is not defined anywhere in this file as
        # seen; this call raises NameError until the intended object is wired in.
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        pass


# NOTE(review): this command reads ctx.obj.plugin_configs and
# ctx.obj.core_config, but `cli` above only stores ctx.obj.agent - confirm
# where these attributes are meant to be populated.
@cli.command()
@click.argument('plugin_name', required=False)
@click.argument('interface_function', required=False)
@click.pass_context
def test(ctx, plugin_name, interface_function):
    """
    Show the loaded plugins and their config, or, when PLUGIN_NAME is given,
    list that plugin's interface functions or run INTERFACE_FUNCTION.
    """
    echo_heading("Shepherd Test")

    if not plugin_name:
        echo_section("Plugins loaded:")
        if not ctx.obj.plugin_configs:
            click.echo("---none---")
        # Use a dedicated loop variable so the `plugin_name` argument is not
        # rebound by the listing.
        for name in ctx.obj.plugin_configs:
            click.secho(F" {name}", fg='green')

        echo_section("Core config:")
        pprint(ctx.obj.core_config)

        echo_section("Plugin configs:")
        if not ctx.obj.plugin_configs:
            click.echo("---none---")
        for name, config in ctx.obj.plugin_configs.items():
            # Print the name of the entry being shown; this used to print the
            # stale `plugin_name` left over from the previous loop.
            click.secho(F" {name}", fg='green')
            pprint(config)

        click.echo("")
        log.info("Initialising plugins...")
        plugin.init_plugins(ctx.obj.plugin_configs, ctx.obj.core_config)
        log.info("Plugin initialisation done")
        return

    echo_section("Target plugin:", input_text=plugin_name, on_nl=False)

    # TODO find plugin dependencies

    if plugin_name not in ctx.obj.plugin_configs:
        log.error(F"Supplied plugin name '{plugin_name}' is not loaded"
                  " (not present in config)")
        sys.exit(1)

    echo_section(F"Config [{plugin_name}]:")
    pprint(ctx.obj.plugin_configs[plugin_name])

    interface = plugin.load_plugin(plugin_name)

    if not interface_function:
        echo_section(F"Interface functions [{plugin_name}]:")
        for name in interface._functions:
            click.echo(F" {name}")
        return

    echo_section("Target interface function:", input_text=interface_function)
    if interface_function not in interface._functions:
        log.error(F"Supplied interface function name '{interface_function}' is not present in"
                  F" plugin {plugin_name}")
        sys.exit(1)

    # Only the targeted plugin is initialised before its function is invoked
    log.info("Initialising plugins...")
    plugin.init_plugins({plugin_name: ctx.obj.plugin_configs[plugin_name]}, ctx.obj.core_config)
    log.info("Plugin initialisation done")

    interface._functions[interface_function]()


@cli.command()
@click.argument('plugin_name', required=False)
@click.option('-a', '--include-all', is_flag=True,
              help="Include all optional fields in the template")
@click.option('-c', '--config', 'config_path', type=click.Path(),
              help="Path to append or create config tempalate")
@click.option('-d', '--plugin-dir', type=click.Path(),
              help="Directory to search for plugin modules, in addition to built in Shepherd"
                   " plugins and the global import path. Defaults to current directory.")
@click.pass_context
def template(ctx, plugin_name, include_all, config_path, plugin_dir):
    """
    Generate a template config TOML file for PLUGIN_NAME, or for the Shepherd core if
    PLUGIN_NAME is not provided.
    If config path is provided ("-c"), append to that file (if it exists) or write to
    a new file (if it doesn't yet exist).
    """
    if not plugin_dir:
        plugin_dir = Path.cwd()

    # The former placeholder `confspec = ConfigSpecification()` was dropped: it
    # was overwritten on every path below and referenced an undefined name.
    if (not plugin_name) or (plugin_name == "shepherd"):
        plugin_name = "shepherd"
        confspec = core_confspec()
    else:
        try:
            plugin_interface = plugin.load_plugin(plugin_name, plugin_dir)
        except plugin.PluginLoadError as e:
            log.error(e.args[0])
            sys.exit(1)
        confspec = plugin_interface.confspec

    template_dict = confspec.get_template(include_all)
    template_toml = toml.dumps({plugin_name: template_dict})
    log.info(F"Config template for [{plugin_name}]: \n\n"+template_toml)

    if not config_path:
        # reuse parent "-c" for convenience
        config_path = ctx.parent.params["default_config_path"]
    if not config_path:
        # Nowhere to write to; the template has already been logged
        return

    if not Path(config_path).is_file():
        click.echo(F"Writing [{plugin_name}] template to {config_path}")
        _write_text(config_path, template_toml)
        return

    try:
        existing_config = toml.load(config_path)
    except Exception:
        # Deliberately broad: any failure to load the file is treated as "not
        # a valid TOML file" and the user is asked before it is replaced.
        click.confirm(
            F"File {config_path} already exists and is not a valid TOML file. Overwrite?",
            default=True, abort=True)
        click.echo(F"Writing [{plugin_name}] template to {config_path}")
        _write_text(config_path, template_toml)
        return

    if plugin_name in existing_config:
        click.confirm(F"Overwrite [{plugin_name}] section in {config_path}?",
                      default=True, abort=True)
        click.echo(F"Overwriting [{plugin_name}] section in {config_path}")
    else:
        click.confirm(F"Add [{plugin_name}] section to {config_path}?",
                      default=True, abort=True)
        click.echo(F"Adding [{plugin_name}] section to {config_path}")

    # The whole file is re-serialised with the new/replaced section included
    existing_config[plugin_name] = template_dict
    _write_text(config_path, toml.dumps(existing_config))