From 70d266f91c60adfb421ba4a7786dc0e91198a5ba Mon Sep 17 00:00:00 2001
From: Thomas Wilson
Date: Thu, 21 May 2020 15:19:54 +0800
Subject: [PATCH] Task and session initial structure

---
 shepherd/agent/core.py   |  16 ++-
 shepherd/agent/plugin.py |  28 ++---
 shepherd/agent/tasks.py  | 252 +++++++++++++++++++++++++++++++++++++--
 3 files changed, 266 insertions(+), 30 deletions(-)

diff --git a/shepherd/agent/core.py b/shepherd/agent/core.py
index 5eaa8a2..a04432b 100644
--- a/shepherd/agent/core.py
+++ b/shepherd/agent/core.py
@@ -13,6 +13,7 @@
 from configspec import *
 
 from . import plugin
 from . import control
+from . import tasks
 
 log = logging.getLogger("shepherd.agent")
@@ -67,7 +68,7 @@ class Agent():
         # Having modules modify a confspec after it's registered here is a bit of a hack.
         tasks.register(self.core_interface)
         control.register(self.core_interface)
-        
+
         # Because the plugin module caches interfaces, this will then get used when loading
         # config layers and validating them
         plugin.load_plugin_interface("shepherd", self.core_interface)
@@ -130,14 +131,23 @@ class Agent():
 
         if self.control_enabled:
             control.start_control(self.core_config["control"], self.root_dir(),
-                                   core_update_state, plugin_update_states)
+                                  core_update_state, plugin_update_states)
 
-        # Need somewhere to eventually pass in the hooks Tasks will need for the lowpower stuff, 
+        # Need somewhere to eventually pass in the hooks Tasks will need for the lowpower stuff,
         # probably just another init_plugins arg.
 
         # Eventually when the dust settles we might revisit converting the core "shepherd"
         # namespace stuff into it's own plugin interface, as it's using a lot of the same
         # mechanisms, but we're having to pass it all around individually.
 
+        # TODO Collect plugin tasks
+
+        task_session = tasks.init_tasks(self.core_config['session'], self.root_dir(),
+                                        [], self.applied_config, self.interface_functions)
+
+        # TODO Any time stabilisation or waiting for Control
+
+        tasks.start_tasks(self.core_interface, task_session)
+
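+        # Hypothetical sketch of the "Collect plugin tasks" TODO above: gather Task objects
+        # from each loaded plugin and pass them in place of the empty list. The names
+        # `plugin.get_interfaces()` and `iface.tasks` are assumptions for illustration, not
+        # existing APIs:
+        #     plugin_tasks = [t for iface in plugin.get_interfaces() for t in iface.tasks]
+        #     task_session = tasks.init_tasks(self.core_config['session'], self.root_dir(),
+        #                                     plugin_tasks, self.applied_config,
+        #                                     self.interface_functions)
+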
         # tasks.init_tasks(self.core_config) # seperate tasks.start?
         # plugin.start() # Run the plugin `.run` hooks in seperate threads

diff --git a/shepherd/agent/plugin.py b/shepherd/agent/plugin.py
index ae29f9f..f943a80 100644
--- a/shepherd/agent/plugin.py
+++ b/shepherd/agent/plugin.py
@@ -645,24 +645,24 @@ def load_plugin_interface(plugin_name, interface, module=None):
         interface.register_confspec(confspec_list[0][1])
 
     if interface._confspec is None:
-       interface._confspec = ConfigSpecification()
+        interface._confspec = ConfigSpecification()
 
     interface._update_state.set_confspec(interface.confspec)
 
     if module is not None:
-       # Scan module for load markers left by decorators and pass them over to register methods
-       for key, attr in module.__dict__.items():
-           if hasattr(attr, "_shepherd_load_marker"):
-               if isinstance(attr._shepherd_load_marker, FunctionMarker):
-                   interface.register_function(attr, **attr._shepherd_load_marker._asdict())
-               elif isinstance(attr._shepherd_load_marker, AttachmentMarker):
-                   interface.register_attachment(attr, **attr._shepherd_load_marker._asdict())
-               elif isinstance(attr._shepherd_load_marker, HookMarker):
-                   # Hooks are a little different in that we replace the attr with the hook
-                   newhook = interface.register_hook(**attr._shepherd_load_marker._asdict())
-                   setattr(module, key, newhook)
-               elif isinstance(attr._shepherd_load_marker, ClassMarker):
-                   interface.register_class(attr)
+        # Scan module for load markers left by decorators and pass them over to register methods
+        for key, attr in module.__dict__.items():
+            if hasattr(attr, "_shepherd_load_marker"):
+                if isinstance(attr._shepherd_load_marker, FunctionMarker):
+                    interface.register_function(attr, **attr._shepherd_load_marker._asdict())
+                elif isinstance(attr._shepherd_load_marker, AttachmentMarker):
+                    interface.register_attachment(attr, **attr._shepherd_load_marker._asdict())
+                elif isinstance(attr._shepherd_load_marker, HookMarker):
+                    # Hooks are a little different in that we replace the attr with the hook
+                    newhook = interface.register_hook(**attr._shepherd_load_marker._asdict())
+                    setattr(module, key, newhook)
+                elif isinstance(attr._shepherd_load_marker, ClassMarker):
+                    interface.register_class(attr)
 
     if interface._plugin_class is not None:
         # Scan plugin class for marked methods

diff --git a/shepherd/agent/tasks.py b/shepherd/agent/tasks.py
index b447592..7df0310 100644
--- a/shepherd/agent/tasks.py
+++ b/shepherd/agent/tasks.py
@@ -1,6 +1,15 @@
-
+"""
+Implements both the main task scheduler for Shepherd and the Session system for restoring
+state between power cycles.
+"""
 from abc import ABC, abstractmethod
+import json
+from pathlib import Path
+import logging
+from collections import namedtuple
+import threading
+from datetime import datetime, timedelta
 
 from dateutil import tz
 import pytz
@@ -9,6 +18,31 @@ from apscheduler.triggers.cron import CronTrigger as APCronTrigger
 
 from configspec import *
 import preserve
+from .util import HoldLock
+
+log = logging.getLogger("shepherd.agent.tasks")
+
+
+def register(core_interface):
+    """
+    Register the session confspec and hooks on the core interface passed in - `start_tasks` later
+    assumes that these hooks are present.
+    """
+    confspec = ConfigSpecification()
+    confspec.add_spec("resume_delay", IntSpec(helptext="Initial estimate of the time taken to"
+                                              " resume a session, in seconds"), default=180)
+    confspec.add_spec("enable_suspend", BoolSpec(helptext="Enables suspension of the agent session"
+                                                 " in between tasks"), default=True)
+    confspec.add_spec("min_suspend_time", IntSpec(helptext="Smallest wait period before the next"
+                                                  " scheduled task that the agent will decide to"
+                                                  " suspend, in seconds"), default=300)
+
+    core_interface.confspec.add_spec("session", confspec, optional=True, default={})
+
+    # `resume_time` is a DateTime indicating when the session should resume - it already has the
+    # resume delay applied. Hook should return True on success
+    core_interface.register_hook("session_suspend", ["resume_time"])
+
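+# For illustration, a config layer overriding these defaults would carry a "session" block
+# roughly like the following (shown as a dict here - the concrete on-disk format depends on how
+# configspec layers are loaded, which is outside this patch):
+#     "session": {
+#         "resume_delay": 120,        # seconds the agent is expected to need to resume
+#         "enable_suspend": True,
+#         "min_suspend_time": 600,    # don't bother suspending for gaps under 10 minutes
+#     }
+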
+
 class TaskTrigger(ABC):
     """Abstract trigger class"""
@@ -16,7 +50,7 @@
     def next_time(self, base_time):
         """
         Return a time indicating the next trigger time after base_time. Return None if no more
-        trigger events.
+        trigger events. Should be a DateTime object with the `tz.tzutc()` timezone.
         """
 
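+# A minimal concrete trigger, purely to illustrate the `next_time` contract above - this class
+# is not part of the patch (the real triggers are built from TriggerSpec / APCronTrigger):
+#
+#     class IntervalTrigger(TaskTrigger):
+#         def __init__(self, period_seconds):
+#             self.period = timedelta(seconds=period_seconds)
+#
+#         def next_time(self, base_time):
+#             # Always fires again, so never returns None
+#             return base_time + self.period
+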
@@ -89,23 +123,217 @@ TriggerSpec.add_specs({
 #     pass
 
 
+@preserve.preservable
 class Task():
-    def __init__(self, trigger, interface_call):
-        pass
+    def __init__(self, trigger, interface_call, use_session=True):
+        """
+        Define a new task. If `use_session` is true, the task will only be added when a new
+        session is created; otherwise it will be restored from the old session. Suspended
+        sessions will also be resumed in order to perform tasks where `use_session` is true.
+
+        If `use_session` is false, the task will be added on every init, and will not be saved
+        when a session is suspended.
+        """
+        self.trigger = trigger
+        self.interface_call = interface_call
+        self.use_session = use_session
 
     # InterfaceCall already handles the callables and args for us, we just need to preserve
     # them. Trigger is going to be multiple formats, but the most common will be Cron style.
 
 
-def init_tasks(core_config, applied_config, interface_functions):
-    pass
-    # Check if we have a session to load
-    # Load the session - this includes resolving interfacecalls
+@preserve.preservable
+class Session():
+    """
+    Container class to hold session details
+    """
+
+    def __init__(self, config, tasks, resume_delay, resume_time=None):
+        """
+        Create a session instance.
+        `config` is the applied config for the current session, used to detect when it changes
+        `tasks` is the list of tasks saved from the old session
+        `resume_delay` is the estimated time (in seconds) needed to resume the session, used to
+        compensate for the resume overhead when deciding when to wake
+        `resume_time` is the intended resume time (slightly before the next scheduled task)
+        """
+        self.config = config
+        self.tasks = tasks
+        self.resume_time = resume_time
+        self.resume_delay = resume_delay
+
+    @classmethod
+    def load(cls, root_dir, applied_config):
+        """
+        Load a Session instance from the shepherd.session file. Returns None
+        if the config has changed or no file is found.
+        """
+        session_file = Path(root_dir, 'shepherd.session')
+        if session_file.exists():
+            session = preserve.restore(json.loads(session_file.read_text()))
+
+            if session.config == applied_config:
+                return session
+
+            log.info("Config has changed since last session")
+        else:
+            log.info("No existing session file found")
+        return None
+
+    def save(self, root_dir):
+        """
+        Peel out non-session tasks, then save this Session instance to the
+        shepherd.session file.
+        """
+        all_tasks = self.tasks
+        self.tasks = [task for task in all_tasks if task.use_session]
+
+        session_file = Path(root_dir, 'shepherd.session')
+        session_file.write_text(json.dumps(preserve.preserve(self)))
+
+        self.tasks = all_tasks
+
+
+def init_tasks(config, root_dir, init_tasklist, applied_config, interface_functions):
+    """
+    Generate the list of tasks to be run. Attempt to restore an existing session, if one is
+    present. Use the supplied list of init tasks, ignoring tasks marked with `use_session`
+    unless we're starting a new session.
+
+    Resolves task interface calls, and returns the task session.
+    """
+    session = Session.load(root_dir, applied_config)
+
+    if session is None:
+        log.info("Starting new session")
+        session = Session(applied_config, init_tasklist, config['resume_delay'])
+    else:
+        # Add non-session tasks
+        session.tasks.extend([task for task in init_tasklist if not task.use_session])
+
+    # Resolve task interface calls
+    for task in session.tasks:
+        task.interface_call.resolve(interface_functions)
 
-    # Return a list of tasks
+    return session
 
 
-def start_tasks(task_list):
-    pass
+ScheduledTask = namedtuple("ScheduledTask", ["scheduled_for", "task"])
+
+_update_thread_init_done = threading.Event()
+
+_stop_event = threading.Event()
+
+
+def stop():
+    _stop_event.set()
+    log.info("Tasks thread stop requested.")
+
+
+def start_tasks(core_interface, session):
+    """
+    Initialise the Tasks session and start the Tasks update thread.
+    """
+    # Clear for easier testing
+    _stop_event.clear()
+    _update_thread_init_done.clear()
+
+    config = core_interface.config['session']
+    suspend_hook = core_interface.hooks.session_suspend
+
+    tasks_thread = threading.Thread(target=_tasks_update_loop,
+                                    args=(config, suspend_hook, session))
+    tasks_thread.start()
+
+    # Wait for init so our log makes sense
+    _update_thread_init_done.wait()
+
+    return tasks_thread
+
+
+MIN_DELAY = 0.01  # Minimum time (in seconds) the task loop will sleep for.
+
+
+def _tasks_update_loop(config, suspend_hook, session):
+
+    sched_tasks = []
+    base_time = session.resume_time
+    now = datetime.now(tz.tzutc())
+    # If it's a new session, only schedule tasks from now.
+    if base_time is None:
+        base_time = now
+
+    # Maximum permitted snooze is currently hardcoded to 5 minutes. This means that if we
+    # resume the session more than 5 minutes later than we'd intended (`session.resume_time`),
+    # only run tasks that would have been scheduled for the last 5 minutes.
+    max_snooze_time = timedelta(minutes=5)
+    if base_time < now - max_snooze_time:
+        log.warning(F"Session resumed more than maximum snooze time ({max_snooze_time}) after"
+                    F" intended session resume time ({base_time}). Only scheduling tasks after"
+                    F" {now - max_snooze_time}, so may have missed some scheduled tasks.")
+        base_time = now - max_snooze_time
+
+    if len(session.tasks) == 0:
+        log.info("No tasks scheduled. Stopping Tasks thread.")
+        _update_thread_init_done.set()
+        return
+
+    for task in session.tasks:
+        scheduled_time = task.trigger.next_time(base_time)
+        sched_tasks.append(ScheduledTask(scheduled_time, task))
+
+    suspend_available = False
+    if config['enable_suspend'] and suspend_hook.attachments:
+        suspend_available = True
+
+    # Let our `start_tasks` call continue
+    _update_thread_init_done.set()
+
+    # Order by next first
+    sched_tasks.sort(key=lambda schedtask: schedtask.scheduled_for)
+
+    while True:
+        now = datetime.now(tz.tzutc())
+        if sched_tasks[0].scheduled_for <= now:
+            # Scheduled time has passed, run the task
+            log.info(F"Running task {sched_tasks[0].task.interface_call}...")
+            sched_tasks[0].task.interface_call.call()
+
+            # Reschedule and sort. ScheduledTask is a namedtuple, so build a replacement entry
+            # rather than assigning to the field.
+            sched_tasks[0] = sched_tasks[0]._replace(
+                scheduled_for=sched_tasks[0].task.trigger.next_time(now))
+            log.info(F"Done. Rescheduling task for {sched_tasks[0].scheduled_for}.")
+
+            sched_tasks.sort(key=lambda schedtask: schedtask.scheduled_for)
+
+        else:
+            time_to_wait = sched_tasks[0].scheduled_for - now
+            if suspend_available:
+                pass
+            else:
+                _stop_event.wait(max(time_to_wait.total_seconds(), MIN_DELAY))
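+
+        # Hypothetical sketch of what the `pass` branch above might grow into - not implemented
+        # here. The hook calling convention and getting `root_dir` into this loop are both
+        # assumptions:
+        #     if time_to_wait.total_seconds() > config['min_suspend_time']:
+        #         resume_time = (sched_tasks[0].scheduled_for
+        #                        - timedelta(seconds=session.resume_delay))
+        #         session.resume_time = resume_time
+        #         session.save(root_dir)                      # root_dir would need passing in
+        #         if suspend_hook(resume_time=resume_time):   # True on success, per register()
+        #             return                                  # agent is about to power down
+        #     _stop_event.wait(max(time_to_wait.total_seconds(), MIN_DELAY))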
+
+        if _stop_event.is_set():
+            log.warning("Tasks thread stopping...")
+            _stop_event.clear()
+            break
+
+    _update_thread_init_done.clear()
+
+    # Right, so we have our task list, and calls have been resolved. Assume that any waiting period
+    # for control has already been handled.
+
+    # Eventually also need a system to wait for time to stabilise (hook?)
+
+    # What do we do when task list is empty? Return directly?
+
+    # That list then gets sorted by that time, and the main loop simply waits for the next scheduled
+    # time to pass before calling the task (on a different thread). The scheduled_for time gets updated,
+    # the scheduled_tasks list gets sorted again. The loop then looks at the next task, probably with
+    # some log if "scheduled_for" is too far in the past, but run the task anyway.
+
+    # Lowpower handling can then be in the section of the cycle where we'd otherwise be doing a delay.
+    # Have a minimum sleep delay (10ms or something), so we can have a task only run if it's past
+    # the scheduled_for, and then we can happily use that time to get the _next_ trigger time.
+
     # Start our scheduler thread to fire off tasks, including the initial grace period checks
     # Due to our task constraints, our scheduler can be somewhat simplified. We know that task
     # triggers won't change after init, and they will be infrequent enough that we can just
@@ -114,5 +342,3 @@
     # Maybe have "cycles", where we determine what task is going to occur next, and when.
     # A main dispatch loop then pretty much delays until that point, or triggers the low power
     # stuff somehow.
-
-    # Does the low power stuff occur here? Or somewhere else?