"""
Implements both the main task scheduler for Shepherd and the Session system for restoring
state between power cycles.
"""

from abc import ABC, abstractmethod
import json
from pathlib import Path
import logging
from collections import namedtuple
import threading
from datetime import datetime, timedelta

from dateutil import tz
import pytz
from apscheduler.triggers.cron import CronTrigger as APCronTrigger

from configspec import *
import preserve

# from .util import HoldLock

log = logging.getLogger("shepherd.agent.tasks")


def register_on(core_interface):
    """
    Register the session confspec and hooks on the core interface passed in - `start_tasks`
    later assumes that these hooks are present.
    """
    confspec = ConfigSpecification()
    confspec.add_spec("resume_delay",
                      IntSpec(helptext="Initial estimate of the time taken to"
                                       " resume a session, in seconds"),
                      default=180)
    confspec.add_spec("enable_suspend",
                      BoolSpec(helptext="Enables suspension of the agent session"
                                        " in between tasks"),
                      default=True)
    confspec.add_spec("min_suspend_time",
                      IntSpec(helptext="Smallest wait period before the next"
                                       " scheduled task that the agent will decide to"
                                       " suspend, in seconds"),
                      default=300)

    core_interface.confspec.add_spec("session", confspec, optional=True, default={})

    # `resume_time` is a DateTime indicating when the session should resume - it already has the
    # resume delay applied. Hook should return True on success
    core_interface.register_hook("session_suspend", ["resume_time"])


class TaskTrigger(ABC):
    """Abstract trigger class"""

    @abstractmethod
    def next_time(self, base_time):
        """
        Return a time indicating the next trigger time after base_time. Return None if no more
        trigger events. Should be a DateTime object with the `tz.tzutc()` timezone.
        """


@preserve.preservable(exclude_attrs=['ap_trigger'])
class CronTrigger(TaskTrigger):
    """
    Interprets Cron strings using a wrapper around the APScheduler CronTrigger (and so
    function is similar).

    Values left as default or supplied as None are set to a wildcard, unless it is a smaller
    unit than those supplied - where it instead gets set to its minimum (so setting `hour`
    to 3 will set `minute` and `second` to 0).

    The timezone used is always the local system timezone.

    Details available at https://apscheduler.readthedocs.io/en/latest/modules/triggers/cron.html
    """

    def __init__(self, month=None, day=None, day_of_week=None,
                 hour=None, minute=None, second=None):
        self.month = month
        self.day = day
        self.day_of_week = day_of_week
        self.hour = hour
        self.minute = minute
        self.second = second
        self.__restore_init__()

    def __restore_init__(self):
        # `ap_trigger` is excluded from preservation, so it is rebuilt here from the
        # preserved cron fields. Default timezone is the one from tzlocal.
        self.ap_trigger = APCronTrigger(month=self.month, day=self.day,
                                        day_of_week=self.day_of_week,
                                        hour=self.hour, minute=self.minute,
                                        second=self.second)

    def next_time(self, base_time):
        """
        Return a time indicating the next trigger time after base_time, as a DateTime with
        the `tz.tzutc()` timezone. Return None if no more trigger events.
        """
        # Convert base_time to UTC with dateutil, then to pytz which APScheduler requires.
        utc_base_time = base_time.astimezone(tz.tzutc()).astimezone(pytz.utc)
        fire_time = self.ap_trigger.get_next_fire_time(None, utc_base_time)
        if fire_time is None:
            # APScheduler reports "no more fire times" as None - pass that on rather than
            # failing on the timezone conversion below.
            return None
        # Convert back to UTC, as ap_trigger returns a value with local timezone
        # Use DateUtil, as it doesn't add other crap to tzinfo
        return fire_time.astimezone(pytz.utc).astimezone(tz.tzutc())


CronTriggerSpec = ConfigSpecification()
CronTriggerSpec.add_specs({
    'month': StringSpec(helptext="Month in year, 1-12"),
    'day': StringSpec(helptext="Day of month, 1-31"),
    'day_of_week': StringSpec(helptext="Day of week, 0-6 or mon,tue,wed,thu,fri,sat,sun"),
    'hour': StringSpec(helptext="Hour in day, 0-23"),
    'minute': StringSpec(helptext="Minute in hour, 0-59"),
    'second': StringSpec(helptext="Second in minute, 0-59"),
}, optional=True)


