Renamed AddCooldown to Throttle and added bypass functionality

This commit is contained in:
Paulus Schoutsen 2014-12-04 21:06:45 -08:00
parent 31b9f65513
commit 48089b01ab
7 changed files with 91 additions and 65 deletions

View File

@ -192,32 +192,65 @@ class TestUtil(unittest.TestCase):
set1.update([1, 2], [5, 6])
self.assertEqual([2, 3, 1, 5, 6], set1)
def test_add_cooldown(self):
def test_throttle(self):
""" Test the add cooldown decorator. """
calls = []
calls1 = []
@util.AddCooldown(timedelta(milliseconds=500))
def test_cooldown():
calls.append(1)
@util.Throttle(timedelta(milliseconds=500))
def test_throttle1():
calls1.append(1)
self.assertEqual(0, len(calls))
calls2 = []
test_cooldown()
@util.Throttle(
timedelta(milliseconds=500), timedelta(milliseconds=250))
def test_throttle2():
calls2.append(1)
self.assertEqual(1, len(calls))
# Ensure init is ok
self.assertEqual(0, len(calls1))
self.assertEqual(0, len(calls2))
test_cooldown()
# Call first time and ensure methods got called
test_throttle1()
test_throttle2()
self.assertEqual(1, len(calls))
self.assertEqual(1, len(calls1))
self.assertEqual(1, len(calls2))
# Call second time. Methods should not get called
test_throttle1()
test_throttle2()
self.assertEqual(1, len(calls1))
self.assertEqual(1, len(calls2))
# Call again, overriding throttle, only first one should fire
test_throttle1(no_throttle=True)
test_throttle2(no_throttle=True)
self.assertEqual(2, len(calls1))
self.assertEqual(1, len(calls2))
# Sleep past the no throttle interval for throttle2
time.sleep(.3)
test_cooldown()
test_throttle1()
test_throttle2()
self.assertEqual(1, len(calls))
self.assertEqual(2, len(calls1))
self.assertEqual(1, len(calls2))
time.sleep(.2)
test_throttle1(no_throttle=True)
test_throttle2(no_throttle=True)
test_cooldown()
self.assertEqual(3, len(calls1))
self.assertEqual(2, len(calls2))
self.assertEqual(2, len(calls))
time.sleep(.5)
test_throttle1()
test_throttle2()
self.assertEqual(4, len(calls1))
self.assertEqual(3, len(calls2))

View File

@ -87,7 +87,7 @@ class LuciDeviceScanner(object):
return
return self.mac2name.get(device, None)
@util.AddCooldown(MIN_TIME_BETWEEN_SCANS)
@util.Throttle(MIN_TIME_BETWEEN_SCANS)
def _update_info(self):
""" Ensures the information from the Luci router is up to date.
Returns boolean if scanning successful. """

View File

@ -82,7 +82,7 @@ class NetgearDeviceScanner(object):
else:
return None
@util.AddCooldown(MIN_TIME_BETWEEN_SCANS)
@util.Throttle(MIN_TIME_BETWEEN_SCANS)
def _update_info(self):
""" Retrieves latest information from the Netgear router.
Returns boolean if scanning successful. """

View File

@ -78,7 +78,7 @@ class TomatoDeviceScanner(object):
else:
return filter_named[0]
@util.AddCooldown(MIN_TIME_BETWEEN_SCANS)
@util.Throttle(MIN_TIME_BETWEEN_SCANS)
def _update_tomato_info(self):
""" Ensures the information from the Tomato router is up to date.
Returns boolean if scanning successful. """

View File

@ -1,14 +1,16 @@
""" Support for Hue lights. """
import logging
import socket
from datetime import datetime, timedelta
from datetime import timedelta
import homeassistant as ha
import homeassistant.util as util
from homeassistant.components import ToggleDevice, ATTR_FRIENDLY_NAME
from homeassistant.components.light import (
ATTR_BRIGHTNESS, ATTR_XY_COLOR, ATTR_TRANSITION)
MIN_TIME_BETWEEN_SCANS = timedelta(seconds=10)
MIN_TIME_BETWEEN_FORCED_SCANS = timedelta(seconds=1)
PHUE_CONFIG_FILE = "phue.conf"
@ -37,25 +39,9 @@ def get_lights(hass, config):
lights = {}
def update_lights(force_reload=False):
""" Updates the light states. """
now = datetime.now()
try:
time_scans = now - update_lights.last_updated
# force_reload == True, return if updated in last second
# force_reload == False, return if last update was less then
# MIN_TIME_BETWEEN_SCANS ago
if force_reload and time_scans.seconds < 1 or \
not force_reload and time_scans < MIN_TIME_BETWEEN_SCANS:
return
except AttributeError:
# First time we run last_updated is not set, continue as usual
pass
update_lights.last_updated = now
@util.Throttle(MIN_TIME_BETWEEN_SCANS, MIN_TIME_BETWEEN_FORCED_SCANS)
def update_lights():
""" Updates the Hue light objects with latest info from the bridge. """
try:
api = bridge.get_api()
except socket.error:
@ -142,4 +128,4 @@ class HueLight(ToggleDevice):
def update(self):
""" Synchronize state with bridge. """
self.update_lights(True)
self.update_lights(no_throttle=True)

