"""
Configuration management module. Enables configuration to be validated against
requirement definitions before being loaded and used.
"""

import re

import toml

from .freezedry import freezedryable, rehydrate


class InvalidConfigError(Exception):
    """Raised when a config value or table does not satisfy its definition."""


# On start,
#   get conf_defs via imports in core
#   Load a conf file
#   Validate the Shepherd table in conf file, then load its values to get module list
#   Validate other loaded tables/modules, then load their values

# How do modules wind up with their instance of ConfigValues? Return val instance
# from validation function - as it needs to build the vals instance while validating anyway

# Validate a conf file given a module or config_def list

# idea is to create these similar to how arg parser works

# how to store individual items in def? Dict of...
# Tables need a dict, lists/arrays need a list of dicts, but what's in the dict?
# if it's another instance of ConfigDef, then each Config Def needs to handle
# one item, but cater for all the different types of items - and many of those
# should be able to add a new item.
# could have a separate class for the root, but lower tables really need to
# perform exactly the same...

# config def required interface:
#   Validate values.
# The Table and Array terms used here are directly from the TOML convention, but they essentially
# map directly to Dictionaries (Tables), and Lists (Arrays)


class _ConfigDefinition():
    """
    Base class for all config definitions.

    Args:
        default: Value filled in by ``TableDef.validate`` when the key is
            optional and missing from the config.
        optional (bool): Whether the key may be omitted from its parent table.
        helptext (str): Free-form description of the config item.
    """

    def __init__(self, default=None, optional=False, helptext=""):
        self.default = default
        self.optional = optional
        self.helptext = helptext

    def validate(self, value):  # pylint: disable=W0613
        """
        Validate ``value`` against this definition. Must be overridden.

        Raises:
            TypeError: Always, as the base class has no validation rules.
        """
        raise TypeError("_ConfigDefinition.validate() is an abstract method")


@freezedryable
class BoolDef(_ConfigDefinition):
    """ Definition of a boolean config value. """

    # NOTE(review): the explicit pass-through __init__ is presumably kept so the
    # freezedry decorator can see the constructor signature - confirm before removing.
    def __init__(self, default=None, optional=False, helptext=""):  # pylint: disable=W0235
        super().__init__(default, optional, helptext)

    def validate(self, value):
        """
        Raises:
            InvalidConfigError: If ``value`` is not a ``bool``.
        """
        if not isinstance(value, bool):
            raise InvalidConfigError("Config value must be a boolean")


@freezedryable
class IntDef(_ConfigDefinition):
    """
    Definition of an integer config value, optionally bounded.

    Args:
        minval: Smallest accepted value (inclusive), or None for no lower bound.
        maxval: Largest accepted value (inclusive), or None for no upper bound.
    """

    def __init__(self, default=None, minval=None, maxval=None, optional=False, helptext=""):
        super().__init__(default, optional, helptext)
        self.minval = minval
        self.maxval = maxval

    def validate(self, value):
        """
        Raises:
            InvalidConfigError: If ``value`` is not an integer or is out of bounds.
        """
        # bool is a subclass of int, so it has to be rejected explicitly;
        # otherwise a TOML boolean would pass as an integer.
        if isinstance(value, bool) or not isinstance(value, int):
            raise InvalidConfigError("Config value must be an integer")
        if self.minval is not None and value < self.minval:
            raise InvalidConfigError("Config value must be >= " + str(self.minval))
        if self.maxval is not None and value > self.maxval:
            raise InvalidConfigError("Config value must be <= " + str(self.maxval))


@freezedryable
class StringDef(_ConfigDefinition):
    """
    Definition of a string config value, optionally bounded in length.

    Args:
        minlength: Shortest accepted length (inclusive), or None for no limit.
        maxlength: Longest accepted length (inclusive), or None for no limit.
    """

    def __init__(self, default="", minlength=None, maxlength=None, optional=False, helptext=""):
        super().__init__(default, optional, helptext)
        self.minlength = minlength
        self.maxlength = maxlength

    def validate(self, value):
        """
        Raises:
            InvalidConfigError: If ``value`` is not a string or its length is
                out of bounds.
        """
        if not isinstance(value, str):
            raise InvalidConfigError(F"Config value must be a string and is {value}")
        if self.minlength is not None and len(value) < self.minlength:
            raise InvalidConfigError("Config string length must be >= " +
                                     str(self.minlength))
        if self.maxlength is not None and len(value) > self.maxlength:
            raise InvalidConfigError("Config string length must be <= " +
                                     str(self.maxlength))