# TODO class IntervalTrigger(TaskTrigger):
#     Triggers every x period starting from when it was first created (carries over lowpower)

# TODO class SingleTrigger(TaskTrigger):
#     Either pass a whole datetime instance, or a delta like a period that gets added to
#     current.


@preserve.preservable
class Task():
    def __init__(self, trigger, interface_call, use_session=True):
        """
        Define a new task. If `use_session` is true, will only add the task when a new session
        is created, otherwise it will be restored from the old session. Suspended sessions will
        also be resumed in order to perform tasks where `use_session` is true.
        If `use_session` is false, the task will be added on every init, and will not be saved
        when a session is suspended.
        """
        self.trigger = trigger
        self.interface_call = interface_call
        self.use_session = use_session
        # InterfaceCall already handles the callables and args for us, we just need to preserve
        # them. Trigger is going to be multiple formats, but the most common will be Cron style.


@preserve.preservable
class Session():
    """
    Container class to hold session details
    """

    def __init__(self, config, tasks, resume_delay, resume_time=None):
        """
        Create a session instance.
        `config` is the applied config for the current session, used to detect when it changes
        `tasks` is the list of tasks saved from the old session
        `resume_delay` is the estimated time taken from `resume_time` when deciding to resume
        the session, to compensate for the time taken to resume
        `resume_time` is the intended resume time (slightly before the next scheduled task)
        """
        self.config = config
        self.tasks = tasks
        self.resume_time = resume_time
        self.resume_delay = resume_delay

    @classmethod
    def load(cls, root_dir, applied_config):
        """
        Load a Session instance from the shepherd.session file in `root_dir`. Returns None if
        the config has changed or no file is found.
        """
        session_file = Path(root_dir, 'shepherd.session')
        if not session_file.exists():
            log.info("No existing session file found")
            return None

        session = preserve.restore(json.loads(session_file.read_text()))
        if session.config == applied_config:
            return session
        log.info("Config has changed since last session")
        return None

    def save(self, root_dir):
        """
        Peel out non-session tasks, then save this Session instance to the shepherd.session
        file in `root_dir`. The full task list is put back afterwards, even if saving fails.
        """
        all_tasks = self.tasks
        self.tasks = [task for task in all_tasks if task.use_session]
        try:
            session_file = Path(root_dir, 'shepherd.session')
            session_file.write_text(json.dumps(preserve.preserve(self)))
        finally:
            # Never leave the live session stripped of its non-session tasks.
            self.tasks = all_tasks


def init_tasks(config, root_dir, init_tasklist, applied_config, interface_functions):
    """
    Generate the list of tasks to be run. Attempt to restore existing session, if it is present.
    Use the supplied list of init tasks, ignoring tasks marked with 'use_session' unless
    we're starting a new session. Resolves task interface calls, and returns the task session.
    """
    session = Session.load(root_dir, applied_config)
    if session is None:
        log.info("Starting new session")
        session = Session(applied_config, init_tasklist, config['resume_delay'])
    else:
        # Add non-session tasks
        session.tasks.extend([task for task in init_tasklist if not task.use_session])

    # Resolve task interface calls
    for task in session.tasks:
        task.interface_call.resolve(interface_functions)

    return session


ScheduledTask = namedtuple("ScheduledTask", ["scheduled_for", "task"])

_update_thread_init_done = threading.Event()
_stop_event = threading.Event()


def stop():
    """Request that the Tasks update thread stops."""
    _stop_event.set()
    log.info("Tasks thread stop requested.")


def start_tasks(core_interface, session):
    """
    Initialise the Tasks session and start the Tasks update thread. Blocks until the thread
    has finished its initialisation, then returns the thread.
    """
    # Clear for easier testing
    _stop_event.clear()
    _update_thread_init_done.clear()

    config = core_interface.config['session']
    suspend_hook = core_interface.hooks.session_suspend

    tasks_thread = threading.Thread(target=_tasks_update_loop,
                                    args=(config, suspend_hook, session))
    tasks_thread.start()

    # Wait for init so our log makes sense
    _update_thread_init_done.wait()
    return tasks_thread


MIN_DELAY = 0.01  # Minimum time (in seconds) the task loop will sleep for.


