You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
shepherd-agent/shepherd/agent/tasks.py

333 lines
12 KiB

"""
Implements both the main task scheduler for Shepherd and the Session system for restoring
state between power cycles.
"""
from abc import ABC, abstractmethod
import json
from pathlib import Path
import logging
from collections import namedtuple
import threading
from datetime import datetime, timedelta
from dateutil import tz
import pytz
from apscheduler.triggers.cron import CronTrigger as APCronTrigger
from configspec import *
import preserve
# from .util import HoldLock
# Module logger, shared by the session handling and the Tasks thread below.
log = logging.getLogger(name="shepherd.agent.tasks")
def register_on(core_interface):
    """
    Add the optional "session" config section and the `session_suspend` hook to the core
    interface passed in. `start_tasks` later relies on both of these being present.
    """
    # (option name, spec class, default, helptext) - registered in this order
    session_options = [
        ("resume_delay", IntSpec, 180,
         "Initial estimate of the time taken to resume a session, in seconds"),
        ("enable_suspend", BoolSpec, True,
         "Enables suspension of the agent session in between tasks"),
        ("min_suspend_time", IntSpec, 300,
         "Smallest wait period before the next scheduled task that the agent will decide to"
         " suspend, in seconds"),
    ]
    session_spec = ConfigSpecification()
    for option_name, spec_class, default_value, help_text in session_options:
        session_spec.add_spec(option_name, spec_class(helptext=help_text), default=default_value)
    core_interface.confspec.add_spec("session", session_spec, optional=True, default={})

    # The hook receives `resume_time`: a DateTime at which the session should resume, with the
    # resume delay already applied. Attached hook functions should return True on success.
    core_interface.register_hook("session_suspend", ["resume_time"])
class TaskTrigger(ABC):
    """
    Abstract base for task triggers - subclasses decide when a task is next due to run.
    """

    @abstractmethod
    def next_time(self, base_time):
        """
        Give the next time this trigger fires after `base_time`, or None once it will never
        fire again. The result should be a DateTime using the `tz.tzutc()` timezone.
        """
@preserve.preservable(exclude_attrs=['ap_trigger'])
class CronTrigger(TaskTrigger):
    """
    Interprets Cron strings using a wrapper around the APScheduler CronTrigger (and so function
    is similar). Values left as default or supplied as None are set to a wildcard, unless it is
    a smaller unit than those supplied - where it instead gets set to its minimum (so setting
    `hour` to 3 will set `minute` and `second` to 0).
    The trigger fields are matched against the local system time - the timezone used is always
    the local system timezone.
    Details available at https://apscheduler.readthedocs.io/en/latest/modules/triggers/cron.html
    """

    def __init__(self, month=None, day=None, day_of_week=None, hour=None,
                 minute=None, second=None):
        self.month = month
        self.day = day
        self.day_of_week = day_of_week
        self.hour = hour
        self.minute = minute
        self.second = second
        self.__restore_init__()

    def __restore_init__(self):
        """
        Build the wrapped APScheduler trigger from the stored cron fields. `ap_trigger` is
        excluded from preservation, so presumably `preserve` calls this after restoring an
        instance to rebuild it - TODO confirm against the `preserve` library.
        """
        # Default timezone is the one from tzlocal
        self.ap_trigger = APCronTrigger(month=self.month, day=self.day,
                                        day_of_week=self.day_of_week,
                                        hour=self.hour, minute=self.minute,
                                        second=self.second)

    def next_time(self, base_time):
        """
        Return the next trigger time after `base_time`, as a DateTime with the `tz.tzutc()`
        timezone. Return None if there are no more trigger events (APScheduler could not
        compute another fire time, e.g. for an impossible date such as 30 February).
        """
        # Convert base_time to UTC with dateutil, then to pytz which APScheduler requires.
        utc_base_time = base_time.astimezone(tz.tzutc()).astimezone(pytz.utc)
        fire_time = self.ap_trigger.get_next_fire_time(None, utc_base_time)
        if fire_time is None:
            # No further fire times - honour the TaskTrigger contract instead of crashing on
            # `None.astimezone`.
            return None
        # Convert back to UTC, as ap_trigger returns a value with local timezone.
        # Finish with dateutil's tzutc so callers get a plain tzinfo without pytz specifics.
        return fire_time.astimezone(pytz.utc).astimezone(tz.tzutc())