@freezedryable
class TableDef(_ConfigDefinition):
    """
    Definition of a config table (dict), made up of named child definitions
    registered with ``add_def``.
    """

    def __init__(self, default=None, optional=False, helptext=""):
        super().__init__(default, optional, helptext)
        self.def_table = {}

    def add_def(self, name, newdef):
        """
        Register a child definition under ``name``, replacing any existing one.

        Args:
            name (str): Key the definition applies to.
            newdef (_ConfigDefinition): Definition to validate that key with.

        Returns:
            ``newdef``, so nested definitions can be built up fluently.

        Raises:
            TypeError: If ``newdef`` is not a config definition or ``name``
                is not a string.
        """
        if not isinstance(newdef, _ConfigDefinition):
            raise TypeError("Config definiton must be an instance of a "
                            "ConfigDefinition subclass")
        if not isinstance(name, str):
            raise TypeError("Config definition name must be a string")
        self.def_table[name] = newdef
        return newdef

    def validate(self, value_table):  # pylint: disable=W0221
        """
        Validate a table against the registered child definitions.

        ``value_table`` is modified in place: optional keys that are missing
        are added with the default of their definition. Every value in the
        table is then validated, including those filled-in defaults, so an
        optional definition needs a default that passes its own validation.

        Raises:
            InvalidConfigError: If ``value_table`` is not a dict, a required
                key is missing, an unknown key is present, or a value fails
                its definition. Errors from child values get the key name
                prepended to their args.
        """
        if not isinstance(value_table, dict):
            raise InvalidConfigError("Config item must be a table")

        def_keys = set(self.def_table)
        value_keys = set(value_table)

        # Sorted so the key reported in the error is deterministic.
        for missing_key in sorted(def_keys - value_keys):
            missing_def = self.def_table[missing_key]
            if not missing_def.optional:
                raise InvalidConfigError("Table must contain key: " + missing_key)
            value_table[missing_key] = missing_def.default

        unknown_keys = sorted(value_keys - def_keys, key=str)
        if unknown_keys:
            raise InvalidConfigError("Table contains unknown key: " + str(unknown_keys[0]))

        for key, value in value_table.items():
            try:
                self.def_table[key].validate(value)
            except InvalidConfigError as e:
                # Prefix the failing key so nested errors read as a path.
                e.args = ("Key: " + key,) + e.args
                raise


class _ArrayDefMixin():
    """
    Mixin turning a single-value definition into an array definition. Each
    element is validated with the ``validate`` of the next class in the MRO.
    """

    def validate(self, value_array):
        """
        Raises:
            InvalidConfigError: If ``value_array`` is not a list or an element
                fails validation. Element errors get the array index prepended
                to their args.
        """
        if not isinstance(value_array, list):
            raise InvalidConfigError("Config item must be an array")
        for index, value in enumerate(value_array):
            try:
                super().validate(value)
            except InvalidConfigError as e:
                e.args = ("Array index: " + str(index),) + e.args
                raise


@freezedryable
class BoolArrayDef(_ArrayDefMixin, BoolDef):
    """ Definition of an array of booleans. """


@freezedryable
class IntArrayDef(_ArrayDefMixin, IntDef):
    """ Definition of an array of integers. """


@freezedryable
class StringArrayDef(_ArrayDefMixin, StringDef):
    """ Definition of an array of strings. """


@freezedryable
class TableArrayDef(_ArrayDefMixin, TableDef):
    """ Definition of an array of tables. """


@freezedryable
class ConfDefinition(TableDef):
    """ Top level table definition, as accepted by ``ConfigManager``. """


