You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
113 lines
4.4 KiB
113 lines
4.4 KiB
from enum import Enum, auto
|
|
import inspect
|
|
|
|
class RestoreMethod(Enum):
    """Strategies for turning a preserved dict back into a class instance."""

    # Create the instance without calling __init__ and set its attributes directly.
    DIRECT = auto()
    # Pass the restored attributes as keyword arguments to __init__.
    INIT = auto()
    # Pass the restored attributes as keyword arguments to the classmethod restore().
    CLASS_METHOD = auto()
|
|
|
|
#preserve, for when pickling is just a bit too intense
|
|
|
|
# Reserved dict key: when present in a packed dict, it flags that the dict should be
# unpacked back out to a class instance; its value is the preserve name of that class.
class_key = "<_jam>"
# Registry of valid packable classes, keyed by preserve name. It starts empty and is
# filled in by the preservable() decorator each time a class is registered.
preservables = {}
|
|
|
|
# Decorator to mark class as packable and keep track of associated names and classes. When packed, the
|
|
# special key string "<_jam>" (the value of class_key) indicates what class the current dict should be unpacked to
|
|
|
|
# name argument is the string that will identify this class in a packed dict
|
|
def preservable(cls=None, restore_method=RestoreMethod.DIRECT, name=None):
    """Register a class as preservable and give it a ``preserve()`` method.

    Usable bare (``@preservable``), as a plain call (``preservable(SomeClass, ...)``),
    or with arguments (``@preservable(name="thing")``).

    Args:
        cls: The class to register. When omitted, a decorator is returned that
            registers the class it is applied to with the given options.
        restore_method: RestoreMethod member stored on the class as
            ``_restore_method``; it selects how instances are rebuilt on restore.
        name: String identifying this class in a packed dict. Defaults to
            ``cls.__name__``.

    Returns:
        The same class, with ``_preserve_name``, ``_restore_method`` and
        ``preserve`` attributes set (or a decorator when ``cls`` is omitted).

    Raises:
        Exception: If ``name`` is given but is not a string, or if the name is
            already registered in ``preservables``.
    """
    if cls is None:
        # Called with options only: hand back the real decorator.
        def _decorate(wrapped_cls):
            return preservable(wrapped_cls, restore_method=restore_method, name=name)
        return _decorate

    if name is None:
        preserve_name = cls.__name__
    elif isinstance(name, str):
        preserve_name = name
    else:
        # The original test was inverted and rejected every valid string name.
        raise Exception("preservable name must be a string")

    # Validate before touching the class so a rejected registration leaves it unmodified.
    if preserve_name in preservables:
        raise Exception("Duplicate preservable class name "+preserve_name)

    cls._preserve_name = preserve_name
    cls._restore_method = restore_method
    preservables[preserve_name] = cls

    def _preserve(self):
        # Pack the instance attributes, then tag the dict with the class's preserve name.
        dict_jam = _preserve_dict(vars(self))
        dict_jam[class_key] = self._preserve_name
        return dict_jam

    cls.preserve = _preserve
    return cls
|
|
|
|
|
|
def preserve(target_obj):
    """Convert an object into a structure of primitives, lists and dicts.

    Primitives (str, int, float, bool, None) are returned as-is. Dicts go
    through _preserve_dict(), lists are preserved element by element, and any
    object carrying a ``_preserve_name`` attribute is asked to pack itself
    via its own ``preserve()`` method.

    Raises:
        Exception: If the object is none of the above.
    """
    if isinstance(target_obj, (str, int, float, bool, type(None))):
        return target_obj
    if isinstance(target_obj, dict):
        return _preserve_dict(target_obj)
    if isinstance(target_obj, list):
        return [preserve(item) for item in target_obj]
    if hasattr(target_obj, "_preserve_name"):
        return target_obj.preserve()
    raise Exception(f"Object {target_obj!s} is not preservable")
|
|
|
|
def _preserve_dict(target_dict):
    """Return a new dict holding the preserved form of every value in ``target_dict``.

    Keys are checked one at a time, in iteration order, before the matching
    value is preserved.

    Raises:
        Exception: If a key is not a string, or equals the reserved ``class_key``.
    """
    packed = {}
    for key, value in target_dict.items():
        if not isinstance(key, str):
            raise Exception("Non-string dictionary keys are not preservable")
        if key == class_key:
            # The reserved key may only be written by a class's own preserve() method.
            raise Exception(f"Key {class_key} is reserved for internal use")
        packed[key] = preserve(value)
    return packed
|
|
|
|
def restore(obj_jam):
    """Rebuild an object from its preserved form.

    Primitives (str, int, float, bool, None) are returned as-is, dicts are
    handed to _restore_dict(), and lists are restored element by element.

    Raises:
        Exception: If ``obj_jam`` is none of the above.
    """
    if isinstance(obj_jam, (str, int, float, bool, type(None))):
        return obj_jam
    if isinstance(obj_jam, dict):
        return _restore_dict(obj_jam)
    if isinstance(obj_jam, list):
        return [restore(item) for item in obj_jam]
    raise Exception(f"Object {obj_jam!s} is not restorable")
|
|
|
|
def _restore_dict(dict_jam):
    """Restore a preserved dict, rebuilding a class instance when it is flagged.

    Every value except the ``class_key`` entry is passed through restore().
    If ``class_key`` is absent the restored values are returned as a plain
    dict. Otherwise its value is looked up in ``preservables`` and an instance
    is built according to the class's ``_restore_method``:

    * DIRECT: create the instance with ``__new__`` (skipping ``__init__``) and
      update its ``__dict__`` with the restored values.
    * INIT: pass the restored values as keyword arguments to ``__init__``.
    * CLASS_METHOD: pass them as keyword arguments to classmethod ``restore()``.

    Raises:
        Exception: On a non-string key, a class marker that is not registered
            in ``preservables``, a CLASS_METHOD class without a classmethod
            ``restore()``, or an unsupported ``_restore_method``.
    """
    restored_dict = {}
    for k, val in dict_jam.items():
        if not isinstance(k, str):
            raise Exception("Non-string dictionary keys are not restorable")
        if k != class_key:
            restored_dict[k] = restore(val)

    # No class marker: this is just an ordinary dict.
    if class_key not in dict_jam:
        return restored_dict

    jam_name = dict_jam[class_key]
    try:
        registered = jam_name in preservables
    except TypeError:
        # An unhashable marker (e.g. a list) can never be a registry key.
        registered = False
    if not registered:
        # str() keeps the message for string markers and avoids a TypeError
        # from concatenation when malformed data carries a non-string marker.
        raise Exception("Class "+str(jam_name)+" has not been decorated as preservable")

    f_class = preservables[jam_name]
    method = f_class._restore_method

    # If DIRECT, skip __init__ and set attributes back directly
    if method == RestoreMethod.DIRECT:
        restored_instance = f_class.__new__(f_class)
        restored_instance.__dict__.update(restored_dict)
        return restored_instance

    # If INIT, pass all attributes as keywords to the __init__ method
    if method == RestoreMethod.INIT:
        return f_class(**restored_dict)

    # If CLASS_METHOD, pass all attributes as keyword arguments to classmethod "restore()"
    if method == RestoreMethod.CLASS_METHOD:
        if not inspect.ismethod(getattr(f_class, "restore", None)):
            raise Exception("Class "+str(f_class)+" does not have classmethod 'restore()'")
        return f_class.restore(**restored_dict)

    raise Exception("Class _restore_method "+str(method)+" is not supported")