mirror of
https://github.com/home-assistant/supervisor.git
synced 2025-04-20 19:27:16 +00:00

* Refactory code / object handling * Next step * fix lint * Step 2 * Cleanup API code * cleanup addons code * cleanup data handling * Cleanup addons data handling * Cleanup docker api * clean docker api p2 * next cleanup round * cleanup start on snapshots * update format strings * fix setup * fix lint * fix lint * fix lint * fix tox * Fix wrong import of datetime module * Fix bug with attributes * fix extraction * Update core * Update logs * Expand scheduler * add support for time interval objects * next updates on tasks * Fix some things * Cleanup code / supervisor * fix lint * Fix some code styles * rename stuff * cleanup api call reload * fix lock replacment * fix lint * fix lint * fix bug * fix wrong config links * fix bugs * fix bug * Update version on startup * Fix some bugs * fix bug * Fix snapshot * Add wait boot options * fix lint * fix default config * fix snapshot * fix snapshot * load snapshots on startup * add log message at the end * Some cleanups * fix bug * add logger * add logger for supervisor update * Add more logger
76 lines
2.1 KiB
Python
76 lines
2.1 KiB
Python
"""Schedule for HassIO."""
|
|
import logging
|
|
from datetime import date, datetime, time, timedelta
|
|
|
|
_LOGGER = logging.getLogger(__name__)

# Keys of the per-task option dict that Scheduler keeps in ``_data``.
INTERVAL = 'interval'  # the interval value passed to register_task
REPEAT = 'repeat'      # whether the task is re-armed after it has run
CALL = 'callback'      # the callable invoked to produce the coroutine
TASK = 'task'          # the handle returned by the loop for the pending timer
|
|
|
|
|
|
class Scheduler(object):
    """Schedule task inside HassIO.

    Tasks are coroutine callbacks that are armed on the event loop either
    after a number of seconds or at a time of day. Per-task options are
    kept in ``_data``, keyed by the task id returned from ``register_task``.
    While ``suspend`` is true, due tasks are skipped but still re-armed.
    """

    def __init__(self, loop):
        """Initialize task schedule.

        ``loop`` is the event loop used to arm timers (``call_later``) and
        to start the coroutines (``create_task``).
        """
        self.loop = loop
        self._data = {}
        self.suspend = False

    def register_task(self, coro_callback, interval, repeat=True):
        """Schedule a coroutine.

        The coroutine needs to be a callback without arguments.

        ``interval`` is either a number of seconds (int/float) or a
        ``datetime.time`` giving the time of day at which to run. With
        ``repeat`` true the task is re-armed after every run.

        Return the task id, which is ``hash(coro_callback)``.
        """
        task_id = hash(coro_callback)

        # The id is derived from the callback, so registering the same
        # callback again replaces the entry. Cancel the timer that is still
        # armed for the old entry; otherwise it would fire as well.
        stale = self._data.get(task_id)
        if stale is not None and TASK in stale:
            stale[TASK].cancel()

        # generate data
        opts = {
            CALL: coro_callback,
            INTERVAL: interval,
            REPEAT: repeat,
        }

        # schedule task
        self._data[task_id] = opts
        self._schedule_task(interval, task_id)

        return task_id

    def _run_task(self, task_id):
        """Run a scheduled task.

        Start the coroutine unless the scheduler is suspended, then either
        re-arm the task (repeat) or forget it (one-shot).
        """
        data = self._data[task_id]

        if not self.suspend:
            self.loop.create_task(data[CALL]())

        if data[REPEAT]:
            self._schedule_task(data[INTERVAL], task_id)
        else:
            self._data.pop(task_id)

    def _schedule_task(self, interval, task_id):
        """Schedule a task on loop.

        Numeric intervals are a delay in seconds. A ``datetime.time`` means
        the next occurrence of that time of day (today if still ahead,
        otherwise tomorrow). Any other type is logged and the task entry is
        dropped, since it could never run.
        """
        if isinstance(interval, (int, float)):
            job = self.loop.call_later(interval, self._run_task, task_id)
        elif isinstance(interval, time):
            # Read the clock once so the date and the comparison agree even
            # when this runs right at midnight.
            now = datetime.now()
            calc = datetime.combine(now.date(), interval)

            # check if we run it today or next day
            if calc <= now:
                calc += timedelta(days=1)

            # The loop clock (loop.time()) is not wall-clock epoch time, so
            # schedule with a relative delay instead of an absolute
            # timestamp.
            delay = (calc - now).total_seconds()
            job = self.loop.call_later(delay, self._run_task, task_id)
        else:
            _LOGGER.critical("Unknow interval %s (type: %s) for scheduler %s",
                             interval, type(interval), task_id)
            # No timer could be armed: remove the entry instead of leaving
            # a task without a job handle behind.
            self._data.pop(task_id, None)
            return

        # Store job
        self._data[task_id][TASK] = job
|