import importlib
from pathlib import Path
import inspect
import logging
import sys
import pkgutil
from collections import namedtuple
from collections.abc import Sequence
from functools import partial
from types import MappingProxyType

import pkg_resources

from configspec import ConfigSpecification
from configspec.specification import _ValueSpecification
import configspec
import preserve

from .util import NamespaceProxy
from . import control
from . import tasks
from .. import base_plugins

# Note that while module attributes intended for external use are mixed here, all the external
# ones are pulled into the root package scope and are intended to be accessed that way.

log = logging.getLogger(__name__)

# Cache of loaded plugins so far (plugin name -> PluginInterface)
_loaded_plugins = {}


class UnboundMethod():
    """
    Simple wrapper to mark that this is a reference to a method that hasn't been bound to an
    instance yet (or had a decorator like `staticmethod` or `classmethod` unwrapped).
    Sets its signature to the result of binding to an anonymous object.
    """

    def __init__(self, func):
        self._func = func
        self._bound_func = None
        # Bind to a throwaway object purely to obtain the signature/name callers will see
        # once the real binding happens.
        sigobj = self._func.__get__(object())
        self.__signature__ = inspect.signature(sigobj)
        self.__doc__ = inspect.getdoc(self._func)
        self.__name__ = sigobj.__name__

    def bind(self, obj):
        """
        Bind the wrapped method to an object, and return the result.

        Once bound, calling the UnboundMethod will actually call the bound result, and the
        `func` property will return it.
        """
        self._bound_func = self._func.__get__(obj)
        return self._bound_func

    @property
    def func(self):
        """The bound callable. Raises Exception if `bind()` has not been called yet."""
        if self._bound_func is None:
            raise Exception("Cannot get func from UnboundMethod until it has been bound.")
        return self._bound_func

    def __call__(self, *args, **kwargs):
        """Call the bound method. Raises Exception if `bind()` has not been called yet."""
        if self._bound_func is None:
            raise Exception("Cannot call UnboundMethod until it has been bound.")
        return self._bound_func(*args, **kwargs)


class PluginLoadError(Exception):
    """Raised when a plugin cannot be found, or registers itself incorrectly during load."""
    pass


def is_instance_check(classtype):
    """Return a single-argument predicate that tests `isinstance(obj, classtype)`."""
    def instance_checker(obj, classtype=classtype):
        return isinstance(obj, classtype)
    return instance_checker


ClassMarker = namedtuple("ClassMarker", [])


def plugin_class(cls):
    """
    Class decorator, used to indicate that a class is to be used as the Plugin Class for this
    plugin. Note that only one plugin class is allowed per plugin. Only works when placed in
    the root of the plugin module or package (same as the interface)

    Use on the class definition:

        @plugin_class
        class MyPluginClass:

    This is equivalent to registering the class directly with the plugin interface later:

        interface.register_plugin_class(MyPluginClass)
    """
    if not inspect.isclass(cls):
        raise PluginLoadError("@plugin_class can only be used to decorate a class")

    # The marker is picked up by load_plugin() when it scans the module root.
    cls._shepherd_load_marker = ClassMarker()
    return cls


FunctionMarker = namedtuple("FunctionMarker", ["name"])


def plugin_function(func=None, *, name=None):
    """
    Method decorator to register a method as a plugin interface function. If `name` is not
    supplied, the name of the decorated function is used.

    Either used directly:

        @plugin_function
        def my_method(self):

    or with optional keyword arguments:

        @plugin_function(name="someOtherName")
        def my_badly_named_method(self):

    Can either be used on functions in the root level of the plugin module, or on methods
    within the registered Plugin Class (either with @plugin_class or
    interface.register_plugin_class() )
    """
    if func is None:
        # Called with keyword arguments only: return a decorator awaiting the function.
        return partial(plugin_function, name=name)

    func._shepherd_load_marker = FunctionMarker(name)
    return func


HookMarker = namedtuple("HookMarker", ["name", "signature"])


