You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
shepherd-agent/shepherd/config.py

275 lines
9.3 KiB

"""
Configuration managment module. Enables configuration to be validated against
requirement definitions before being loaded and used.
"""
import re
import toml
from .freezedry import freezedryable, rehydrate
class InvalidConfigError(Exception):
pass
# On start,
# get conf_defs via imports in core
# Load a conf file
# Validate the Shepherd table in conf file, then load its values to get module list
# Validate other loaded tables/modules, then load their values
# How do modules wind up with their instance of ConfigValues? Return val instance
# from validation function - as it needs to build the vals instance while validating anyway
# Validate a conf file given a module or config_def list
# idea is to create these similar to how arg parser works
# how to store individual items in def? Dict of...
# Tables need a dict, lists/arrays need a list of dicts, but what's in the dict?
# if it's another instacnce of ConfigDef, then each Config Def needs to handle
# one item, but cater for all the different types of items - and many of those
# should be able to add a new item.
# could have a separate class for the root, but lower tables really need to
# perform exactly the same...
# config def required interface:
# Validate values.
# The Table and Array terms used here are directly from the TOML convention, but they essentially
# map directly to Dictionaries (Tables), and Lists (Arrays)
class _ConfigDefinition():
def __init__(self, default=None, optional=False, helptext=""):
self.default = default
self.optional = optional
self.helptext = helptext
def validate(self, value): # pylint: disable=W0613
raise TypeError("_ConfigDefinition.validate() is an abstract method")
@freezedryable
class BoolDef(_ConfigDefinition):
def __init__(self, default=None, optional=False, helptext=""): # pylint: disable=W0235
super().__init__(default, optional, helptext)
def validate(self, value):
if not isinstance(value, bool):
raise InvalidConfigError("Config value must be a boolean")
@freezedryable
class IntDef(_ConfigDefinition):
def __init__(self, default=None, minval=None, maxval=None,
optional=False, helptext=""):
super().__init__(default, optional, helptext)
self.minval = minval
self.maxval = maxval
def validate(self, value):
if not isinstance(value, int):
raise InvalidConfigError("Config value must be an integer")
if self.minval is not None and value < self.minval:
raise InvalidConfigError("Config value must be >= " +
str(self.minval))
if self.maxval is not None and value > self.maxval:
raise InvalidConfigError("Config value must be <= " +
str(self.maxval))
@freezedryable
class StringDef(_ConfigDefinition):
def __init__(self, default="", minlength=None, maxlength=None,
optional=False, helptext=""):
super().__init__(default, optional, helptext)
self.minlength = minlength
self.maxlength = maxlength
def validate(self, value):
if not isinstance(value, str):
raise InvalidConfigError(F"Config value must be a string and is {value}")
if self.minlength is not None and len(value) < self.minlength:
raise InvalidConfigError("Config string length must be >= " +
str(self.minlength))
if self.maxlength is not None and len(value) > self.maxlength:
raise InvalidConfigError("Config string length must be <= " +
str(self.maxlength))
@freezedryable
class TableDef(_ConfigDefinition):
def __init__(self, default=None, optional=False, helptext=""):
super().__init__(default, optional, helptext)
self.def_table = {}
def add_def(self, name, newdef):
if not isinstance(newdef, _ConfigDefinition):
raise TypeError("Config definiton must be an instance of a "
"ConfigDefinition subclass")
if not isinstance(name, str):
raise TypeError("Config definition name must be a string")
self.def_table[name] = newdef
return newdef
def validate(self, value_table): # pylint: disable=W0221
def_set = set(self.def_table.keys())
value_set = set(value_table.keys())
for missing_key in def_set - value_set:
if not self.def_table[missing_key].optional:
raise InvalidConfigError("Table must contain key: " +
missing_key)
else:
value_table[missing_key] = self.def_table[missing_key].default
for extra_key in value_set - def_set:
raise InvalidConfigError("Table contains unknown key: " +
extra_key)
for key, value in value_table.items():
try:
self.def_table[key].validate(value)
except InvalidConfigError as e:
e.args = ("Key: " + key,) + e.args
raise
class _ArrayDefMixin():
def validate(self, value_array):
if not isinstance(value_array, list):
raise InvalidConfigError("Config item must be an array")
for index, value in enumerate(value_array):
try:
super().validate(value)
except InvalidConfigError as e:
e.args = ("Array index: " + str(index),) + e.args
raise
@freezedryable
class BoolArrayDef(_ArrayDefMixin, BoolDef):
pass
@freezedryable
class IntArrayDef(_ArrayDefMixin, IntDef):
pass
@freezedryable
class StringArrayDef(_ArrayDefMixin, StringDef):
pass
@freezedryable
class TableArrayDef(_ArrayDefMixin, TableDef):
pass
@freezedryable
class ConfDefinition(TableDef):
pass
class ConfigManager():
def __init__(self):
self.root_config = {}
def load(self, source):
"""
Load a config source into the ConfigManager.
Args:
source: Either a dict config to load directly, a filepath to a TOML file,
or a open file.
"""
if isinstance(source, dict): # load from dict
self.root_config = source
elif isinstance(source, str): # load from pathname
with open(source, 'r') as conf_file:
self.root_config = toml.load(conf_file)
else: # load from file
self.root_config = toml.load(source)
def get_config(self, table_name, conf_def):
"""
Get a config dict called ``table_name`` and validate
it against ``conf_def`` before returning it.
Note that as part of validation, optional keys that are missing will be
filled in with their default values (see ``TableDef``).
Args:
table_name: (str) Name of the config dict to find.
conf_def: (ConfDefinition) Config definition to validate against.
"""
if not isinstance(conf_def, ConfDefinition):
raise TypeError("Supplied config definition must be an instance "
"of ConfDefinition")
if table_name not in self.root_config:
raise InvalidConfigError(
"Config must contain table: " + table_name)
try:
conf_def.validate(self.root_config[table_name])
except InvalidConfigError as e:
e.args = ("Module: " + table_name,) + e.args
raise
return self.root_config[table_name]
def get_configs(self, conf_defs):
"""
Get multiple configs at once, validating each one.
Args:
conf_defs: (dict) A dictionary of ConfigDefinitions. The keys are used
as the name to find each config dict, which is then validated against
the corresponding conf def.
Returns:
A dict of config dicts, with keys matching those passed in ``conf_defs``.
"""
config_values = {}
for name, conf_def in conf_defs.items():
config_values[name] = self.get_config(name, conf_def)
return config_values
def get_plugin_configs(self, plugin_classes):
config_values = {}
for plugin_name, plugin_class in plugin_classes.items():
conf_def = ConfDefinition()
plugin_class.define_config(conf_def)
config_values[plugin_name] = self.get_config(plugin_name, conf_def)
return config_values
def dump_toml(self):
return toml.dumps(self.root_config)
def dump_to_file(self, filepath, message=None):
with open(filepath, 'w+') as f:
content = self.dump_toml()
if message is not None:
content = content.rstrip() + gen_comment(message)
f.write(content)
def strip_toml_message(string):
print("stripping...")
return re.sub("(?m)^#\\ shepherd_message:[^\\n]*$\\n?(?:^#[^\\n]+$\\n?)*",
'', string)
def update_toml_message(filepath, message):
with open(filepath, 'r+') as f:
content = f.read()
content = strip_toml_message(content).rstrip()
content += gen_comment(message)
f.seek(0)
f.write(content)
f.truncate()
def gen_comment(string):
return '\n# shepherd_message: ' + '\n# '.join(string.replace('#', '').splitlines()) + '\n'