""" Configuration managment module. Enables configuration to be validated against requirement definitions before being loaded and used. """ import re import toml from .freezedry import freezedryable, rehydrate class InvalidConfigError(Exception): pass # On start, # get conf_defs via imports in core # Load a conf file # Validate the Shepherd table in conf file, then load its values to get module list # Validate other loaded tables/modules, then load their values # How do modules wind up with their instance of ConfigValues? Return val instance # from validation function - as it needs to build the vals instance while validating anyway # Validate a conf file given a module or config_def list # idea is to create these similar to how arg parser works # how to store individual items in def? Dict of... # Tables need a dict, lists/arrays need a list of dicts, but what's in the dict? # if it's another instacnce of ConfigDef, then each Config Def needs to handle # one item, but cater for all the different types of items - and many of those # should be able to add a new item. # could have a separate class for the root, but lower tables really need to # perform exactly the same... # config def required interface: # Validate values. 
# The Table and Array terms used here are directly from the TOML convention,
# but they essentially map directly to Dictionaries (Tables) and Lists (Arrays).


class _ConfigDefinition():
    """
    Base class describing a single config item.

    Args:
        default: Value filled in by ``TableDef.validate`` when the item is
            optional and missing from the table being validated.
        optional: (bool) Whether the item may be left out of its table.
        helptext: (str) Free-form description of the item.
    """

    def __init__(self, default=None, optional=False, helptext=""):
        self.default = default
        self.optional = optional
        self.helptext = helptext

    def validate(self, value):  # pylint: disable=W0613
        """
        Check ``value`` against this definition. Subclasses must override.

        Raises:
            TypeError: Always, as the base implementation is abstract.
        """
        raise TypeError("_ConfigDefinition.validate() is an abstract method")


@freezedryable
class BoolDef(_ConfigDefinition):
    """Definition of a boolean config item."""

    # NOTE(review): this pass-through __init__ is kept as-is; presumably the
    # explicit signature matters to @freezedryable - confirm before removing.
    def __init__(self, default=None, optional=False, helptext=""):  # pylint: disable=W0235
        super().__init__(default, optional, helptext)

    def validate(self, value):
        """
        Raises:
            InvalidConfigError: If ``value`` is not a ``bool``.
        """
        if not isinstance(value, bool):
            raise InvalidConfigError("Config value must be a boolean")


@freezedryable
class IntDef(_ConfigDefinition):
    """
    Definition of an integer config item with optional inclusive bounds.

    Args:
        default: Value used when the item is optional and missing.
        minval: Smallest accepted value, or None for no lower bound.
        maxval: Largest accepted value, or None for no upper bound.
        optional: (bool) Whether the item may be left out of its table.
        helptext: (str) Free-form description of the item.
    """

    def __init__(self, default=None, minval=None, maxval=None, optional=False, helptext=""):
        super().__init__(default, optional, helptext)
        self.minval = minval
        self.maxval = maxval

    def validate(self, value):
        """
        Raises:
            InvalidConfigError: If ``value`` is not an integer or lies outside
                the configured bounds.
        """
        # bool is a subclass of int in Python, but a boolean is a distinct
        # TOML type and must not be accepted where an integer is required.
        if isinstance(value, bool) or not isinstance(value, int):
            raise InvalidConfigError("Config value must be an integer")
        if self.minval is not None and value < self.minval:
            raise InvalidConfigError("Config value must be >= " + str(self.minval))
        if self.maxval is not None and value > self.maxval:
            raise InvalidConfigError("Config value must be <= " + str(self.maxval))


@freezedryable
class StringDef(_ConfigDefinition):
    """
    Definition of a string config item with optional inclusive length bounds.

    Args:
        default: Value used when the item is optional and missing.
        minlength: Shortest accepted length, or None for no lower bound.
        maxlength: Longest accepted length, or None for no upper bound.
        optional: (bool) Whether the item may be left out of its table.
        helptext: (str) Free-form description of the item.
    """

    def __init__(self, default="", minlength=None, maxlength=None, optional=False, helptext=""):
        super().__init__(default, optional, helptext)
        self.minlength = minlength
        self.maxlength = maxlength

    def validate(self, value):
        """
        Raises:
            InvalidConfigError: If ``value`` is not a string or its length lies
                outside the configured bounds.
        """
        if not isinstance(value, str):
            raise InvalidConfigError(F"Config value must be a string and is {value}")
        if self.minlength is not None and len(value) < self.minlength:
            raise InvalidConfigError("Config string length must be >= " + str(self.minlength))
        if self.maxlength is not None and len(value) > self.maxlength:
            raise InvalidConfigError("Config string length must be <= " + str(self.maxlength))