def plugin_hook(func=None, *, name=None):
    """
    Method decorator to register a hook for the plugin. Will use the decorated function
    signature for the hook, and replace the decorated function with the new hook on plugin
    init, so it can be called directly. If `name` is not supplied, the name of the decorated
    function is used.

    Like `plugin_function`, can be either used directly:

        @plugin_hook
        def my_method(self):

    or with the optional keyword argument:

        @plugin_hook(name="someOtherName")
        def my_badly_named_method(self):

    As the decorated function is only being used as a convenient way to declare the hook
    signature and to call the hook later, the usual Python `self` method binding system isn't
    appropriate. Methods in the plugin class _can_ be registered as hooks (as can module root
    level functions), but the signature will be used directly (don't add the `self` argument).
    For technical clarity, methods in the class should be decorated with `@staticmethod` below
    the `@plugin_hook` decorator.
    """
    if func is None:
        return partial(plugin_hook, name=name)

    if isinstance(func, staticmethod):
        # Pull the underlying function out of a staticmethod. It's static, so bound object is
        # irrelevant
        func = func.__get__(object())

    if not name:
        name = func.__name__

    func._shepherd_load_marker = HookMarker(name, inspect.signature(func))
    return func


AttachmentMarker = namedtuple("AttachmentMarker", ["hook_identifier"])


def plugin_attachment(hook_identifier):
    """
    Function decorator to register a function or method as an attachment to a plugin hook. The
    `hook_identifier` is a string indicating what hook to attach to. It can either refer to a
    hook in _another_ plugin with the form "my_plugin.my_hook", or to a local hook in the same
    plugin with just the hook name: "my_hook".

    Can either be used on functions in the root level of the plugin module, or on methods
    within the registered Plugin Class (either with @plugin_class or
    interface.register_plugin_class() )
    """
    def attachment_decorator(func):
        func._shepherd_load_marker = AttachmentMarker(hook_identifier)
        return func
    return attachment_decorator


# NOTE: `exclude_attrs` is given a real one-element tuple; `('function')` is just the string
# 'function' in parentheses.
@preserve.preservable(exclude_attrs=('function',))
class InterfaceCall():
    def __init__(self, plugin_name, function_name, kwargs=None):
        """
        Record an interface function call for future use.

        Args:
            plugin_name: Name of the plugin owning the interface function.
            function_name: Name of the interface function within that plugin.
            kwargs: Keyword arguments to pass when the call is made (defaults to none).
        """
        self.plugin_name = plugin_name
        self.function_name = function_name
        # Filled in by resolve(); excluded from preservation.
        self.function = None
        self.kwargs = kwargs
        if self.kwargs is None:
            self.kwargs = {}

    def __restore_init__(self):
        self.function = None

    def resolve(self, interface_functions):
        """
        Resolve the InterfaceFunction this call refers to. Requires a dict of plugin
        functions, where the keys are plugin names and the values are NamedTuples containing
        the interface functions for that plugin.

        Raises:
            ValueError: If the plugin or the function within it cannot be found.
        """
        if self.plugin_name not in interface_functions:
            raise ValueError(F"Plugin '{self.plugin_name}' could not be found to resolve function")
        if not hasattr(interface_functions[self.plugin_name], self.function_name):
            raise ValueError(F"Interface function '{self.function_name}' could not be found in"
                             F" plugin '{self.plugin_name}'")
        self.function = getattr(interface_functions[self.plugin_name], self.function_name)

    def call(self):
        """
        Make the interface function call this object refers to (using the stored kwargs).
        Must make sure `resolve()` is called first to actually find the function.
        """
        return self.function(**self.kwargs)


def _annotation_to_spec(annotation):
    """
    Convert a function annotation into a configspec value specification.

    Accepts the `str`/`int` shortcuts (either the type or its name as a string), or an
    existing `_ValueSpecification` instance which is returned unchanged.

    Raises:
        ValueError: If the annotation is neither a shortcut nor a value specification.
    """
    if annotation in ("str", str):
        return configspec.StringSpec()
    if annotation in ("int", int):
        return configspec.IntSpec()
    if not isinstance(annotation, _ValueSpecification):
        raise ValueError("Function annotations for a Shepherd Interface function "
                         "must be a type of ConfigSpecification, or one of the valid "
                         "type shortcuts")
    return annotation