View File

@ -4,7 +4,7 @@ homeassistant.components.switch
Component to interface with various switches that can be controlled remotely.
"""
import logging
from datetime import datetime, timedelta
from datetime import timedelta
import homeassistant as ha
import homeassistant.util as util
@ -96,21 +96,16 @@ def setup(hass, config):
ent_to_switch[entity_id] = switch
# pylint: disable=unused-argument
def update_states(time, force_reload=False):
@util.Throttle(MIN_TIME_BETWEEN_SCANS)
def update_states(now):
""" Update states of all switches. """
# First time this method gets called, force_reload should be True
if force_reload or \
datetime.now() - update_states.last_updated > \
MIN_TIME_BETWEEN_SCANS:
logger.info("Updating switch states")
logger.info("Updating switch states")
update_states.last_updated = datetime.now()
for switch in switches:
switch.update_ha_state(hass)
for switch in switches:
switch.update_ha_state(hass)
update_states(None, True)
update_states(None)
def handle_switch_service(service):
""" Handles calls to the switch services. """

View File

@ -8,7 +8,7 @@ import collections
from itertools import chain
import threading
import queue
import datetime
from datetime import datetime
import re
import enum
import socket
@ -52,7 +52,7 @@ def str_to_datetime(dt_str):
@rtype: datetime
"""
try:
return datetime.datetime.strptime(dt_str, DATE_STR_FORMAT)
return datetime.strptime(dt_str, DATE_STR_FORMAT)
except ValueError: # If dt_str did not match our format
return None
@ -68,7 +68,7 @@ def repr_helper(inp):
return ", ".join(
repr_helper(key)+"="+repr_helper(item) for key, item
in inp.items())
elif isinstance(inp, datetime.datetime):
elif isinstance(inp, datetime):
return datetime_to_str(inp)
else:
return str(inp)
@ -274,36 +274,48 @@ def validate_config(config, items, logger):
return not errors_found
class AddCooldown(object):
class Throttle(object):
"""
A method decorator to add a cooldown to a method.
A method decorator to add a cooldown to a method to prevent it from being
called more then 1 time within the timedelta interval `min_time` after it
returned its result.
If you set a cooldown of 5 seconds. Then if you call a method twice the
underlaying method will not be called if the second call was within
5 seconds of the first. None will be returned instead.
Calling a method a second time during the interval will return None.
Makes a last_call attribute available on the wrapped method.
Pass keyword argument `no_throttle=True` to the wrapped method to make
the call not throttled.
Decorator takes in an optional second timedelta interval to throttle the
'no_throttle' calls.
Adds a datetime attribute `last_call` to the method.
"""
# pylint: disable=too-few-public-methods
def __init__(self, min_time):
def __init__(self, min_time, limit_no_throttle=None):
self.min_time = min_time
self.limit_no_throttle = limit_no_throttle
def __call__(self, method):
lock = threading.Lock()
if self.limit_no_throttle is not None:
method = Throttle(self.limit_no_throttle)(method)
@wraps(method)
def wrapper(*args, **kwargs):
"""
Wrapper that allows wrapped to be called only once per min_time.
"""
with lock:
now = datetime.datetime.now()
last_call = wrapper.last_call
# Check if method is never called or no_throttle is given
force = last_call is None or kwargs.pop('no_throttle', False)
if force or datetime.now() - last_call > self.min_time:
if last_call is None or now - last_call > self.min_time:
result = method(*args, **kwargs)
wrapper.last_call = now
wrapper.last_call = datetime.now()
return result
else:
return None
@ -418,7 +430,7 @@ def _threadpool_worker(work_queue, current_jobs, job_handler, quit_task):
return
# Add to current running jobs
job_log = (datetime.datetime.now(), job)
job_log = (datetime.now(), job)
current_jobs.append(job_log)
# Do the job