class ConfigManager():
    """
    Holds a root config dict along with the config definitions used to
    validate its top level tables.
    """

    def __init__(self):
        self.root_config = {}
        self.confdefs = {}

    @staticmethod
    def _read_source(source):
        """
        Resolve a config source to a dict.

        Args:
            source: Either a dict (returned as is), a filepath to a TOML file,
                or an open file.
        """
        if isinstance(source, dict):
            return source
        if isinstance(source, str):
            # TOML documents are UTF-8 by specification, so don't rely on the
            # locale default encoding.
            with open(source, 'r', encoding='utf-8') as conf_file:
                return toml.load(conf_file)
        return toml.load(source)

    def load(self, source):
        """
        Load a config source into the ConfigManager, replacing any existing config.

        Args:
            source: Either a dict config to load directly, a filepath to a TOML file,
                or an open file.
        """
        self.root_config = self._read_source(source)

    def load_overlay(self, source):
        """
        Load a config source into the ConfigManager, merging it over the top of any
        existing config. Dicts will be recursively processed with keys being merged and
        existing values being replaced by the new source. This includes lists, which will
        be treated as any other value and completely replaced.

        Args:
            source: Either a dict config to load directly, a filepath to a TOML file,
                or an open file.
        """
        self._overlay(self._read_source(source), self.root_config)

    def add_confdef(self, name, confdef):
        """
        Stores a ConfigDefinition for future use when validating the corresponding config table

        Args:
            name (str): The name to store the config definition under.
            confdef (ConfigDefinition): The populated ConfigDefinition to store.
        """
        self.confdefs[name] = confdef

    def add_confdefs(self, confdefs):
        """
        Stores multiple ConfigDefinitions at once for future use when validating the
        corresponding config tables

        Args:
            confdefs: A dict of populated ConfigDefinitions to store, using their keys as names.
        """
        self.confdefs.update(confdefs)

    def get_missing_confdefs(self):
        """
        Returns a list of config table names that do not have a corresponding
        ConfigDefinition stored in the ConfigManager.
        """
        return list(self.root_config.keys() - self.confdefs.keys())

    def _overlay(self, src, dest):
        """ Recursively merge the dict ``src`` into the dict ``dest``, in place. """
        for key in src:
            # If the key is also in the dest and both are dicts, merge them.
            if key in dest and isinstance(src[key], dict) and isinstance(dest[key], dict):
                self._overlay(src[key], dest[key])
            else:
                # Otherwise it's either an existing value to be replaced or needs to be added.
                dest[key] = src[key]

    def validate_and_get_config(self, config_name, conf_def=None):
        """
        Get a config dict called ``config_name`` and validate it against the
        corresponding config definition stored in the ConfigManager. If ``conf_def``
        is supplied, it gets used instead. Returns a validated config dictionary.

        Note that as part of validation, optional keys that are missing will be filled
        in with their default values (see ``TableDef``).

        Args:
            config_name: (str) Name of the config dict to find.
            conf_def: (ConfDefinition) Optional config definition to validate against.
                Anything that is not a ``ConfDefinition`` instance is ignored and the
                stored definition is used.

        Raises:
            KeyError: If no ``conf_def`` is supplied and none is stored under
                ``config_name``.
            InvalidConfigError: If the config table is missing or fails validation.
        """
        if not isinstance(conf_def, ConfDefinition):
            conf_def = self.confdefs[config_name]
        if config_name not in self.root_config:
            raise InvalidConfigError(
                "Config must contain table: " + config_name)
        config = self.root_config[config_name]
        try:
            conf_def.validate(config)
        except InvalidConfigError as e:
            e.args = ("Module: " + config_name,) + e.args
            raise
        return config

    def validate_and_get_configs(self, config_names):
        """
        Get multiple configs from the root table at once, validating each one with the
        corresponding confdef stored in the ConfigManager.

        Args:
            config_names: A list of config names to get. If dictionary is supplied, uses
                the values as ConfigDefinitions rather than looking up a stored one in
                the ConfigManager.

        Returns:
            A dict of config dicts, with keys matching those passed in ``config_names``.
        """
        if isinstance(config_names, dict):
            return {name: self.validate_and_get_config(name, conf_def)
                    for name, conf_def in config_names.items()}
        return {name: self.validate_and_get_config(name) for name in config_names}

    def get_config_names(self):
        """
        Returns a list of names of top level config tables
        """
        return list(self.root_config.keys())

    def dump_toml(self):
        """ Returns the root config serialised as a TOML string. """
        return toml.dumps(self.root_config)

    def dump_to_file(self, filepath, message=None):
        """
        Write the root config to ``filepath`` as TOML, replacing its contents.

        Args:
            filepath (str): Path of the file to write.
            message (str): Optional message appended to the end of the file as a
                comment block (see ``gen_comment``).
        """
        content = self.dump_toml()
        if message is not None:
            content = content.rstrip() + gen_comment(message)
        with open(filepath, 'w+', encoding='utf-8') as f:
            f.write(content)


# Matches a "# shepherd_message:" comment line plus any comment lines
# directly following it, as produced by gen_comment().
_TOML_MESSAGE_RE = re.compile(r"(?m)^#\ shepherd_message:[^\n]*$\n?(?:^#[^\n]+$\n?)*")


def strip_toml_message(string):
    """ Returns ``string`` with every shepherd message comment block removed. """
    return _TOML_MESSAGE_RE.sub('', string)


def update_toml_message(filepath, message):
    """
    Replace any shepherd message comment block in the file at ``filepath``
    with a new one for ``message``, appended to the end of the file.
    """
    with open(filepath, 'r+', encoding='utf-8') as f:
        content = strip_toml_message(f.read()).rstrip()
        content += gen_comment(message)
        # Rewrite in place; truncate in case the new content is shorter.
        f.seek(0)
        f.write(content)
        f.truncate()


def gen_comment(string):
    """
    Returns ``string`` formatted as a shepherd message comment block: every
    '#' is removed from the text, the first line is prefixed with
    '# shepherd_message: ' and each following line with '# '.
    """
    return '\n# shepherd_message: ' + '\n# '.join(string.replace('#', '').splitlines()) + '\n'