@freezedryable
class TableDef(_ConfigDefinition):
    """
    Definition of a table (dict) config item, built up from named child
    definitions registered with ``add_def``.
    """

    def __init__(self, default=None, optional=False, helptext=""):
        super().__init__(default, optional, helptext)
        self.def_table = {}

    def add_def(self, name, newdef):
        """
        Register a child definition under ``name``.

        Args:
            name: (str) Key the child item is expected under.
            newdef: Instance of a ``_ConfigDefinition`` subclass.

        Returns:
            ``newdef``, so that nested tables can be built on inline.

        Raises:
            TypeError: If ``newdef`` is not a config definition or ``name`` is
                not a string.
        """
        if not isinstance(newdef, _ConfigDefinition):
            raise TypeError("Config definiton must be an instance of a "
                            "ConfigDefinition subclass")
        if not isinstance(name, str):
            raise TypeError("Config definition name must be a string")
        self.def_table[name] = newdef
        return newdef

    def validate(self, value_table):  # pylint: disable=W0221
        """
        Validate ``value_table`` against the registered child definitions.

        Missing optional keys are filled in, in place, with a copy of their
        definition's default value. Filled-in defaults are validated like any
        other value, so an optional item needs a default that satisfies its
        own definition.

        Raises:
            InvalidConfigError: If ``value_table`` is not a dict, lacks a
                required key, contains an unknown key, or a value fails its
                child definition (the offending key is prepended to the
                exception args).
        """
        import copy  # local import: only needed to isolate default values

        if not isinstance(value_table, dict):
            raise InvalidConfigError("Config item must be a table")

        # Walk the definitions in insertion order so the reported key is
        # deterministic when several are missing.
        for name, definition in self.def_table.items():
            if name in value_table:
                continue
            if not definition.optional:
                raise InvalidConfigError("Table must contain key: " + name)
            # Copy so a mutable default is never shared between config tables
            # or mutated through nested validation.
            value_table[name] = copy.deepcopy(definition.default)

        for key in value_table:
            if key not in self.def_table:
                raise InvalidConfigError("Table contains unknown key: " + str(key))

        for key, value in value_table.items():
            try:
                self.def_table[key].validate(value)
            except InvalidConfigError as e:
                e.args = ("Key: " + key,) + e.args
                raise


class _ArrayDefMixin():
    """
    Mixin turning a scalar/table definition into an array-of-that definition.
    Must precede the element definition class in the bases list so that
    ``super().validate`` resolves to the element validator.
    """

    def validate(self, value_array):
        """
        Raises:
            InvalidConfigError: If ``value_array`` is not a list, or an element
                fails validation (the offending index is prepended to the
                exception args).
        """
        if not isinstance(value_array, list):
            raise InvalidConfigError("Config item must be an array")
        for index, value in enumerate(value_array):
            try:
                super().validate(value)
            except InvalidConfigError as e:
                e.args = ("Array index: " + str(index),) + e.args
                raise


@freezedryable
class BoolArrayDef(_ArrayDefMixin, BoolDef):
    """Definition of an array of booleans."""


@freezedryable
class IntArrayDef(_ArrayDefMixin, IntDef):
    """Definition of an array of integers."""


@freezedryable
class StringArrayDef(_ArrayDefMixin, StringDef):
    """Definition of an array of strings."""


@freezedryable
class TableArrayDef(_ArrayDefMixin, TableDef):
    """Definition of an array of tables."""


@freezedryable
class ConfDefinition(TableDef):
    """Root table definition, as accepted by ``ConfigManager.get_config``."""


