Task and session initial structure

master
Tom Wilson 6 years ago
parent 9a42121007
commit 70d266f91c

@ -13,6 +13,7 @@ from configspec import *
from . import plugin
from . import control
from . import tasks
log = logging.getLogger("shepherd.agent")
@ -130,7 +131,7 @@ class Agent():
if self.control_enabled:
control.start_control(self.core_config["control"], self.root_dir(),
core_update_state, plugin_update_states)
core_update_state, plugin_update_states)
# Need somewhere to eventually pass in the hooks Tasks will need for the lowpower stuff,
# probably just another init_plugins arg.
@ -138,6 +139,15 @@ class Agent():
# namespace stuff into it's own plugin interface, as it's using a lot of the same
# mechanisms, but we're having to pass it all around individually.
# TODO Collect plugin tasks
task_session = tasks.init_tasks(self.core_config['session'], self.root_dir(),
[], self.applied_config, self.interface_functions)
# TODO Any time stabilisation or waiting for Control
tasks.start_tasks(self.core_interface, task_session)
# tasks.init_tasks(self.core_config) # seperate tasks.start?
# plugin.start() # Run the plugin `.run` hooks in seperate threads

@ -645,24 +645,24 @@ def load_plugin_interface(plugin_name, interface, module=None):
interface.register_confspec(confspec_list[0][1])
if interface._confspec is None:
interface._confspec = ConfigSpecification()
interface._confspec = ConfigSpecification()
interface._update_state.set_confspec(interface.confspec)
if module is not None:
# Scan module for load markers left by decorators and pass them over to register methods
for key, attr in module.__dict__.items():
if hasattr(attr, "_shepherd_load_marker"):
if isinstance(attr._shepherd_load_marker, FunctionMarker):
interface.register_function(attr, **attr._shepherd_load_marker._asdict())
elif isinstance(attr._shepherd_load_marker, AttachmentMarker):
interface.register_attachment(attr, **attr._shepherd_load_marker._asdict())
elif isinstance(attr._shepherd_load_marker, HookMarker):
# Hooks are a little different in that we replace the attr with the hook
newhook = interface.register_hook(**attr._shepherd_load_marker._asdict())
setattr(module, key, newhook)
elif isinstance(attr._shepherd_load_marker, ClassMarker):
interface.register_class(attr)
# Scan module for load markers left by decorators and pass them over to register methods
for key, attr in module.__dict__.items():
if hasattr(attr, "_shepherd_load_marker"):
if isinstance(attr._shepherd_load_marker, FunctionMarker):
interface.register_function(attr, **attr._shepherd_load_marker._asdict())
elif isinstance(attr._shepherd_load_marker, AttachmentMarker):
interface.register_attachment(attr, **attr._shepherd_load_marker._asdict())
elif isinstance(attr._shepherd_load_marker, HookMarker):
# Hooks are a little different in that we replace the attr with the hook
newhook = interface.register_hook(**attr._shepherd_load_marker._asdict())
setattr(module, key, newhook)
elif isinstance(attr._shepherd_load_marker, ClassMarker):
interface.register_class(attr)
if interface._plugin_class is not None:
# Scan plugin class for marked methods