class InterfaceFunction():
    def __init__(self, func, name=None, remote_command=False):
        """
        Wrapper around a callable to define a plugin interface function.

        Args:
            func: The callable to wrap.
            name: Name to expose the function under. Defaults to `func.__name__`.
            remote_command: If True, build a ConfigSpecification from the function
                annotations describing its arguments (and return value, if annotated).

        Raises:
            TypeError: If `func` is not callable.
            ValueError: If `remote_command` is set and a parameter can't be passed by
                keyword, or an annotation can't be converted to a value specification.
        """
        self.func = func
        self.remote = remote_command
        self.spec = None

        if not callable(self.func):
            raise TypeError("InterfaceFunction can only be created around a callable or method.")

        sig = inspect.signature(self.func)

        if self.remote:
            self.spec = ConfigSpecification()
            # `sig.parameters` maps names to Parameter objects; we need the objects.
            for param in sig.parameters.values():
                if param.kind not in (inspect.Parameter.POSITIONAL_OR_KEYWORD,
                                      inspect.Parameter.KEYWORD_ONLY):
                    raise ValueError("Interface functions must be callable with keyword arguments")
                self.spec.add_spec(param.name, _annotation_to_spec(param.annotation))

            if sig.return_annotation is not inspect.Signature.empty:
                # Return value spec is stored under the reserved name "return".
                self.spec.add_spec("return", _annotation_to_spec(sig.return_annotation))

            func_doc = inspect.getdoc(self.func)
            if func_doc:
                self.spec.helptext = func_doc

        if name:
            self.name = name
        else:
            self.name = self.func.__name__

        log.debug(F"Loaded interface function {self.name} with parameters: {sig.parameters}")

    def get_spec(self):
        """
        Get the function spec used for Shepherd Control to know how to call it as a command.
        Will return None unless `remote_command` was marked True on creation.

        Returns a ConfigSpecification. If a return value spec is present, it uses the reserved
        spec name "return". Any docstring on the function is placed in the root
        ConfigSpecification helptext.
        """
        return self.spec

    def __call__(self, *args, **kwargs):
        return self.func(*args, **kwargs)


class HookAttachment():
    """
    Simple record to store the details of a deferred hook attachment. Only a class to allow
    the func attribute to be changed.
    """

    def __init__(self, func, plugin_name, hook_name):
        self.func = func
        self.plugin_name = plugin_name
        self.hook_name = hook_name


class PluginHook():
    """
    A hook to call a set of attachments provided by plugins. Calling the hook directly will
    call all attached functions, returning the results in a dict where the keys are the name of
    the plugin the attachment came from (if there are no attachments, the result will be an
    empty dict).
    """

    def __init__(self, name, signature):
        self.name = name
        self.signature = signature
        # dict of callables, plugin names as keys
        self._attached_functions = {}
        # Read-only view of the attachments for external inspection
        self.attachments = MappingProxyType(self._attached_functions)

    def _attach(self, new_func, plugin_name):
        """
        Attach `new_func` to this hook on behalf of `plugin_name`.

        Raises:
            TypeError: If `new_func` is not callable.
            Exception: If the plugin already has an attachment on this hook, or the
                signature of `new_func` does not match the hook signature.
        """
        if not callable(new_func):
            raise TypeError("Hook attachment must be callable")
        if plugin_name in self._attached_functions:
            raise Exception(F"Hook already has attachment from plugin '{plugin_name}'")

        # Signatures are compared by their string form
        new_sig = inspect.signature(new_func)
        if str(new_sig) != str(self.signature):
            raise Exception(F"Hook attachment signature '{new_sig}' must match the signature "
                            F"'{str(self.signature)}' for target hook {self.name}")
        self._attached_functions[plugin_name] = new_func

    def __call__(self, *args, **kwargs):
        results = {}
        for plugin_name, func in self._attached_functions.items():
            results[plugin_name] = func(*args, **kwargs)
        return results


