supervisor/hassio/scheduler.py
Pascal Vizeli 1c49351e66
Refactory code / object handling (#289)
* Refactory code / object handling

* Next step

* fix lint

* Step 2

* Cleanup API code

* cleanup addons code

* cleanup data handling

* Cleanup addons data handling

* Cleanup docker api

* clean docker api p2

* next cleanup round

* cleanup start on snapshots

* update format strings

* fix setup

* fix lint

* fix lint

* fix lint

* fix tox

* Fix wrong import of datetime module

* Fix bug with attributes

* fix extraction

* Update core

* Update logs

* Expand scheduler

* add support for time interval objects

* next updates on tasks

* Fix some things

* Cleanup code / supervisor

* fix lint

* Fix some code styles

* rename stuff

* cleanup api call reload

* fix lock replacment

* fix lint

* fix lint

* fix bug

* fix wrong config links

* fix bugs

* fix bug

* Update version on startup

* Fix some bugs

* fix bug

* Fix snapshot

* Add wait boot options

* fix lint

* fix default config

* fix snapshot

* fix snapshot

* load snapshots on startup

* add log message at the end

* Some cleanups

* fix bug

* add logger

* add logger for supervisor update

* Add more logger
2018-01-02 21:21:29 +01:00

76 lines
2.1 KiB
Python

"""Schedule for HassIO."""
import logging
from datetime import date, datetime, time, timedelta
_LOGGER = logging.getLogger(__name__)
INTERVAL = 'interval'
REPEAT = 'repeat'
CALL = 'callback'
TASK = 'task'
class Scheduler(object):
"""Schedule task inside HassIO."""
def __init__(self, loop):
"""Initialize task schedule."""
self.loop = loop
self._data = {}
self.suspend = False
def register_task(self, coro_callback, interval, repeat=True):
"""Schedule a coroutine.
The coroutien need to be a callback without arguments.
"""
task_id = hash(coro_callback)
# generate data
opts = {
CALL: coro_callback,
INTERVAL: interval,
REPEAT: repeat,
}
# schedule task
self._data[task_id] = opts
self._schedule_task(interval, task_id)
return task_id
def _run_task(self, task_id):
"""Run a scheduled task."""
data = self._data[task_id]
if not self.suspend:
self.loop.create_task(data[CALL]())
if data[REPEAT]:
self._schedule_task(data[INTERVAL], task_id)
else:
self._data.pop(task_id)
def _schedule_task(self, interval, task_id):
"""Schedule a task on loop."""
if isinstance(interval, (int, float)):
job = self.loop.call_later(interval, self._run_task, task_id)
elif isinstance(interval, time):
today = datetime.combine(date.today(), interval)
tomorrow = datetime.combine(
date.today() + timedelta(days=1), interval)
# check if we run it today or next day
if today > datetime.today():
calc = today
else:
calc = tomorrow
job = self.loop.call_at(calc.timestamp(), self._run_task, task_id)
else:
_LOGGER.fatal("Unknow interval %s (type: %s) for scheduler %s",
interval, type(interval), task_id)
# Store job
self._data[task_id][TASK] = job