mirror of
https://github.com/home-assistant/supervisor.git
synced 2025-10-25 11:39:33 +00:00
* Refactory code / object handling * Next step * fix lint * Step 2 * Cleanup API code * cleanup addons code * cleanup data handling * Cleanup addons data handling * Cleanup docker api * clean docker api p2 * next cleanup round * cleanup start on snapshots * update format strings * fix setup * fix lint * fix lint * fix lint * fix tox * Fix wrong import of datetime module * Fix bug with attributes * fix extraction * Update core * Update logs * Expand scheduler * add support for time interval objects * next updates on tasks * Fix some things * Cleanup code / supervisor * fix lint * Fix some code styles * rename stuff * cleanup api call reload * fix lock replacment * fix lint * fix lint * fix bug * fix wrong config links * fix bugs * fix bug * Update version on startup * Fix some bugs * fix bug * Fix snapshot * Add wait boot options * fix lint * fix default config * fix snapshot * fix snapshot * load snapshots on startup * add log message at the end * Some cleanups * fix bug * add logger * add logger for supervisor update * Add more logger
35 lines
988 B
Python
35 lines
988 B
Python
"""Tools file for HassIO."""
|
|
from datetime import datetime
import functools
import re
|
|
|
|
# Matches terminal escape sequences that start with ESC (\x1b):
#   * ESC "[" ... up to the first byte in the range "@".."~"
#     (the shape of CSI sequences such as colour codes), or
#   * ESC "]" ... up to a terminating BEL (\x07) or ESC "\"
#     (the shape of OSC sequences such as window-title changes).
# Both bodies are matched non-greedily and "." does not cross newlines.
RE_STRING = re.compile(r"\x1b(\[.*?[@-~]|\].*?(\x07|\x1b\\))")
|
|
|
|
|
|
def convert_to_ascii(raw):
    """Decode a bytes object to text and strip terminal escape sequences.

    ``raw`` is decoded with the default codec of ``bytes.decode`` and every
    match of ``RE_STRING`` is removed from the resulting string.
    """
    text = raw.decode()
    return RE_STRING.sub("", text)
|
|
|
|
|
|
class AsyncThrottle:
    """Decorator that limits how often a coroutine function is executed.

    A call to the decorated coroutine is forwarded only when more than
    ``delta`` has elapsed since the last forwarded call; otherwise the call
    is skipped and resolves to ``None``.

    The timestamp lives on the decorator instance, so every caller of one
    decorated function shares the same timer (for a decorated method that
    means all instances of the class share it).
    """

    def __init__(self, delta):
        """Initialize async throttle.

        :param delta: minimum ``timedelta`` between two forwarded calls.
        """
        self.throttle_period = delta
        # datetime.min guarantees the very first call is never throttled.
        self.time_of_last_call = datetime.min

    def __call__(self, method):
        """Wrap ``method`` so that calls inside the throttle period are skipped.

        :param method: coroutine function to throttle.
        :return: coroutine function with the same call signature that returns
            the result of ``method``, or ``None`` when the call was throttled.
        """

        # wraps() keeps __name__, __doc__ and __wrapped__ of the original
        # coroutine so logging, debugging and introspection stay meaningful.
        @functools.wraps(method)
        async def wrapper(*args, **kwargs):
            """Forward the call unless it falls inside the throttle period."""
            now = datetime.now()
            time_since_last_call = now - self.time_of_last_call

            if time_since_last_call <= self.throttle_period:
                # Still inside the throttle window: skip without awaiting.
                return None

            # Stamp the time before awaiting, so calls that arrive while
            # ``method`` is still running (or after it raised) are throttled.
            self.time_of_last_call = now
            return await method(*args, **kwargs)

        return wrapper
|