class PluginInterface():
    """
    Registration point and runtime handle for a single plugin. A plugin module creates one
    instance at its root; `load_plugin()` finds it and fills it in, and `init_plugins()`
    populates `config`, `plugins` and `hooks`.
    """

    def __init__(self):
        self._confspec = None
        self._loaded = False
        self._initialised = False
        self._functions = {}
        self._hooks = {}
        self._attachments = []
        self._tasks = []
        self._plugin_class = None
        # Instance of the plugin class, created by init_plugins() if a class is registered
        self._plugin_obj = None
        self.config = None
        self.plugins = None
        self.hooks = None
        self._plugin_name = ""
        self._update_state = control.PluginUpdateState()

    def _load_pluginclass(self, module):
        pass

    def _load_guard(self):
        """Raise PluginLoadError if the plugin has already finished loading."""
        if self._loaded:
            raise PluginLoadError("Cannot call interface register functions once"
                                  " plugin is loaded")

    def register_confspec(self, confspec):
        """
        Register the plugin's ConfigSpecification. Only one may be registered, and only
        during plugin load.
        """
        self._load_guard()
        if self._confspec is not None:
            raise PluginLoadError("Plugin can only register one ConfigSpecification")
        if not isinstance(confspec, ConfigSpecification):
            raise PluginLoadError("confspec must be an instance of ConfigSpecification")
        self._confspec = confspec

    def register_class(self, cls):
        """
        Register the plugin class. Only one may be registered, and only during plugin load.
        """
        self._load_guard()
        if self._plugin_class is not None:
            raise PluginLoadError("Plugin can only register one plugin class")
        if not inspect.isclass(cls):
            raise PluginLoadError("plugin_class must be a class")
        self._plugin_class = cls

    def register_function(self, func, name=None, remote_command=False):
        """
        Register a function or method as an interface function for the plugin. If name is not
        provided, the name of the callable will be used.
        """
        self._load_guard()
        ifunc = InterfaceFunction(func, name, remote_command)
        if ifunc.name in self._functions:
            raise PluginLoadError(F"Interface function with name '{ifunc.name}' already exists")
        self._functions[ifunc.name] = ifunc

    def register_attachment(self, func, hook_identifier):
        """
        Register a function or method as an attachment to a plugin hook. The
        `hook_identifier` is a string indicating what hook to attach to. It can either refer
        to a hook in _another_ plugin with the form "my_plugin.my_hook", or to a local hook in
        the same plugin with just the hook name: "my_hook".
        """
        self._load_guard()
        if not callable(func):
            raise TypeError("Hook attachment can only be created around a callable or method.")

        # Left-pad with None so a bare "hook" gives plugin_name=None, and anything with more
        # than two parts leaves a non-None `extra`.
        extra, plugin_name, hook_name = ([None, None]+hook_identifier.split('.'))[-3:]
        if extra is not None:
            raise ValueError("Hook identifier can contain at most 2 parts around a '.' character")

        self._attachments.append(HookAttachment(func, plugin_name, hook_name))

    def register_hook(self, name, signature=None):
        """
        Register a plugin hook for other plugins to attach to. Can only be registered during
        plugin load. Hook will be accessible during init from `PluginInterface.hooks.`.
        Optionally, the same hook object is also returned from `register_hook` to allow it to
        be stored and called elsewhere.

        In most cases, the decorator form (`@plugin_hook`) is more convenient to use, as it
        will directly replace the decorated function or method (usually with just `pass` as
        the content) with the hook - allowing it to be called as normal.

        If the hook requires arguments, either use the decorator form or pass in the
        `signature` argument.

        For basic keyword args, this can just be a sequence of string argument names, for
        example:
            `register_hook("my_hook", ["arg_a", "arg_b"])`

        If a more complex signature is required, a `inspect.Signature` object can be passed
        in. This is most easily expressed inline with a lambda (only to use standard Python
        function argument definitions - the lambda doesn't get called). For example:
            `register_hook("my_hook", inspect.signature(lambda arg_a, arg_b_with_default=5: None))`

        Args:
            name: A string (and valid Python identifier) naming the hook. Must be unique
                within the plugin.
            signature: If None, registers the hook as requiring no arguments. Can either be a
                list of argument names, or an `inspect.Signature` object.
        """
        self._load_guard()
        if not isinstance(name, str):
            raise PluginLoadError("Hook name must be a string")
        if name in self._hooks:
            raise PluginLoadError(F"Hook with name '{name}' already exists")

        if signature is None:
            signature = inspect.Signature([])
        elif isinstance(signature, Sequence):
            params = [inspect.Parameter(param_name, inspect.Parameter.POSITIONAL_OR_KEYWORD)
                      for param_name in signature]
            signature = inspect.Signature(params)

        if not isinstance(signature, inspect.Signature):
            raise PluginLoadError("Hook signature must either be a sequence of paremeter names or"
                                  " and instance of inspect.Signature")

        self._hooks[name] = PluginHook(name, signature)
        return self._hooks[name]

    def add_task(self, trigger, interface_function, kwargs=None):
        """
        Add a task when creating a new session. Can only be called during init (object or
        hook). Will be ignored if Shepherd is resuming an old session.

        Args:
            trigger: The trigger object (either a CronTrigger or the result of a
                ConfigTriggerSpec)
            interface_function: The interface function on this plugin to call when triggered.
                Can either be a callable that was registered as a plugin function or a string
                matching the function name.
            kwargs: Any keyword arguments to be passed to the function when the task is
                triggered. Must be Preservable.
        """
        if not self._loaded:
            raise Exception("Cannot add plugin tasks until plugin is loaded")
        if self._initialised:
            raise Exception("Cannot add plugin tasks after plugin has been initialised")

        if isinstance(interface_function, str):
            if interface_function not in self._functions:
                raise Exception("Plugin does not have interface function"
                                F" named {interface_function}")
            task_call = InterfaceCall(self._plugin_name, interface_function, kwargs)
        else:
            # Find the callable in our interface functions
            # A bound method exposes its plain function as `__func__`; fall back to the
            # callable itself for plain functions.
            raw_function = getattr(interface_function, "__func__", interface_function)
            func_name = None
            for name, ifunc in self._functions.items():
                if ifunc.func == interface_function:
                    func_name = name
                    break
                # While the plugin object's __init__ is running, registered methods are still
                # UnboundMethod wrappers, so compare against the function they wrap.
                if isinstance(ifunc.func, UnboundMethod) and ifunc.func._func is raw_function:
                    func_name = name
                    break
            if func_name is None:
                raise Exception(F"Function {interface_function} has not been registered"
                                " with the plugin")
            task_call = InterfaceCall(self._plugin_name, func_name, kwargs)

        self._tasks.append(tasks.Task(trigger, task_call))

    def set_status(self, status):
        """
        Set the plugin status, to be sent to Shepherd Control if configured.

        Args:
            status: A flat dictionary of fields with string keys.
        """
        self._update_state.set_status(status)

    @property
    def confspec(self):
        return self._confspec