class ConfigManager():
    """Holds a loaded root config dict and hands out validated sub-tables."""

    def __init__(self):
        self.root_config = {}

    def load(self, source):
        """
        Load a config source into the ConfigManager.

        Args:
            source: Either a dict config to load directly, a filepath to a TOML
                file, or a open file.
        """
        if isinstance(source, dict):
            # load from dict
            self.root_config = source
        elif isinstance(source, str):
            # load from pathname; TOML documents are UTF-8 encoded
            with open(source, 'r', encoding='utf-8') as conf_file:
                self.root_config = toml.load(conf_file)
        else:
            # load from file
            self.root_config = toml.load(source)

    def get_config(self, table_name, conf_def):
        """
        Get a config dict called ``table_name`` and validate it against
        ``conf_def`` before returning it. Note that as part of validation,
        optional keys that are missing will be filled in with their default
        values (see ``TableDef``).

        Args:
            table_name: (str) Name of the config dict to find.
            conf_def: (ConfDefinition) Config definition to validate against.

        Returns:
            The validated config dict stored under ``table_name``.

        Raises:
            TypeError: If ``conf_def`` is not a ``ConfDefinition``.
            InvalidConfigError: If the table is missing or fails validation
                (the table name is prepended to the exception args).
        """
        if not isinstance(conf_def, ConfDefinition):
            raise TypeError("Supplied config definition must be an instance "
                            "of ConfDefinition")
        if table_name not in self.root_config:
            raise InvalidConfigError(
                "Config must contain table: " + table_name)
        try:
            conf_def.validate(self.root_config[table_name])
        except InvalidConfigError as e:
            e.args = ("Module: " + table_name,) + e.args
            raise
        return self.root_config[table_name]

    def get_configs(self, conf_defs):
        """
        Get multiple configs at once, validating each one.

        Args:
            conf_defs: (dict) A dictionary of ConfigDefinitions. The keys are
                used as the name to find each config dict, which is then
                validated against the corresponding conf def.

        Returns:
            A dict of config dicts, with keys matching those passed in
            ``conf_defs``.
        """
        return {name: self.get_config(name, conf_def)
                for name, conf_def in conf_defs.items()}

    def get_plugin_configs(self, plugin_classes):
        """
        Get the validated config for each plugin.

        A fresh ``ConfDefinition`` is created per plugin and populated by
        calling the plugin class's ``define_config(conf_def)``.

        Args:
            plugin_classes: (dict) Plugin classes keyed by plugin name; the
                name is also the config table looked up for that plugin.

        Returns:
            A dict of config dicts, with keys matching ``plugin_classes``.
        """
        config_values = {}
        for plugin_name, plugin_class in plugin_classes.items():
            conf_def = ConfDefinition()
            plugin_class.define_config(conf_def)
            config_values[plugin_name] = self.get_config(plugin_name, conf_def)
        return config_values

    def dump_toml(self):
        """Return the root config serialised as a TOML string."""
        return toml.dumps(self.root_config)

    def dump_to_file(self, filepath, message=None):
        """
        Write the root config to ``filepath`` as TOML.

        Args:
            filepath: Path of the file to (over)write.
            message: (str) Optional message appended as a trailing comment
                block (see ``gen_comment``).
        """
        # Serialise before opening, so a failure cannot leave a truncated file.
        content = self.dump_toml()
        if message is not None:
            content = content.rstrip() + gen_comment(message)
        with open(filepath, 'w', encoding='utf-8') as f:
            f.write(content)


# Matches a "# shepherd_message:" comment line together with any comment
# lines that directly follow it.
_SHEPHERD_MESSAGE_RE = re.compile(
    r"(?m)^#\ shepherd_message:[^\n]*$\n?(?:^#[^\n]+$\n?)*")


def strip_toml_message(string):
    """Return ``string`` with any shepherd message comment block removed."""
    return _SHEPHERD_MESSAGE_RE.sub('', string)


def update_toml_message(filepath, message):
    """
    Replace the shepherd message comment block in the file at ``filepath``
    with one generated from ``message``, rewriting the file in place.
    """
    with open(filepath, 'r+', encoding='utf-8') as f:
        content = strip_toml_message(f.read()).rstrip()
        content += gen_comment(message)
        f.seek(0)
        f.write(content)
        # Drop any leftover tail if the new content is shorter than the old.
        f.truncate()


def gen_comment(string):
    """
    Format ``string`` as a shepherd message comment block.

    Any ``#`` characters are removed from the message. The first line is
    prefixed with ``# shepherd_message: `` and every following line with
    ``# ``; the block starts and ends with a newline.
    """
    message_lines = string.replace('#', '').splitlines()
    return '\n# shepherd_message: ' + '\n# '.join(message_lines) + '\n'