Submit tasks to a scheduler, allowing them to be run at a future time. Tasks can be submitted in any order, and non-running tasks can be cancelled.
I used a min-heap to prioritize the next task, and a threading.Condition to communicate between
- a "watcher" thread that sleeps until the next task needs to be run, or its sleep time needs to be shortened (new "sooner" task added)
- the main thread, which writes to the min-heap
Scheduling is O(log n), cancellation is O(n), and getting the soonest task is O(1).
import heapq
import logging
import functools
import time
import threading
from datetime import datetime, timedelta
from collections import namedtuple
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())
Task = namedtuple('Task', ['start', 'name', 'fn'])
class Scheduler(object):
'''Class that schedules functions to be run in a separate thread at some future
time. Supports cancellation of functions that haven't yet started.
'''
def __init__(self):
self._cv = threading.Condition(threading.Lock())
self._minheap = []
self._timeout = None
self._start()
def cancel(self, name):
with self._cv:
try:
task = [task for task in self._minheap if task.name == name][0]
except IndexError:
return
self._minheap.remove(task)
heapq.heapify(self._minheap)
self._cv.notify()
logger.info('canceled {}'.format(task.name))
def schedule(self, name, fn, start):
task = Task(start, name, fn)
logger.info('scheduling task: {}'.format(name))
with self._cv:
heapq.heappush(self._minheap, task)
self._cv.notify()
logger.info('scheduled task: {}'.format(name))
def _get_next_timeout(self):
if not self._minheap:
return None
return (self._minheap[0].start - datetime.now()).total_seconds()
def _start(self):
def run():
while True:
self._cv.acquire()
logger.info('waiting with timeout: {}'.format(self._timeout))
not_expired = self._cv.wait(timeout=self._timeout)
if self._timeout is None:
logger.info('no timeout found; using min element')
self._timeout = self._get_next_timeout()
self._cv.release()
elif not_expired:
logger.info('already waiting but woken up; comparing current with min element')
self._timeout = min(self._timeout, self._get_next_timeout())
self._cv.release()
else:
logger.info('timed out; running next task')
next_task = heapq.heappop(self._minheap)
self._timeout = self._get_next_timeout()
self._cv.release()
threading.Thread(target=next_task.fn, name=next_task.name).start()
threading.Thread(target=run, name='timer').start()
def main():
logging.basicConfig(level=logging.INFO, format='%(threadName)-10s: %(message)s')
start = datetime.now()
def task():
logger.info('running, elapsed: {}'.format((datetime.now() - start).total_seconds()))
s = Scheduler()
s.schedule('task-1', functools.partial(task), start + timedelta(seconds=1))
s.schedule('task-2', functools.partial(task), start + timedelta(seconds=2))
s.cancel('task-2')
s.schedule('task-3', functools.partial(task), start + timedelta(seconds=3))
# note that task-4 precedes task-3, but is registered after task-3
s.schedule('task-4', functools.partial(task), start + timedelta(seconds=2.5))
time.sleep(5)
now = datetime.now()
s.schedule('task-5', functools.partial(task), now + timedelta(seconds=5))
s.schedule('task-6', functools.partial(task), now + timedelta(seconds=4))
s.schedule('task-7', functools.partial(task), now + timedelta(seconds=3.5))
if __name__ == '__main__':
main()
Output:
❗ ~/c/dsa [10265e2] (master⚡)
(n) p3 py/epi/19_7.py
timer : waiting with timeout: None
MainThread: scheduling task: task-1
MainThread: scheduled task: task-1
MainThread: scheduling task: task-2
MainThread: scheduled task: task-2
MainThread: canceled task-2
MainThread: scheduling task: task-3
MainThread: scheduled task: task-3
MainThread: scheduling task: task-4
MainThread: scheduled task: task-4
timer : no timeout found; using min element
timer : waiting with timeout: 0.999214
timer : timed out; running next task
task-1 : running, elapsed: 1.006024
timer : waiting with timeout: 1.494409
timer : timed out; running next task
task-4 : running, elapsed: 2.506384
timer : waiting with timeout: 0.49432
timer : timed out; running next task
task-3 : running, elapsed: 3.005836
timer : waiting with timeout: None
MainThread: scheduling task: task-5
MainThread: scheduled task: task-5
timer : no timeout found; using min element
MainThread: scheduling task: task-6
timer : waiting with timeout: 4.999305
MainThread: scheduled task: task-6
timer : already waiting but woken up; comparing current with min element
MainThread: scheduling task: task-7
timer : waiting with timeout: 3.998729
MainThread: scheduled task: task-7
timer : already waiting but woken up; comparing current with min element
timer : waiting with timeout: 3.498098
timer : timed out; running next task
task-7 : running, elapsed: 8.509112
timer : waiting with timeout: 0.493943
timer : timed out; running next task
task-6 : running, elapsed: 9.008533
timer : waiting with timeout: 0.994441
timer : timed out; running next task
task-5 : running, elapsed: 10.005569
timer : waiting with timeout: None
Concurrency is hard, so I'm very interested in hearing folks' thoughts. Thanks!