def discover_base_plugins():
    """
    Returns a list of base plugin names available to load. These are plugins included with
    shepherd-agent, in 'base_plugins'.
    """
    return [name for _, name, _ in pkgutil.iter_modules(base_plugins.__path__)]


def discover_custom_plugins(plugin_dir=None):
    """
    Returns a list of custom plugin names available to load. This includes all modules or
    packages within the supplied custom plugin directory. Returns an empty list if no
    directory is supplied or it does not exist (the latter is logged as a warning).
    """
    if plugin_dir:
        if Path(plugin_dir).is_dir():
            return [name for _, name, _ in pkgutil.iter_modules([str(plugin_dir)])]
        log.warning(F"Custom plugin directory {plugin_dir} does not exist")
    return []


def discover_installed_plugins():
    """
    Returns a list of installed plugin names available to load. These are packages that have
    used the 'shepherd.plugins' entrypoint in their setup.py
    """
    return [entrypoint.name for entrypoint in pkg_resources.iter_entry_points('shepherd.plugins')]


def _import_plugin_module(plugin_name, plugin_dir=None):
    """
    Locate and import the module for `plugin_name`, checking base plugins, then the custom
    plugin directory, then installed entry points. Returns None if it isn't found anywhere.
    """
    # Each of the 3 plugin sources have different import mechanisms. Discovery is broken out to
    # allow them to be listed. Using a try/except block wouldn't be able to tell the difference
    # between a plugin not being found or its imports not loading correctly.
    if plugin_name in discover_base_plugins():
        module = importlib.import_module(base_plugins.__name__+'.'+plugin_name)
        log.info(F"Loading base plugin {plugin_name}")
        return module

    if plugin_name in discover_custom_plugins(plugin_dir):
        saved_syspath = sys.path
        try:
            # Put the plugin directory first so it wins the lookup, but keep the existing
            # entries so the plugin's own imports can still be resolved.
            sys.path = [str(plugin_dir), *saved_syspath]
            module = importlib.import_module(plugin_name)
        finally:
            sys.path = saved_syspath
        modulepath = getattr(module, "__path__", [module.__file__])[0]
        log.info(F"Loading custom plugin {plugin_name} from {modulepath}")
        return module

    if plugin_name in discover_installed_plugins():
        # iter_entry_points yields entry points lazily, so take the first match with next()
        entrypoint = next(
            iter(pkg_resources.iter_entry_points('shepherd.plugins', plugin_name)), None)
        if entrypoint is not None:
            module = entrypoint.load()
            log.info(F"Loading installed plugin {plugin_name} from {module.__name__}")
            return module

    return None