# Config specification for the fields accepted by `CronTrigger`. Every field is optional; the
# values are passed through as strings in APScheduler cron syntax.
CronTriggerSpec = ConfigSpecification()
CronTriggerSpec.add_specs({
    'month': StringSpec(helptext="Month in year, 1-12"),
    'day': StringSpec(helptext="Day of month, 1-31"),
    'day_of_week': StringSpec(helptext="Day of week, 0-6 or mon,tue,wed,thu,fri,sat,sun"),
    'hour': StringSpec(helptext="Hour in day, 0-23"),
    'minute': StringSpec(helptext="Minute in hour, 0-59"),
    'second': StringSpec(helptext="Second in minute, 0-59"),
}, optional=True)
# Planned triggers, not yet implemented (kept as comments rather than orphaned string
# statements that look like docstrings of classes that do not exist):
# TODO IntervalTrigger(TaskTrigger): triggers every x period starting from when it was first
#     created (carries over lowpower).
# TODO SingleTrigger(TaskTrigger): either pass a whole datetime instance, or a delta like a
#     period that gets added to the current time.
@preserve.preservable
class Task:
    """
    A single schedulable unit of work: an interface call plus the trigger deciding when it runs.
    """

    def __init__(self, trigger, interface_call, use_session=True):
        """
        Define a new task.

        With `use_session` true, the task is only added when a new session is created; on a
        resumed session it comes back from the saved session instead. Suspended sessions are
        also resumed in order to perform such tasks.
        With `use_session` false, the task is added on every init and is left out when a
        session is suspended and saved.
        """
        # Trigger is going to be multiple formats, but the most common will be Cron style.
        self.trigger = trigger
        # InterfaceCall already handles the callables and args for us - we only need to
        # preserve it.
        self.interface_call = interface_call
        self.use_session = use_session
@preserve.preservable
class Session():
    """
    Container class to hold session details
    """

    def __init__(self, config, tasks, resume_delay, resume_time=None):
        """
        Create a session instance.
        `config` is the applied config for the current session, used to detect when it changes
        `tasks` is the list of tasks saved from the old session
        `resume_delay` is the estimated time taken from `resume_time` when deciding to resume the
        session, to compensate for the time taken to resume
        `resume_time` is the intended resume time (slightly before the next scheduled task)
        """
        self.config = config
        self.tasks = tasks
        self.resume_time = resume_time
        self.resume_delay = resume_delay

    @classmethod
    def load(cls, root_dir, applied_config):
        """
        Load a Session instance from the shepherd.session file in `root_dir`. Returns None
        if no file is found, the file cannot be decoded as JSON, or the config has changed.
        """
        session_file = Path(root_dir, 'shepherd.session')
        if not session_file.exists():
            log.info("No existing session file found")
            return None
        try:
            session_data = json.loads(session_file.read_text())
        except ValueError:
            # Covers json.JSONDecodeError and UnicodeDecodeError. An unreadable session file
            # (e.g. truncated by a power cut mid-write) should not stop the agent from
            # starting - fall back to a new session like the other "unusable session" cases.
            log.warning(F"Could not decode session file {session_file}, ignoring it")
            return None
        session = preserve.restore(session_data)
        if session.config == applied_config:
            return session
        log.info("Config has changed since last session")
        return None

    def save(self, root_dir):
        """
        Peel out non-session tasks, then save this Session instance to the
        shepherd.session file in `root_dir`. `self.tasks` is left unchanged afterwards,
        even if saving fails.
        """
        all_tasks = self.tasks
        # Only tasks with `use_session` set belong in the saved session
        self.tasks = [task for task in all_tasks if task.use_session]
        try:
            session_file = Path(root_dir, 'shepherd.session')
            session_file.write_text(json.dumps(preserve.preserve(self)))
        finally:
            # Restore the full task list even if serialising or writing raised
            self.tasks = all_tasks
def init_tasks(config, root_dir, init_tasklist, applied_config, interface_functions):
    """
    Build the task session to run, restoring the previous session when one is available.

    A restored session keeps its own saved tasks, and only the `init_tasklist` entries without
    'use_session' are added to it. A brand new session takes the whole `init_tasklist`.
    Every task's interface call is then resolved against `interface_functions`, and the
    session is returned.
    """
    session = Session.load(root_dir, applied_config)
    if session is not None:
        # Restored session: non-session tasks are never saved, so add them back now
        session.tasks.extend([item for item in init_tasklist if not item.use_session])
    else:
        log.info("Starting new session")
        session = Session(applied_config, init_tasklist, config['resume_delay'])

    for scheduled in session.tasks:
        scheduled.interface_call.resolve(interface_functions)
    return session
# A Task paired with the DateTime it is next due at. Being a namedtuple it is immutable, so
# rescheduling means building a new instance.
ScheduledTask = namedtuple("ScheduledTask", "scheduled_for task")
# Set by the Tasks thread once it has initialised; `start_tasks` waits on it.
_update_thread_init_done = threading.Event()
# Set by `stop()` to ask the Tasks thread to exit.
_stop_event = threading.Event()
def stop():
    """
    Request that the Tasks thread stops. Only raises the stop flag and logs the request; it
    does not wait for the thread to exit.
    """
    stop_flag = _stop_event
    stop_flag.set()
    log.info("Tasks thread stop requested.")