@ -1,6 +1,15 @@
"""
Implements both the main task scheduler for Shepherd and the Session system for restoring
state between power cycles.
"""
from abc import ABC, abstractmethod
import json
from pathlib import Path
import logging
from collections import namedtuple
import threading
from datetime import datetime, timedelta
from dateutil import tz
import pytz
@ -9,6 +18,31 @@ from apscheduler.triggers.cron import CronTrigger as APCronTrigger
from configspec import *
import preserve
from .util import HoldLock
log = logging.getLogger("shepherd.agent.tasks")
def register(core_interface):
"""
Register the session confspec and hooks on the core interface passed in - `start_tasks` later
assumes that these hooks are present.
"""
confspec = ConfigSpecification()
confspec.add_spec("resume_delay", IntSpec(helptext="Initial estimate of the time taken to"
" resume a session, in seconds"), default=180)
confspec.add_spec("enable_suspend", BoolSpec(helptext="Enables suspension of the agent session"
" in between tasks"), default=True)
confspec.add_spec("min_suspend_time", IntSpec(helptext="Smallest wait period before the next"
" scheduled task that the agent will decide to"
" suspend, in seconds"), default=300)
core_interface.confspec.add_spec("session", confspec, optional=True, default={})
# `resume_time` is a DateTime indicating when the session should resume - it already has the
# resume delay applied. Hook should return True on success
core_interface.register_hook("session_suspend", ["resume_time"])
class TaskTrigger(ABC):
"""Abstract trigger class"""
@ -16,7 +50,7 @@ class TaskTrigger(ABC):
def next_time(self, base_time):
"""
Return a time indicating the next trigger time after base_time. Return None if no more
trigger events.
trigger events. Should be a DateTime object with the `tz.tzutc()` timezone.
"""
@ -89,23 +123,217 @@ TriggerSpec.add_specs({
# pass
@preserve.preservable
class Task():
def __init__(self, trigger, interface_call):
pass
def __init__(self, trigger, interface_call, use_session=True):
"""
Define a new task. If `use_session` is true, will only add the task when a new session is
created, otherwise it will be restored from the old session. Suspended sessions will also
be resumed in order to perform tasks where `use_session` is true.
If `use_session` is false, the task will be added on every init, and will not be saved
when a session is suspended.
"""
self.trigger = trigger
self.interface_call = interface_call
self.use_session = use_session
# InterfaceCall already handles the callables and args for us, we just need to preserve
# them. Trigger is going to be multiple formats, but the most common will be Cron style.
def init_tasks(core_config, applied_config, interface_functions):
pass
# Check if we have a session to load
# Load the session - this includes resolving interfacecalls
@preserve.preservable
class Session():
"""
Container class to hold session details
"""
def __init__(self, config, tasks, resume_delay, resume_time=None):
"""
Create a session instance.
`config` is the applied config for the current session, used to detect when it changes
`tasks` is the list of tasks saved from the old session
`resume_delay` is the estimated time taken from `resume_time` when deciding to resume the
session, to compensate for the time taken to resume
`resume_time` is the intended resume time (slightly before the next scheduled task)
"""
self.config = config
self.tasks = tasks
self.resume_time = resume_time
self.resume_delay = resume_delay
@classmethod
def load(cls, root_dir, applied_config):
"""
Load a Session instance from the shepherd.session file. Returns None
if the config has changed or no file is found.
"""
session_file = Path(root_dir, 'shepherd.session')
if session_file.exists():
session = preserve.restore(json.loads(session_file.read_text()))
if session.config == applied_config:
return session
log.info("Config has changed since last session")
else:
log.info("No existing session file found")
return None
def save(self, root_dir):
"""
Peel out non-session tasks, then save this Session instance to the
shepherd.session file.
"""
all_tasks = self.tasks
self.tasks = [task for task in all_tasks if task.use_session]
session_file = Path(root_dir, 'shepherd.session')
session_file.write_text(json.dumps(preserve.preserve(self)))
self.tasks = all_tasks
def init_tasks(config, root_dir, init_tasklist, applied_config, interface_functions):
"""
Generate the list of tasks to be run. Attempt to restore existing session, if it is present.
Use the supplied list of init tasks, ignoring tasks marked with 'use_session' unless we're
starting a new session.
Resolves task interface calls, and returns the task session.
"""
session = Session.load(root_dir, applied_config)
if session is None:
log.info("Starting new session")
session = Session(applied_config, init_tasklist, config['resume_delay'])
else:
# Add non-session tasks
session.tasks.extend([task for task in init_tasklist if not task.use_session])
# Resolve task interface calls
for task in session.tasks:
task.interface_call.resolve(interface_functions)
# Return a list of tasks
return session
def start_tasks(task_list):
pass
ScheduledTask = namedtuple("ScheduledTask", ["scheduled_for", "task"])
_update_thread_init_done = threading.Event()
_stop_event = threading.Event()
def stop():
_stop_event.set()
log.info("Tasks thread stop requested.")
def start_tasks(core_interface, session):
"""
Initialise the Tasks session and start the Tasks update thread.
"""
# Clear for easier testing
_stop_event.clear()
_update_thread_init_done.clear()
config = core_interface.config['session']
suspend_hook = core_interface.hooks.session_suspend
tasks_thread = threading.Thread(target=_tasks_update_loop,
args=(config, suspend_hook, session))
tasks_thread.start()
# Wait for init so our log makes sense
_update_thread_init_done.wait()
return tasks_thread
MIN_DELAY = 0.01 # Minimum time (in seconds) the task loop will sleep for.
def _tasks_update_loop(config, suspend_hook, session):
sched_tasks = []
base_time = session.resume_time
now = datetime.now(tz.tzutc())
# If it's a new session, only schedule tasks from now.
if base_time is None:
base_time = now
# Maximum permitted snooze is currently hardcoded to 5 minutes. This means that if we
# resume the session more than 5 minutes later than we'd intended (`session.resume_time`),
# only run tasks that would have been scheduled for the last 5 minutes.
max_snooze_time = timedelta(minutes=5)
if base_time < now-max_snooze_time:
log.warning(F"Session resumed more than maximum snooze time ({max_snooze_time}) after"
F" intended session resume time ({base_time}). Only scheduling tasks after"
F" {now-max_snooze_time}, so may have missed some scheduled tasks.")
base_time = now-max_snooze_time
if len(session.tasks) == 0:
log.info("No tasks scheduled. Stopping Tasks thread.")
_update_thread_init_done.set()
return
for task in session.tasks:
scheduled_time = task.trigger.get_next_time(base_time)
sched_tasks.append(ScheduledTask(scheduled_time, task))
suspend_available = False
if config['enable_suspend'] and suspend_hook.attachments:
suspend_available = True
# Let our `start_tasks` call continue
_update_thread_init_done.set()
# Order by next first
sched_tasks.sort(key=lambda schedtask: schedtask.scheduled_for)
while True:
now = datetime.now(tz.tzutc())
if sched_tasks[0].scheduled_for <= now:
# Scheduled time has passed, run the task
log.info(F"Running task {sched_tasks[0].task.interface_call}...")
sched_tasks[0].task.interface_call.call()
# Reschedule and sort
sched_tasks[0].scheduled_for = sched_tasks[0].task.trigger.get_next_time(now)
log.info(F"Done. Rescheduling task for {sched_tasks[0].scheduled_for}.")
sched_tasks.sort(key=lambda schedtask: schedtask.scheduled_for)
else:
time_to_wait = sched_tasks[0].scheduled_for - now
if suspend_available:
pass
else:
_stop_event.wait(max(time_to_wait.total_seconds(), MIN_DELAY))
if _stop_event.is_set():
log.warning("Tasks thread stopping...")
_stop_event.clear()
break
_update_thread_init_done.clear()
# Right, so we have our task list, and calls have been resolved. Assume that any waiting period
# for control has already been handled.
# Eventually also need a system to wait for time to stabilise (hook?)
# What do we do when task list is empty? Return directly?
# That list then gets sorted by that time, and the main loop simply waits for the next scheduled
# time to pass before calling the task (on a different thread). The scheduled_for time gets updated,
# the scheduled_tasks list gets sorted again. The loop then looks at the next task, probably with
# some log if "scheduled_for" is too far in the past, but run the task anyway.
# Lowpower handling can then be in the section of the cycle where we'd otherwise be doing a delay.
# Have a minimum sleep delay (10ms or something), so we can have a task only run if it's past
# the scheduled_for, and then we can happily use that time to get the _next_ trigger time.
# Start our scheduler thread to fire off tasks, including the initial grace period checks
# Due to our task constraints, our scheduler can be somewhat simplified. We know that task
# triggers won't change after init, and they will be infrequent enough that we can just
@ -114,5 +342,3 @@ def start_tasks(task_list):
# Maybe have "cycles", where we determine what task is going to occur next, and when.
# A main dispatch loop then pretty much delays until that point, or triggers the low power
# stuff somehow.
# Does the low power stuff occur here? Or somewhere else?

Loading…
Cancel
Save