def _register_marked_members(interface, owner, wrap=None, allow_class=True):
    """
    Scan the attributes of `owner` (a module or class) for load markers left by the plugin
    decorators, and pass them to the matching register method on `interface`.

    Args:
        interface: The PluginInterface to register with.
        owner: Module or class whose `__dict__` is scanned.
        wrap: Optional callable applied to functions/attachments before registering.
        allow_class: Whether ClassMarker entries should be registered as the plugin class.
    """
    # Iterate over a snapshot, as hooks are re-assigned on the owner during the scan.
    for key, attr in list(vars(owner).items()):
        marker = getattr(attr, "_shepherd_load_marker", None)
        if isinstance(marker, FunctionMarker):
            target = attr if wrap is None else wrap(attr)
            interface.register_function(target, **marker._asdict())
        elif isinstance(marker, AttachmentMarker):
            target = attr if wrap is None else wrap(attr)
            interface.register_attachment(target, **marker._asdict())
        elif isinstance(marker, HookMarker):
            # Hooks are a little different in that we replace the attr with the hook
            newhook = interface.register_hook(**marker._asdict())
            setattr(owner, key, newhook)
        elif allow_class and isinstance(marker, ClassMarker):
            interface.register_class(attr)


def load_plugin(plugin_name, plugin_dir=None):
    """
    Finds a Shepherd plugin, loads it, and returns the resulting PluginInterface object.
    Will check 3 sources, in order:
      1. Built-in plugin modules/subpackages within `shepherd.base_plugins`. Plugin
         module/package names match the plugin name.
      2. Modules/packages within the supplied `plugin_dir` path. Plugin module/package
         names match the plugin name.
      3. Any installed packages supplying the `shepherd.plugins` entrypoint.

    Once a module is found, loading it involves scanning the root of the module for a
    PluginInterface instance. If a confspec isn't registered, a ConfigSpecification instance
    at the module root will also be implicitly registered to the interface. Lastly, any
    plugin decorators are scanned for and registered (including a plugin class if present).

    Args:
        plugin_name: Name used to try and locate the plugin
        plugin_dir: Optional directory path to be used for custom plugins

    Returns:
        The PluginInterface for the loaded plugin

    Raises:
        PluginLoadError: If the plugin can't be found, or has no PluginInterface instance.
    """
    if plugin_name in _loaded_plugins:
        return _loaded_plugins[plugin_name]

    module = _import_plugin_module(plugin_name, plugin_dir)
    if module is None:
        raise PluginLoadError("Could not find plugin "+plugin_name)

    # Now we have the module, scan it for the two implicit objects we look for - the interface
    # and the confspec
    interface_list = inspect.getmembers(module, is_instance_check(PluginInterface))
    if not interface_list:
        raise PluginLoadError("Imported shepherd plugins must contain an instance"
                              " of PluginInterface")
    if len(interface_list) > 1:
        log.warning(F"Plugin module {module.__name__} has more"
                    F" than one PluginInterface instance.")
    _, interface = interface_list[0]
    interface._plugin_name = plugin_name

    # Only looks for implicit confspec if one isn't registered. Uses a blank one if none are
    # supplied.
    if interface._confspec is None:
        confspec_list = inspect.getmembers(module, is_instance_check(ConfigSpecification))
        if confspec_list:
            if len(confspec_list) > 1:
                log.warning(F"Plugin {interface._plugin_name} has more"
                            F" than one root ConfigSpecification instance.")
            interface.register_confspec(confspec_list[0][1])
        else:
            interface._confspec = ConfigSpecification()

    interface._update_state.set_confspec(interface.confspec)

    # Scan module for load markers left by decorators and pass them over to register methods
    _register_marked_members(interface, module)

    if interface._plugin_class is not None:
        # Scan plugin class for marked methods. Instance doesn't exist yet, so need to save
        # unbound methods for binding later.
        _register_marked_members(interface, interface._plugin_class,
                                 wrap=UnboundMethod, allow_class=False)

    # Assemble remote interface function specs
    command_spec = {}
    for function in interface._functions.values():
        if function.remote:
            command_spec[function.name] = function.get_spec()
    interface._update_state.set_commandspec(command_spec)

    interface._loaded = True
    _loaded_plugins[plugin_name] = interface
    return interface


