You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
shepherd-agent/shepherd/agent/cli.py

256 lines
9.2 KiB

import glob
import logging
import os
import sys
from datetime import datetime
from pathlib import Path
from pprint import pprint
from types import SimpleNamespace

import chromalog
import click
import pkg_resources

from . import core
# Configure logging through chromalog at import time; the level is taken from the
# LOGLEVEL environment variable and falls back to "INFO" when it is not set.
chromalog.basicConfig(level=os.environ.get("LOGLEVEL", "INFO"))
# Module-wide logger used by the CLI commands in this file.
log = logging.getLogger("shepherd.agent")
def echo_heading(title, on_nl=True):
    """
    Echo a bold heading rendered as ".: <title> :." using click styling.

    The ".: " and " :." markers are styled blue and the title white. When
    ``on_nl`` is true an empty line is echoed before the heading.
    """
    if on_nl:
        click.echo("")
    opening = click.style(".: ", fg='blue', bold=True)
    label = click.style(title, fg='white', bold=True)
    closing = click.style(" :.", fg='blue', bold=True)
    click.echo(opening + label + closing)
def echo_section(title, input_text=None, on_nl=True):
    """
    Echo a bold section line of the form ":: <title> <input_text>".

    The ":: " marker is styled blue and the optional ``input_text`` green;
    ``input_text`` is only emitted when truthy. When ``on_nl`` is true an
    empty line is echoed first. The line is always terminated by a newline.
    """
    if on_nl:
        click.echo("")

    # Each segment is (text, extra style kwargs); all are bold and printed
    # without a trailing newline so they end up on the same line.
    segments = [(":: ", {"fg": 'blue'}), (title, {})]
    if input_text:
        segments.append((F" {input_text}", {"fg": 'green'}))
    for text, extra_style in segments:
        click.secho(text, bold=True, nl=False, **extra_style)

    # Terminate the line.
    click.echo("")
@click.group(invoke_without_command=True)
@click.option('-c', '--config', 'default_config_path', type=click.Path(),
              help="Shepherd config TOML file to be used as default config layer."
              " Overrides default './shepherd*.toml' search")
@click.option('-l', '--local', 'local_operation', is_flag=True,
              help="Only use the local config layers (default and custom), and disable all"
              " Shepherd Control remote features")
@click.option('-d', '--default-config-only', 'only_default_layer', is_flag=True,
              help="Ignore the custom config layer (still uses the Control config above that)")
@click.option('-n', '--new', 'new_run', is_flag=True,
              help="Clear existing device identity and cached Shepherd Control config layer."
              " Also triggered by the presence of a shepherd.new file in the"
              " same directory as the custom config layer file.")
@click.pass_context
def cli(ctx, default_config_path, local_operation, only_default_layer, new_run):
    """
    Core service. If default config file is not provided with '-c' option, the first filename
    in the current working directory beginning with "shepherd" and
    ending with ".toml" will be used.
    """
    # NOTE: the docstring above is shown by click as the command help text.
    ctx.ensure_object(SimpleNamespace)

    version_text = pkg_resources.get_distribution("shepherd")
    log.info(F"Shepherd Agent [{version_text}]")

    # Drop down to subcommand if it doesn't need default config file processing
    if ctx.invoked_subcommand == "template":
        return

    # Get a default config path to use
    if not default_config_path:
        candidates = sorted(glob.glob("./shepherd*.toml"))
        if not candidates:
            log.error("No default config file provided, and no 'shepherd*.toml' could be"
                      " found in the current directory")
            sys.exit(1)
        default_config_path = candidates[0]
        log.info(F"No default config file provided, found {default_config_path}")
        # The file is only inspected here, so open it read-only; a read/write
        # mode would needlessly fail on config files without write permission.
        with open(default_config_path, 'r') as f:
            content = f.read()
        if "Compiled Shepherd config" in content:
            log.warning("Default config file looks like it is full compiled config"
                        " file generated by Shepherd and picked up due to accidental"
                        " name match")

    # Establish what config layers we're going to try and use.
    # The "test" subcommand always runs without the Control layer.
    control_enabled = not (local_operation or ctx.invoked_subcommand == "test")
    if not control_enabled:
        log.info("Running in local only mode")
    use_custom_config = not only_default_layer
    new_device_mode = new_run

    agent = core.Agent(control_enabled)
    agent.load(default_config_path, use_custom_config, new_device_mode)

    # Drop down to subcommands that needed a config compiled
    if ctx.invoked_subcommand == "test":
        ctx.obj.agent = agent
        return

    agent.start()
    print(str(datetime.now()))

    # Any other subcommand takes over from here.
    if ctx.invoked_subcommand is not None:
        return

    print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))
    try:
        # NOTE(review): `scheduler` is not defined or imported in the visible
        # part of this module; left unchanged - confirm the intended blocking
        # call for running the service.
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        pass
@cli.command()
@click.argument('plugin_name', required=False)
@click.argument('interface_function', required=False)
@click.pass_context
def test(ctx, plugin_name, interface_function):
    """
    Inspect the loaded config and exercise plugins.

    With no arguments, list the loaded plugins, print the core and plugin
    configs, then initialise all plugins. With PLUGIN_NAME, print that plugin's
    config and list its interface functions. With INTERFACE_FUNCTION as well,
    initialise only that plugin and call the named interface function.
    """
    # NOTE(review): this command reads `ctx.obj.plugin_configs` and
    # `ctx.obj.core_config`, and uses a `plugin` module that is not imported in
    # the visible part of this file - confirm where these are provided.
    echo_heading("Shepherd Test")

    if not plugin_name:
        plugin_configs = ctx.obj.plugin_configs

        echo_section("Plugins loaded:")
        if not plugin_configs:
            click.echo("---none---")
        # Use a separate loop name so the `plugin_name` argument is not clobbered.
        for name in plugin_configs:
            click.secho(F" {name}", fg='green')

        echo_section("Core config:")
        pprint(ctx.obj.core_config)

        echo_section("Plugin configs:")
        if not plugin_configs:
            click.echo("---none---")
        for name, config in plugin_configs.items():
            # Label each config with its own plugin name.
            click.secho(F" {name}", fg='green')
            pprint(config)
        click.echo("")

        log.info("Initialising plugins...")
        plugin.init_plugins(plugin_configs, ctx.obj.core_config)
        log.info("Plugin initialisation done")
        return

    echo_section("Target plugin:", input_text=plugin_name, on_nl=False)
    # TODO find plugin dependancies
    if plugin_name not in ctx.obj.plugin_configs:
        log.error(F"Supplied plugin name '{plugin_name}' is not loaded"
                  " (not present in config)")
        sys.exit(1)

    echo_section(F"Config [{plugin_name}]:")
    pprint(ctx.obj.plugin_configs[plugin_name])

    interface = plugin.load_plugin(plugin_name)

    if not interface_function:
        # No function requested: just list what the plugin exposes.
        echo_section(F"Interface functions [{plugin_name}]:")
        for name in interface._functions:
            click.echo(F" {name}")
        return

    echo_section("Target interface function:", input_text=interface_function)
    if interface_function not in interface._functions:
        log.error(F"Supplied interface function name '{interface_function}' is not present in"
                  F" plugin {plugin_name}")
        sys.exit(1)

    # Only the targeted plugin is initialised before calling its function.
    log.info("Initialising plugins...")
    plugin.init_plugins({plugin_name: ctx.obj.plugin_configs[plugin_name]}, ctx.obj.core_config)
    log.info("Plugin initialisation done")

    interface._functions[interface_function]()