def _scheduled_for(sched_task):
    """Sort key ordering scheduled tasks by their next run time."""
    return sched_task.scheduled_for


def _build_schedule(session):
    """
    Work out the time to schedule from and return a list of ScheduledTask entries for every
    task in `session` that has an upcoming trigger time.
    """
    # When resuming, schedule tasks from the desired resume time, even if it's in the past
    base_time = session.resume_time
    now = datetime.now(tz.tzutc())

    # If it's a new session, only schedule tasks from now.
    if base_time is None:
        base_time = now

    # Maximum permitted snooze is currently hardcoded to 5 minutes. This means that if we
    # resume the session more than 5 minutes later than we'd intended (`session.resume_time`),
    # only run tasks that would have been scheduled for the last 5 minutes.
    max_snooze_time = timedelta(minutes=5)
    if base_time < now-max_snooze_time:
        log.warning(F"Session resumed more than maximum snooze time ({max_snooze_time}) after"
                    F" intended session resume time ({base_time}). Only scheduling tasks after"
                    F" {now-max_snooze_time}, so may have missed some scheduled tasks.")
        base_time = now-max_snooze_time

    sched_tasks = []
    for task in session.tasks:
        scheduled_time = task.trigger.next_time(base_time)
        if scheduled_time is None:
            # A trigger with no further events can't be ordered against the others.
            log.warning(F"Task {task.interface_call} has no upcoming trigger time, so it"
                        " will not be scheduled.")
            continue
        sched_tasks.append(ScheduledTask(scheduled_time, task))
    return sched_tasks


def _suspend_available(config, suspend_hook):
    """Return True if suspension is enabled in `config` and a suspend hook is attached."""
    if not config['enable_suspend']:
        return False
    if suspend_hook.attachments:
        log.info("Session suspension enabled.")
        return True
    log.warning("'enable_suspend' set to true, but no suspend hooks are attached. Add"
                " a plugin that provides a suspend hook.")
    return False


def _tasks_update_loop(config, suspend_hook, session):
    """
    Body of the Tasks update thread. Schedules every task in `session`, then repeatedly runs
    whichever task is due and sleeps until the next one, until `stop` is called or no
    scheduled tasks remain.
    """
    try:
        sched_tasks = _build_schedule(session)
        if not sched_tasks:
            log.info("No tasks scheduled. Stopping Tasks thread.")
            return
        suspend_available = _suspend_available(config, suspend_hook)
    finally:
        # Let our `start_tasks` call continue - even if initialisation failed, so the
        # caller is never left blocked waiting on a thread that has already died.
        _update_thread_init_done.set()

    # Order by next first
    sched_tasks.sort(key=_scheduled_for)

    while True:
        now = datetime.now(tz.tzutc())
        current = sched_tasks[0]
        if current.scheduled_for <= now:
            # Scheduled time has passed, run the task
            log.info(F"Running task {current.task.interface_call}...")
            # Should we be catching exceptions for this?
            current.task.interface_call.call()

            # Reschedule and sort. ScheduledTask is an immutable namedtuple, so the entry
            # is replaced rather than modified in place.
            next_time = current.task.trigger.next_time(now)
            if next_time is None:
                sched_tasks.pop(0)
                log.info("Done. Task has no further trigger times, so it will not be"
                         " rescheduled.")
                if not sched_tasks:
                    log.info("No tasks left to schedule. Stopping Tasks thread.")
                    break
            else:
                sched_tasks[0] = current._replace(scheduled_for=next_time)
                log.info(F"Done. Rescheduling task for {next_time}.")
                sched_tasks.sort(key=_scheduled_for)
        else:
            time_to_wait = current.scheduled_for - now
            # TODO When `suspend_available` and the wait is longer than `min_suspend_time`,
            # suspend the session through the suspend hook instead of sleeping. Until that
            # exists we always sleep, so the loop never spins without waiting.
            _stop_event.wait(max(time_to_wait.total_seconds(), MIN_DELAY))

        if _stop_event.is_set():
            log.warning("Tasks thread stopping...")
            _stop_event.clear()
            break

    _update_thread_init_done.clear()

    # TODO Add maximum suspend period
    # TODO Add "snooze" task checking even on new session, to catch tasks we miss if we restart
    # due to new config