|
|
|
|
@ -1,5 +1,4 @@
|
|
|
|
|
import re
|
|
|
|
|
import toml
|
|
|
|
|
from copy import deepcopy
|
|
|
|
|
import toml
|
|
|
|
|
from .specification import ConfigSpecification, InvalidConfigError
|
|
|
|
|
@ -7,21 +6,24 @@ from .specification import ConfigSpecification, InvalidConfigError
|
|
|
|
|
|
|
|
|
|
class ConfigManager():
|
|
|
|
|
def __init__(self):
|
|
|
|
|
self.root_config = {}
|
|
|
|
|
self.config_source = {} # source values
|
|
|
|
|
self.root_config = {} # validated config
|
|
|
|
|
self.confspecs = {}
|
|
|
|
|
self.frozen_config = {}
|
|
|
|
|
self.frozen_config = {} # validated config to reapply each load
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
|
def _load_source(source):
|
|
|
|
|
def _get_source(source):
|
|
|
|
|
"""
|
|
|
|
|
Accept a filepath or opened file representing a TOML file, or a direct dict,
|
|
|
|
|
and return a plain parsed dict.
|
|
|
|
|
"""
|
|
|
|
|
if isinstance(source, dict): # load from dict
|
|
|
|
|
return source
|
|
|
|
|
elif isinstance(source, str): # load from pathname
|
|
|
|
|
|
|
|
|
|
if isinstance(source, str): # load from pathname
|
|
|
|
|
with open(source, 'r') as conf_file:
|
|
|
|
|
return toml.load(conf_file)
|
|
|
|
|
|
|
|
|
|
else: # load from file
|
|
|
|
|
return toml.load(source)
|
|
|
|
|
|
|
|
|
|
@ -33,8 +35,10 @@ class ConfigManager():
|
|
|
|
|
source: Either a dict config to load directly, a filepath to a TOML file,
|
|
|
|
|
or an open file.
|
|
|
|
|
"""
|
|
|
|
|
self.root_config = self._load_source(source)
|
|
|
|
|
self._overlay(self.frozen_config, self.root_config)
|
|
|
|
|
self.config_source = self._get_source(source)
|
|
|
|
|
self._overlay(self.frozen_config, self.config_source)
|
|
|
|
|
# New source, so wipe validated config
|
|
|
|
|
self.root_config = {}
|
|
|
|
|
|
|
|
|
|
def load_overlay(self, source):
|
|
|
|
|
"""
|
|
|
|
|
@ -47,8 +51,10 @@ class ConfigManager():
|
|
|
|
|
source: Either the root dict of a data structure to load directly, a filepath to a TOML
|
|
|
|
|
file, or an open TOML file.
|
|
|
|
|
"""
|
|
|
|
|
self._overlay(self._load_source(source), self.root_config)
|
|
|
|
|
self._overlay(self.frozen_config, self.root_config)
|
|
|
|
|
self._overlay(self._get_source(source), self.config_source)
|
|
|
|
|
self._overlay(self.frozen_config, self.config_source)
|
|
|
|
|
# New source, so wipe validated config
|
|
|
|
|
self.root_config = {}
|
|
|
|
|
|
|
|
|
|
def freeze_value(self, bundle_name, *field_names):
|
|
|
|
|
"""
|
|
|
|
|
@ -102,7 +108,7 @@ class ConfigManager():
|
|
|
|
|
Returns a list of config bundle names that do not have a corresponding ConfigSpecification
|
|
|
|
|
stored in the ConfigManager.
|
|
|
|
|
"""
|
|
|
|
|
return list(self.root_config.keys() - self.confspecs.keys())
|
|
|
|
|
return list(self.config_source.keys() - self.confspecs.keys())
|
|
|
|
|
|
|
|
|
|
def _overlay(self, src, dest):
|
|
|
|
|
for key in src:
|
|
|
|
|
@ -113,65 +119,120 @@ class ConfigManager():
|
|
|
|
|
# Otherwise it's either an existing value to be replaced or needs to be added.
|
|
|
|
|
dest[key] = src[key]
|
|
|
|
|
|
|
|
|
|
def get_config_bundle(self, bundle_name, conf_spec=None):
|
|
|
|
|
def validate_bundle(self, bundle_name, conf_spec=None):
|
|
|
|
|
"""
|
|
|
|
|
Get a config bundle called ``bundle_name`` and validate
|
|
|
|
|
it against the corresponding config specification stored in the ConfigManager.
|
|
|
|
|
If ``conf_spec`` is supplied, it gets used instead. Returns a copy of the validated
|
|
|
|
|
config bundle dict.
|
|
|
|
|
Validate the config bundle called ``bundle_name`` against the corresponding specification
|
|
|
|
|
stored in the ConfigManager. If ``conf_spec`` is supplied, it gets used instead.
|
|
|
|
|
|
|
|
|
|
Note that as part of validation, optional keys that are missing will be
|
|
|
|
|
filled in with their default values (see ``DictSpec``). This function will copy
|
|
|
|
|
the config bundle *after* validation, and so config loaded in the ConfManager will
|
|
|
|
|
be modified, but future ConfigManager manipulations won't change the returned config
|
|
|
|
|
bundle.
|
|
|
|
|
Stores the resulting validated config bundle for later retrieval with
|
|
|
|
|
``get_config_bundle()``.
|
|
|
|
|
|
|
|
|
|
Note that as part of validation, optional keys that are missing will be filled in with
|
|
|
|
|
their default values (see ``DictSpec``).
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
config_name: (str) Name of the config dict to find.
|
|
|
|
|
bundle_name: (str) Name of the config dict to find.
|
|
|
|
|
conf_spec: (ConfigSpecification) Optional config specification to validate against.
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
dict: A deepcopy of the validated config bundle.
|
|
|
|
|
|
|
|
|
|
Raises:
|
|
|
|
|
InvalidConfigError: If the canfig source fails validation, or a matching config
|
|
|
|
|
specification can't be found.
|
|
|
|
|
"""
|
|
|
|
|
if not isinstance(conf_spec, ConfigSpecification):
|
|
|
|
|
if bundle_name not in self.confspecs:
|
|
|
|
|
raise InvalidConfigError(
|
|
|
|
|
"No ConfigSpecification supplied for bundle: " + bundle_name)
|
|
|
|
|
conf_spec = self.confspecs[bundle_name]
|
|
|
|
|
|
|
|
|
|
if bundle_name not in self.root_config:
|
|
|
|
|
raise InvalidConfigError(
|
|
|
|
|
"Config must contain dict: " + bundle_name)
|
|
|
|
|
if bundle_name not in self.config_source:
|
|
|
|
|
raise InvalidConfigError("Config source must contain dict: " + bundle_name)
|
|
|
|
|
|
|
|
|
|
bundle_source = deepcopy(self.config_source[bundle_name])
|
|
|
|
|
try:
|
|
|
|
|
conf_spec.validate(self.root_config[bundle_name])
|
|
|
|
|
conf_spec.validate(bundle_source)
|
|
|
|
|
except InvalidConfigError as e:
|
|
|
|
|
e.args = ("Bundle: " + bundle_name,) + e.args
|
|
|
|
|
raise
|
|
|
|
|
return deepcopy(self.root_config[bundle_name])
|
|
|
|
|
|
|
|
|
|
def get_config_bundles(self, bundle_names):
|
|
|
|
|
self.root_config[bundle_name] = bundle_source
|
|
|
|
|
return deepcopy(bundle_source)
|
|
|
|
|
|
|
|
|
|
def validate_bundles(self, bundle_names):
|
|
|
|
|
"""
|
|
|
|
|
Get multiple config bundles from the root dict at once, validating each one with the
|
|
|
|
|
corresponding ConfigSpecification stored in the ConfigManager. See ``get_config_bundle``
|
|
|
|
|
Validate multiple config bundles at once, validating each one with the corresponding
|
|
|
|
|
ConfigSpecification stored in the ConfigManager. See ``validate_bundle()``.
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
bundle_names: A list of config bundle names to get. If dictionary is supplied, uses
|
|
|
|
|
the values as ConfigSpecifications rather than looking up a stored one in the
|
|
|
|
|
the values as ConfigSpecifications rather than looking up the ones stored in the
|
|
|
|
|
ConfigManager.
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
A dict of config dicts, with keys matching those passed in ``bundle_names``.
|
|
|
|
|
dict: A dict of deepcopied config bundles, with keys matching those passed in
|
|
|
|
|
``bundle_names``.
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
config_values = {}
|
|
|
|
|
if isinstance(bundle_names, dict):
|
|
|
|
|
for name, conf_spec in bundle_names.items():
|
|
|
|
|
config_values[name] = self.validate_bundle(name, conf_spec)
|
|
|
|
|
config_values[name] = self.get_config_bundle(name, conf_spec)
|
|
|
|
|
else:
|
|
|
|
|
for name in bundle_names:
|
|
|
|
|
config_values[name] = self.validate_bundle(name)
|
|
|
|
|
|
|
|
|
|
return config_values
|
|
|
|
|
|
|
|
|
|
def get_config_bundle(self, bundle_name):
|
|
|
|
|
"""
|
|
|
|
|
Get a validated config bundle called ``bundle_name``. If not yet validated, will validate
|
|
|
|
|
the config source against the corresponding config specification stored in the
|
|
|
|
|
ConfigManager (see ``validate_bundle()``).
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
bundle_name: (str) Name of the config bundle to find.
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
dict: A deepcopy of the validated config bundle.
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
if bundle_name not in self.root_config:
|
|
|
|
|
return self.validate_bundle(bundle_name)
|
|
|
|
|
|
|
|
|
|
return self.root_config[bundle_name]
|
|
|
|
|
|
|
|
|
|
def get_config_bundles(self, bundle_names):
|
|
|
|
|
"""
|
|
|
|
|
Get multiple config bundles at once. If not yet validated, each will validate
|
|
|
|
|
their config source against the corresponding config specification stored in the
|
|
|
|
|
ConfigManager (see ``validate_bundle()``).
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
bundle_names: A list of config bundle names to get.
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
dict: A dict of validated config bundles, with keys matching those passed in
|
|
|
|
|
``bundle_names``.
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
config_values = {}
|
|
|
|
|
for name in bundle_names:
|
|
|
|
|
config_values[name] = self.get_config_bundle(name)
|
|
|
|
|
|
|
|
|
|
return config_values
|
|
|
|
|
|
|
|
|
|
def get_bundle_names(self):
|
|
|
|
|
"""
|
|
|
|
|
Returns a list of names of top level config bundles
|
|
|
|
|
Returns a list of config bundle names contained in the source.
|
|
|
|
|
"""
|
|
|
|
|
return list(self.root_config.keys())
|
|
|
|
|
return list(self.config_source.keys())
|
|
|
|
|
|
|
|
|
|
def dump_toml(self):
|
|
|
|
|
if self.root_config.keys() != self.config_source.keys():
|
|
|
|
|
raise Exception("Can't dump an unvalidated config table!")
|
|
|
|
|
return toml.dumps(self.root_config)
|
|
|
|
|
|
|
|
|
|
def dump_to_file(self, filepath, message=None):
|
|
|
|
|
|