def start_tasks(core_interface, session):
    """
    Initialise the Tasks session and start the Tasks update thread.

    Reads the "session" config section and the `session_suspend` hook from `core_interface`
    (both set up by `register_on`). It starts the thread running `_tasks_update_loop` and
    blocks until the thread signals that initialisation is done, or until the thread dies
    without signalling. Returns the started thread.
    """
    # Clear for easier testing
    _stop_event.clear()
    _update_thread_init_done.clear()
    config = core_interface.config['session']
    suspend_hook = core_interface.hooks.session_suspend
    tasks_thread = threading.Thread(target=_tasks_update_loop,
                                    args=(config, suspend_hook, session))
    tasks_thread.start()
    # Wait for init so our log makes sense. Also poll the thread's liveness: if it raises
    # before setting the event, a plain `wait()` would block this caller forever.
    while not _update_thread_init_done.wait(timeout=0.1):
        if not tasks_thread.is_alive():
            # Re-check the event to avoid a false alarm if it was set just before exiting
            if not _update_thread_init_done.is_set():
                log.error("Tasks thread exited before signalling that initialisation finished.")
            break
    return tasks_thread
# Minimum time (in seconds) the task loop will sleep for. The loop uses it as a floor on the
# wait, so a due or overdue task never yields a zero or negative wait.
MIN_DELAY = 1e-2
def _tasks_update_loop(config, suspend_hook, session):
    """
    Body of the Tasks thread.

    Schedules every task in `session`, then repeatedly runs whichever task is due next and
    reschedules it from its trigger. It keeps going until `stop()` is requested or no task has
    a further trigger time.

    `config` is the "session" config section, `suspend_hook` is the `session_suspend` hook, and
    `session` is the Session holding the tasks to run. `_update_thread_init_done` is set as soon
    as initialisation finishes, even if it raised, so `start_tasks` can continue.
    """
    try:
        # When resuming, schedule tasks from the desired resume time, even if it's in the past
        base_time = session.resume_time
        now = datetime.now(tz.tzutc())
        # If it's a new session, only schedule tasks from now.
        if base_time is None:
            base_time = now
        # Maximum permitted snooze is currently hardcoded to 5 minutes. This means that if we
        # resume the session more than 5 minutes later than we'd intended (`session.resume_time`),
        # only run tasks that would have been scheduled for the last 5 minutes.
        max_snooze_time = timedelta(minutes=5)
        if base_time < now-max_snooze_time:
            log.warning(F"Session resumed more than maximum snooze time ({max_snooze_time}) after"
                        F" intended session resume time ({base_time}). Only scheduling tasks after"
                        F" {now-max_snooze_time}, so may have missed some scheduled tasks.")
            base_time = now-max_snooze_time

        if len(session.tasks) == 0:
            log.info("No tasks scheduled. Stopping Tasks thread.")
            return

        sched_tasks = []
        for task in session.tasks:
            scheduled_time = task.trigger.next_time(base_time)
            if scheduled_time is None:
                # Trigger will never fire, so there is nothing to schedule for this task
                log.info(F"Task {task.interface_call} has no future trigger time, not"
                         F" scheduling it.")
                continue
            sched_tasks.append(ScheduledTask(scheduled_time, task))

        if config['enable_suspend']:
            if suspend_hook.attachments:
                log.info("Session suspension enabled.")
            else:
                log.warning("'enable_suspend' set to true, but no suspend hooks are attached. Add"
                            " a plugin that provides a suspend hook.")
    finally:
        # Let our `start_tasks` call continue - also when initialisation raised, so it is not
        # left waiting on a dead thread.
        _update_thread_init_done.set()

    # Order by next first
    sched_tasks.sort(key=lambda schedtask: schedtask.scheduled_for)
    while sched_tasks:
        now = datetime.now(tz.tzutc())
        next_up = sched_tasks[0]
        if next_up.scheduled_for <= now:
            # Scheduled time has passed, run the task
            log.info(F"Running task {next_up.task.interface_call}...")
            try:
                next_up.task.interface_call.call()
            except Exception:  # pylint: disable=broad-except
                # Thread boundary: one failing task must not take down the whole scheduler.
                log.exception(F"Task {next_up.task.interface_call} raised an exception.")
            next_time = next_up.task.trigger.next_time(now)
            if next_time is None:
                log.info("Done. Task has no further trigger times, unscheduling it.")
                del sched_tasks[0]
            else:
                log.info(F"Done. Rescheduling task for {next_time}.")
                # ScheduledTask is an immutable namedtuple, so replace the entry instead of
                # assigning to its `scheduled_for` field (which raises AttributeError).
                sched_tasks[0] = ScheduledTask(next_time, next_up.task)
                sched_tasks.sort(key=lambda schedtask: schedtask.scheduled_for)
        else:
            time_to_wait = next_up.scheduled_for - now
            # TODO Suspend the session via `suspend_hook` when suspension is enabled and the
            # wait exceeds `config['min_suspend_time']`. Until that is implemented, always sleep
            # here (the previous placeholder busy-looped without ever waiting).
            _stop_event.wait(max(time_to_wait.total_seconds(), MIN_DELAY))
        if _stop_event.is_set():
            log.warning("Tasks thread stopping...")
            _stop_event.clear()
            break
    else:
        # Loop ended without a stop request: every task's trigger has run out
        log.info("No scheduled tasks remaining. Stopping Tasks thread.")
    _update_thread_init_done.clear()
# TODO Add maximum suspend period
# TODO Add "snooze" task checking even on new session, to catch tasks we miss if we restart
# due to new config