def init_plugins(plugin_configs, core_config, core_interface_functions):
    """
    Initialise plugins named as keys in plugin_configs. Plugins must already be loaded.
    Returns dict of initialised plugin interfaces, and a dict of interface function
    namedtuples (one for each plugin)
    """
    # Pick out plugins to init
    plugin_interfaces = {}
    for plugin_name in plugin_configs.keys():
        plugin_interfaces[plugin_name] = _loaded_plugins[plugin_name]

    # Plugins are handed the read-only proxy now; the underlying dict is filled in below
    # once every plugin has been initialised.
    interface_functions = {}
    interface_functions_proxy = MappingProxyType(interface_functions)

    # Run plugin init and init hooks
    for plugin_name, interface in plugin_interfaces.items():
        interface.plugins = interface_functions_proxy
        interface.config = plugin_configs[plugin_name]
        interface.hooks = NamespaceProxy(interface._hooks)

        # If it has one, instantiate the plugin object and bind methods to it.
        if interface._plugin_class is not None:
            interface._plugin_obj = interface._plugin_class()
            for ifunc in interface._functions.values():
                if isinstance(ifunc.func, UnboundMethod):
                    ifunc.func = ifunc.func.bind(interface._plugin_obj)
            for attachment in interface._attachments:
                if isinstance(attachment.func, UnboundMethod):
                    attachment.func = attachment.func.bind(interface._plugin_obj)

        # Find hooks attachments are referring to and attach them
        for attachment in interface._attachments:
            hook_plugin_name = attachment.plugin_name
            if hook_plugin_name is None:
                # No plugin given in the identifier means a hook local to this plugin
                hook_plugin_name = plugin_name
            hook_name = attachment.hook_name

            if hook_plugin_name not in plugin_interfaces:
                raise ValueError(F"{plugin_name} attachment target plugin "
                                 F"'{hook_plugin_name}' does not exist")
            if hook_name not in plugin_interfaces[hook_plugin_name]._hooks:
                raise ValueError(F"{plugin_name} attachment target hook "
                                 F"'{hook_plugin_name}:{hook_name}' does not exist")

            plugin_interfaces[hook_plugin_name]._hooks[hook_name]._attach(attachment.func,
                                                                          plugin_name)

        # TODO We've run the object __init__, but not the init hook, that needs to be done
        interface._initialised = True

    # Add core functions
    interface_functions['shepherd'] = NamespaceProxy(core_interface_functions)

    # Wait until all plugins have run their init before filling in and giving access
    # to all the interface functions.
    for plugin_name, interface in plugin_interfaces.items():
        # Each plugin has a NamespaceProxy of its interface functions for read-only attr access
        interface_functions[plugin_name] = NamespaceProxy(interface._functions)

    return plugin_interfaces, interface_functions_proxy