You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
shepherd-agent/shepherd/agent/tasks.py

353 lines
13 KiB

"""
Implements both the main task scheduler for Shepherd and the Session system for restoring
state between power cycles.
"""
from abc import ABC, abstractmethod
import json
from pathlib import Path
import logging
from collections import namedtuple
import threading
from datetime import datetime, timedelta
from dateutil import tz
import pytz
from apscheduler.triggers.cron import CronTrigger as APCronTrigger
from configspec import *
import preserve
from .util import HoldLock
log = logging.getLogger("shepherd.agent.tasks")
def register_on(core_interface):
"""
Register the session confspec and hooks on the core interface passed in - `start_tasks` later
assumes that these hooks are present.
"""
confspec = ConfigSpecification()
confspec.add_spec("resume_delay", IntSpec(helptext="Initial estimate of the time taken to"
" resume a session, in seconds"), default=180)
confspec.add_spec("enable_suspend", BoolSpec(helptext="Enables suspension of the agent session"
" in between tasks"), default=True)
confspec.add_spec("min_suspend_time", IntSpec(helptext="Smallest wait period before the next"
" scheduled task that the agent will decide to"
" suspend, in seconds"), default=300)
core_interface.confspec.add_spec("session", confspec, optional=True, default={})
# `resume_time` is a DateTime indicating when the session should resume - it already has the
# resume delay applied. Hook should return True on success
core_interface.register_hook("session_suspend", ["resume_time"])
class TaskTrigger(ABC):
"""Abstract trigger class"""
@abstractmethod
def next_time(self, base_time):
"""
Return a time indicating the next trigger time after base_time. Return None if no more
trigger events. Should be a DateTime object with the `tz.tzutc()` timezone.
"""
@preserve.preservable(exclude_attrs=['ap_trigger'])
class CronTrigger(TaskTrigger):
"""
Interprets Cron strings using a wrapper around the APScheduler CronTrigger (and so function
is similar). Values left as default or supplied as None are set to a wildcard, unless it is
a smaller unit than those supplied - where it instead gets set to it's minimum (so setting
`hour` to 3 will set `minute` and `second` to 0).
The trigger format will be matched against
The timezone used is always the local system timezone.
Details available at https://apscheduler.readthedocs.io/en/latest/modules/triggers/cron.html
"""
def __init__(self, month=None, day=None, day_of_week=None, hour=None,
minute=None, second=None):
self.month = month
self.day = day
self.day_of_week = day_of_week
self.hour = hour
self.minute = minute
self.second = second
self.__restore_init__()
def __restore_init__(self):
# Default timezone is the one from tzlocal
self.ap_trigger = APCronTrigger(month=self.month, day=self.day,
day_of_week=self.day_of_week,
hour=self.hour, minute=self.minute,
second=self.second)
def next_time(self, base_time):
"""
Return a time indicating the next trigger time after base_time. Return None if no more
trigger events.
"""
# Convert base_time to UTC with dateutil, then to pytz which APScheduler requires.
utc_base_time = base_time.astimezone(tz.tzutc()).astimezone(pytz.utc)
fire_time = self.ap_trigger.get_next_fire_time(None, utc_base_time)
# Convert back to UTC, as ap_trigger returns a value with local timezone
# Use DateUtil, as it doesn't add other crap to tzinfo
return fire_time.astimezone(pytz.utc).astimezone(tz.tzutc())
TriggerSpec = ConfigSpecification()
TriggerSpec.add_specs({
'month': StringSpec(helptext="Month in year, 1-12"),
'day': StringSpec(helptext="Day of month, 1-32"),
'day_of_week': StringSpec(helptext="Day of week, 0-6 or mon,tue,wed,thu,fri,sat,sun"),
'hour': StringSpec(helptext="Hour in day, 0-23"),
'minute': StringSpec(helptext="Minute in hour, 0-59"),
'second': StringSpec(helptext="Second in minute, 0-59"),
}, optional=True)
# class IntervalTrigger(TaskTrigger):
"""
Triggers every x period starting from when it was first created (carries over lowpower)
"""
# pass
# class SingleTrigger(TaskTrigger):
"""
Either pass a whole datetime instance, or a delta like a period that gets added to current.
"""
# pass
@preserve.preservable
class Task():
def __init__(self, trigger, interface_call, use_session=True):
"""
Define a new task. If `use_session` is true, will only add the task when a new session is
created, otherwise it will be restored from the old session. Suspended sessions will also
be resumed in order to perform tasks where `use_session` is true.
If `use_session` is false, the task will be added on every init, and will not be saved
when a session is suspended.
"""
self.trigger = trigger
self.interface_call = interface_call
self.use_session = use_session
# InterfaceCall already handles the callables and args for us, we just need to preserve
# them. Trigger is going to be multiple formats, but the most common will be Cron style.
@preserve.preservable
class Session():
"""
Container class to hold session details
"""
def __init__(self, config, tasks, resume_delay, resume_time=None):
"""
Create a session instance.
`config` is the applied config for the current session, used to detect when it changes
`tasks` is the list of tasks saved from the old session
`resume_delay` is the estimated time taken from `resume_time` when deciding to resume the
session, to compensate for the time taken to resume
`resume_time` is the intended resume time (slightly before the next scheduled task)
"""
self.config = config
self.tasks = tasks
self.resume_time = resume_time
self.resume_delay = resume_delay
@classmethod
def load(cls, root_dir, applied_config):
"""
Load a Session instance from the shepherd.session file. Returns None
if the config has changed or no file is found.
"""
session_file = Path(root_dir, 'shepherd.session')
if session_file.exists():
session = preserve.restore(json.loads(session_file.read_text()))
if session.config == applied_config:
return session
log.info("Config has changed since last session")
else:
log.info("No existing session file found")
return None
def save(self, root_dir):
"""
Peel out non-session tasks, then save this Session instance to the
shepherd.session file.
"""
all_tasks = self.tasks
self.tasks = [task for task in all_tasks if task.use_session]
session_file = Path(root_dir, 'shepherd.session')
session_file.write_text(json.dumps(preserve.preserve(self)))
self.tasks = all_tasks
def init_tasks(config, root_dir, init_tasklist, applied_config, interface_functions):
"""
Generate the list of tasks to be run. Attempt to restore existing session, if it is present.
Use the supplied list of init tasks, ignoring tasks marked with 'use_session' unless we're
starting a new session.
Resolves task interface calls, and returns the task session.
"""
session = Session.load(root_dir, applied_config)
if session is None:
log.info("Starting new session")
session = Session(applied_config, init_tasklist, config['resume_delay'])
else:
# Add non-session tasks
session.tasks.extend([task for task in init_tasklist if not task.use_session])
# Resolve task interface calls
for task in session.tasks:
task.interface_call.resolve(interface_functions)
return session
ScheduledTask = namedtuple("ScheduledTask", ["scheduled_for", "task"])
_update_thread_init_done = threading.Event()
_stop_event = threading.Event()
def stop():
_stop_event.set()
log.info("Tasks thread stop requested.")
def start_tasks(core_interface, session):
"""
Initialise the Tasks session and start the Tasks update thread.
"""
# Clear for easier testing
_stop_event.clear()
_update_thread_init_done.clear()
config = core_interface.config['session']
suspend_hook = core_interface.hooks.session_suspend
tasks_thread = threading.Thread(target=_tasks_update_loop,
args=(config, suspend_hook, session))
tasks_thread.start()
# Wait for init so our log makes sense
_update_thread_init_done.wait()
return tasks_thread
MIN_DELAY = 0.01 # Minimum time (in seconds) the task loop will sleep for.
def _tasks_update_loop(config, suspend_hook, session):
sched_tasks = []
# When resuming, schedule tasks from the desired resume time, even if it's in the past
base_time = session.resume_time
now = datetime.now(tz.tzutc())
# If it's a new session, only schedule tasks from now.
if base_time is None:
base_time = now
# Maximum permitted snooze is currently hardcoded to 5 minutes. This means that if we
# resume the session more than 5 minutes later than we'd intended (`session.resume_time`),
# only run tasks that would have been scheduled for the last 5 minutes.
max_snooze_time = timedelta(minutes=5)
if base_time < now-max_snooze_time:
log.warning(F"Session resumed more than maximum snooze time ({max_snooze_time}) after"
F" intended session resume time ({base_time}). Only scheduling tasks after"
F" {now-max_snooze_time}, so may have missed some scheduled tasks.")
base_time = now-max_snooze_time
if len(session.tasks) == 0:
log.info("No tasks scheduled. Stopping Tasks thread.")
_update_thread_init_done.set()
return
for task in session.tasks:
scheduled_time = task.trigger.get_next_time(base_time)
sched_tasks.append(ScheduledTask(scheduled_time, task))
suspend_available = False
if config['enable_suspend']:
if suspend_hook.attachments:
suspend_available = True
log.info("Session suspension enabled.")
else:
log.warning("'enable_suspend' set to true, but no suspend hooks are attached. Add"
" a plugin that provides a suspend hook.")
# Let our `start_tasks` call continue
_update_thread_init_done.set()
# Order by next first
sched_tasks.sort(key=lambda schedtask: schedtask.scheduled_for)
while True:
now = datetime.now(tz.tzutc())
if sched_tasks[0].scheduled_for <= now:
# Scheduled time has passed, run the task
log.info(F"Running task {sched_tasks[0].task.interface_call}...")
# Should we be catching exceptions for this?
sched_tasks[0].task.interface_call.call()
# Reschedule and sort
sched_tasks[0].scheduled_for = sched_tasks[0].task.trigger.get_next_time(now)
log.info(F"Done. Rescheduling task for {sched_tasks[0].scheduled_for}.")
sched_tasks.sort(key=lambda schedtask: schedtask.scheduled_for)
else:
time_to_wait = sched_tasks[0].scheduled_for - now
if suspend_available:
pass
else:
_stop_event.wait(max(time_to_wait.total_seconds(), MIN_DELAY))
if _stop_event.is_set():
log.warning("Tasks thread stopping...")
_stop_event.clear()
break
_update_thread_init_done.clear()
# Right, so we have our task list, and calls have been resolved. Assume that any waiting period
# for control has already been handled.
# Eventually also need a system to wait for time to stabilise (hook?)
# What do we do when task list is empty? Return directly?
# That list then gets sorted by that time, and the main loop simply waits for the next scheduled
# time to pass before calling the task (on a different thread). The scheduled_for time gets updated,
# the scheduled_tasks list gets sorted again. The loop then looks at the next task, probably with
# some log if "scheduled_for" is too far in the past, but run the task anyway.
# Lowpower handling can then be in the section of the cycle where we'd otherwise be doing a delay.
# Have a minimum sleep delay (10ms or something), so we can have a task only run if it's past
# the scheduled_for, and then we can happily use that time to get the _next_ trigger time.
# Start our scheduler thread to fire off tasks, including the initial grace period checks
# Due to our task constraints, our scheduler can be somewhat simplified. We know that task
# triggers won't change after init, and they will be infrequent enough that we can just
# make a new thread for each task.
# Maybe have "cycles", where we determine what task is going to occur next, and when.
# A main dispatch loop then pretty much delays until that point, or triggers the low power
# stuff somehow.