@cli.command()
@click.argument('plugin_name', required=False)
@click.option('-a', '--include-all', is_flag=True,
              help="Include all optional fields in the template")
@click.option('-c', '--config', 'config_path', type=click.Path(),
              help="Path to append or create config template")
@click.option('-d', '--plugin-dir', type=click.Path(),
              help="Directory to search for plugin modules, in addition to built in Shepherd"
              " plugins and the global import path. Defaults to current directory.")
@click.pass_context
def template(ctx, plugin_name, include_all, config_path, plugin_dir):
    """
    Generate a template config TOML file for PLUGIN_NAME, or for the Shepherd core if
    PLUGIN_NAME is not provided.
    If config path is provided ("-c"), append to that file (if it exists) or write to
    a new file (if it doesn't yet exist).
    """
    # NOTE(review): `toml`, `plugin` and `core_confspec` are used below but are
    # not imported in the visible part of this module - confirm their source.
    if not plugin_dir:
        plugin_dir = Path.cwd()

    # Pick the config specification: Shepherd core, or the named plugin.
    if (not plugin_name) or (plugin_name == "shepherd"):
        plugin_name = "shepherd"
        confspec = core_confspec()
    else:
        try:
            plugin_interface = plugin.load_plugin(plugin_name, plugin_dir)
        except plugin.PluginLoadError as e:
            log.error(e.args[0])
            sys.exit(1)
        confspec = plugin_interface.confspec

    template_dict = confspec.get_template(include_all)
    template_toml = toml.dumps({plugin_name: template_dict})
    log.info(F"Config template for [{plugin_name}]: \n\n"+template_toml)

    if not config_path:
        # reuse parent "-c" for convenience
        config_path = ctx.parent.params["default_config_path"]
        if not config_path:
            # Nowhere to write to; the template has already been logged.
            return

    # Work out what to write; by default just the freshly generated template.
    output_text = template_toml
    if Path(config_path).is_file():
        try:
            existing_config = toml.load(config_path)
        except Exception:
            # Deliberately broad: any failure to load the existing file is
            # treated as "not valid TOML" and the user is asked before the
            # file is replaced. Declining aborts the command.
            click.confirm(
                F"File {config_path} already exists and is not a valid TOML file. Overwrite?",
                default=True, abort=True)
            click.echo(F"Writing [{plugin_name}] template to {config_path}")
        else:
            if plugin_name in existing_config:
                click.confirm(F"Overwrite [{plugin_name}] section in {config_path}?",
                              default=True, abort=True)
                click.echo(F"Overwriting [{plugin_name}] section in {config_path}")
            else:
                click.confirm(F"Add [{plugin_name}] section to {config_path}?",
                              default=True, abort=True)
                click.echo(F"Adding [{plugin_name}] section to {config_path}")
            # Merge the template into the existing config and re-serialise it all.
            existing_config[plugin_name] = template_dict
            output_text = toml.dumps(existing_config)
    else:
        click.echo(F"Writing [{plugin_name}] template to {config_path}")

    # Single write point; the file is only written, never read back.
    with open(config_path, 'w') as f:
        f.write(output_text)