import importlib from pathlib import Path import inspect import logging import sys import pkgutil from collections import namedtuple from functools import partial from types import MappingProxyType import pkg_resources from configspec import ConfigSpecification from .. import base_plugins log = logging.getLogger(__name__) _loaded_plugins = {} plugin_interfaces = {} plugin_functions = {} class PluginLoadError(Exception): pass ClassMarker = namedtuple("ClassMarker", []) def is_instance_check(classtype): def instance_checker(obj): return isinstance(obj, classtype) return instance_checker def plugin_class(cls): """ Class decorator, used to indicate that a class is to be used as the Plugin Class for this plugin. Note that only one plugin class is allowed per plugin. Only works when placed in the root of the plugin module or package (same as the interface) Use on the class definition: @plugin_class class MyPluginClass: This is equivalent to registering the class directly with the plugin interface later: interface.register_plugin_class(MyPluginClass) """ if not inspect.isclass(cls): raise PluginLoadError(F"@plugin_class can only be used to decorate a class") cls._shepherd_load_marker = ClassMarker() return cls FunctionMarker = namedtuple("FunctionMarker", ["name"]) def plugin_function(func=None, *, name=None): """ Method decorator to register a method as a plugin interface function. Either used directly: @plugin_function def my_method(self): or with optional keyword arguments: @plugin_function(name="someOtherName") def my_badly_named_method(self): Can only be used within the registered Plugin Class (either with @plugin_class or interface.register_plugin_class() ) """ if func is None: return partial(plugin_function, name=name) func._shepherd_load_marker = FunctionMarker(name) return func def plugin_hook(method): return method def plugin_attachment(hookname): def wrapped(method): return method return wrapped class InterfaceFunction(): def __init__(self, func, name=None, unbound=False): """ Wrapper around a callable to define a plugin interface function. If unbound true, will use a temp object to analyse the signature, and defer binding it as a method until _bind() is called. """ self.func = func self._unbound = unbound sigfunc = func if unbound: sigfunc = func.__get__(object()) if not callable(sigfunc): raise TypeError("InterfaceFunction can only be created around a callable or method.") if name: self.name = name else: self.name = sigfunc.__name__ params = inspect.signature(sigfunc) log.debug(F"Loaded interface function {self.name} with parameters: {params}") def _bind(self, obj): """ Bind the wrapped method to an object """ self.func = self.func.__get__(obj) self._unbound = False def __call__(self, *args, **kwargs): if self._unbound: raise Exception( "Cannot call unbound InterfaceFunction (plugin has not yet initialised)") return self.func(*args, **kwargs) class PluginInterface(): def __init__(self): self._confspec = None self._loaded = False self._functions = {} self._plugin_class = None self.config = None self.plugins = None self._plugin_name = "" def _load_pluginclass(self, module): pass def _load_guard(self): if self._loaded: raise PluginLoadError("Cannot call interface register functions once" " plugin is loaded") def register_confspec(self, confspec): self._load_guard() if self._confspec is not None: raise PluginLoadError("Plugin can only register one ConfigSpecification") if not isinstance(confspec, ConfigSpecification): raise PluginLoadError("confspec must be an instance of ConfigSpecification") self._confspec = confspec def register_class(self, cls): self._load_guard() if self._plugin_class is not None: raise PluginLoadError("Plugin can only register one plugin class") if not inspect.isclass(cls): raise PluginLoadError("plugin_class must be a class") self._plugin_class = cls def register_function(self, func, name=None): """ Register a function or method as an interface function for the plugin. If name is not provided, the name of the callable will be used. """ self._load_guard() if isinstance(func, InterfaceFunction): ifunc = func else: ifunc = InterfaceFunction(func, name) if ifunc.name in self._functions: raise PluginLoadError(F"Interface function with name '{ifunc.name}' already exists") self._functions[ifunc.name] = ifunc @property def confspec(self): return self._confspec def discover_base_plugins(): """ Returns a list of base plugin names available to load. These are plugins included with shepherd-agent, in 'base_plugins'. """ return [name for _, name, _ in pkgutil.iter_modules(base_plugins.__path__)] def discover_custom_plugins(plugin_dir=None): """ Returns a list of custom plugin names available to load. This includes all modules or packages within the supplied custom plugin directory. """ if plugin_dir: if Path(plugin_dir).is_dir(): return [name for _, name, _ in pkgutil.iter_modules([plugin_dir])] else: log.warning(F"Custom plugin directory {plugin_dir} does not exist") return [] def discover_installed_plugins(): """ Returns a list of installed plugin names available to load. These are packages that have used the 'shephed.plugin' entrypoint in their setup.py """ return [entrypoint.name for entrypoint in pkg_resources.iter_entry_points('shepherd.plugins')] def load_plugin(plugin_name, plugin_dir=None): """ Finds a Shepherd plugin, loads it, and returns the resulting PluginInterface object. Will check 3 sources, in order: 1. Built-in plugin modules/subpackages within ''shepherd.base_plugins''. Plugin module/package names match the plugin name. 2. Modules/packages within the supplied ''plugin_dir'' path. Plugin module/package names match the plugin name. 3. Any installed packages supplying the ''shepherd.plugin'' entrypoint. Once a module is found, loading it involves scanning the root of the module for a PluginInterface instance. If a confspec isn't registered, a ConfigSpecification instance at the module root will also be implicitly registered to the interface. Lastly, any plugin decorators are scanned for and registered (including a plugin class if present). Args: plugin_name: Name used to try and locate the plugin plugin_dir: Optional directory path to be used for custom plugins Returns: The PluginInterface for the loaded plugin """ if plugin_name in _loaded_plugins: return _loaded_plugins[plugin_name] # Each of the 3 plugin sources have different import mechanisms. Discovery is broken out to # allow them to be listed. Using a try/except block wouldn't be able to tell the difference # between a plugin not being found or //it's// imports not loading correctly. module = None if plugin_name in discover_base_plugins(): module = importlib.import_module(base_plugins.__name__+'.'+plugin_name) log.info(F"Loading base plugin {plugin_name}") elif plugin_name in discover_custom_plugins(plugin_dir): saved_syspath = sys.path try: sys.path = [str(plugin_dir)] module = importlib.import_module(plugin_name) finally: sys.path = saved_syspath modulepath = getattr(module, "__path__", [module.__file__])[0] log.info(F"Loading custom plugin {plugin_name} from {modulepath}") elif plugin_name in discover_installed_plugins(): module = pkg_resources.iter_entry_points('shepherd.plugins', plugin_name)[0].load() log.info(F"Loading installed plugin {plugin_name} from {module.__name__}") if not module: raise PluginLoadError("Could not find plugin "+plugin_name) # Now we have the module, scan it for the two implicit objects we look for - the interface and # the confspec interface_list = inspect.getmembers(module, is_instance_check(PluginInterface)) if not interface_list: raise PluginLoadError("Imported shepherd plugins must contain an instance" " of PluginInterface") if len(interface_list) > 1: log.warning(F"Plugin module {module.__name__} has more" F" than one PluginInterface instance.") _, interface = interface_list[0] interface._plugin_name = plugin_name # Only looks for implicit confspec if one isn't registered. Uses a blank one if none are # supplied. if interface._confspec is None: confspec_list = inspect.getmembers(module, is_instance_check(ConfigSpecification)) if confspec_list: if len(confspec_list) > 1: log.warning(F"Plugin {interface._plugin_name} has more" F" than one root ConfigSpecification instance.") interface.register_confspec(confspec_list[0][1]) else: interface._confspec = ConfigSpecification() # Scan module for load markers left by decorators for name, attr in module.__dict__.items(): if hasattr(attr, "_shepherd_load_marker"): if isinstance(attr._shepherd_load_marker, FunctionMarker): interface.register_function(attr, **attr._shepherd_load_marker._asdict()) elif isinstance(attr._shepherd_load_marker, ClassMarker): interface.register_class(attr) if interface._plugin_class is not None: # Scan plugin class for marked methods for name, attr in interface._plugin_class.__dict__.items(): if hasattr(attr, "_shepherd_load_marker"): if isinstance(attr._shepherd_load_marker, FunctionMarker): # Instance doesn't exist yet, so need to save unbound methods for binding later unbound_func = InterfaceFunction( attr, unbound=True, **attr._shepherd_load_marker._asdict()) interface.register_function(unbound_func) interface._loaded = True _loaded_plugins[plugin_name] = interface return interface def init_plugins(plugin_configs, core_config): """ Initialise plugins named as keys in plugin_configs. Plugins must already be loaded. """ # Pick out plugins to init for plugin_name in plugin_configs.keys(): plugin_interfaces[plugin_name] = _loaded_plugins[plugin_name] plugin_functions_tuples = {} plugin_functions_view = MappingProxyType(plugin_functions_tuples) # Run plugin init and init hooks for plugin_name, interface in plugin_interfaces.items(): interface.plugins = plugin_functions_view interface.config = plugin_configs[plugin_name] # If it has one, instantiate the plugin object and bind functions if interface._plugin_class is not None: interface._plugin_obj = interface._plugin_class() for funcname, ifunc in interface._functions.items(): if ifunc._unbound: ifunc._bind(interface._plugin_obj) plugin_functions[plugin_name] = interface._functions # Fill in the interface functions view that plugins now have access to for name, functions in plugin_functions.items(): plugin_functions_tuples[name] = namedtuple( F'{name}_interface_functions', functions.keys())(**functions)