Switch on/off all lights, and wait for the result (#27078)

* Switch on/off all lights, and wait for the result

Reuses the parallel_updates semaphore.
This is a small crutch which serializes platforms which already do tis
for updates. Platforms which can parallelize everything, this makes it
go faster

* Fix broken unittest

With manual validation, with help from @frenck, we found out that the
assertions are wrong and the test should be failing.

The sequence requested is
OFF
ON

without cancelation, this code should result in:
off,off,off,on,on,on

testable, by adding a `await hass.async_block_till_done()` between the
off and on call.

with cancelation. there should be less off call's so
off,on,on,on

* Adding tests for async_request_call

* Process review feedback

* Switch gather with wait

* 👕 running black
This commit is contained in:
Robbert Müller 2019-10-06 17:23:12 +02:00 committed by Paulus Schoutsen
parent c7c88b2b68
commit 7a156059e9
4 changed files with 113 additions and 12 deletions

View File

@ -292,7 +292,8 @@ async def async_setup(hass, config):
preprocess_turn_on_alternatives(params)
turn_lights_off, off_params = preprocess_turn_off(params)
update_tasks = []
poll_lights = []
change_tasks = []
for light in target_lights:
light.async_set_context(service.context)
@ -305,17 +306,22 @@ async def async_setup(hass, config):
preprocess_turn_on_alternatives(pars)
turn_light_off, off_pars = preprocess_turn_off(pars)
if turn_light_off:
await light.async_turn_off(**off_pars)
task = light.async_request_call(light.async_turn_off(**off_pars))
else:
await light.async_turn_on(**pars)
task = light.async_request_call(light.async_turn_on(**pars))
if not light.should_poll:
continue
change_tasks.append(task)
update_tasks.append(light.async_update_ha_state(True))
if light.should_poll:
poll_lights.append(light)
if update_tasks:
await asyncio.wait(update_tasks)
if change_tasks:
await asyncio.wait(change_tasks)
if poll_lights:
await asyncio.wait(
[light.async_update_ha_state(True) for light in poll_lights]
)
# Listen for light on and light off service calls.
hass.services.async_register(

View File

@ -551,6 +551,19 @@ class Entity:
"""Return the representation."""
return "<Entity {}: {}>".format(self.name, self.state)
# call an requests
async def async_request_call(self, coro):
"""Process request batched."""
if self.parallel_updates:
await self.parallel_updates.acquire()
try:
await coro
finally:
if self.parallel_updates:
self.parallel_updates.release()
class ToggleEntity(Entity):
"""An abstract class for entities that can be turned on and off."""

View File

@ -313,10 +313,10 @@ async def test_signal_repetitions_cancelling(hass, monkeypatch):
await hass.async_block_till_done()
assert protocol.send_command_ack.call_args_list[0][0][1] == "on"
assert protocol.send_command_ack.call_args_list[1][0][1] == "off"
assert protocol.send_command_ack.call_args_list[2][0][1] == "off"
assert protocol.send_command_ack.call_args_list[3][0][1] == "off"
assert protocol.send_command_ack.call_args_list[0][0][1] == "off"
assert protocol.send_command_ack.call_args_list[1][0][1] == "on"
assert protocol.send_command_ack.call_args_list[2][0][1] == "on"
assert protocol.send_command_ack.call_args_list[3][0][1] == "on"
async def test_type_toggle(hass, monkeypatch):

View File

@ -231,6 +231,88 @@ def test_async_schedule_update_ha_state(hass):
assert update_call is True
async def test_async_async_request_call_without_lock(hass):
"""Test for async_requests_call works without a lock."""
updates = []
class AsyncEntity(entity.Entity):
def __init__(self, entity_id):
"""Initialize Async test entity."""
self.entity_id = entity_id
self.hass = hass
async def testhelper(self, count):
"""Helper function."""
updates.append(count)
ent_1 = AsyncEntity("light.test_1")
ent_2 = AsyncEntity("light.test_2")
try:
job1 = ent_1.async_request_call(ent_1.testhelper(1))
job2 = ent_2.async_request_call(ent_2.testhelper(2))
await asyncio.wait([job1, job2])
while True:
if len(updates) >= 2:
break
await asyncio.sleep(0)
finally:
pass
assert len(updates) == 2
updates.sort()
assert updates == [1, 2]
async def test_async_async_request_call_with_lock(hass):
"""Test for async_requests_call works with a semaphore."""
updates = []
test_semaphore = asyncio.Semaphore(1)
class AsyncEntity(entity.Entity):
def __init__(self, entity_id, lock):
"""Initialize Async test entity."""
self.entity_id = entity_id
self.hass = hass
self.parallel_updates = lock
async def testhelper(self, count):
"""Helper function."""
updates.append(count)
ent_1 = AsyncEntity("light.test_1", test_semaphore)
ent_2 = AsyncEntity("light.test_2", test_semaphore)
try:
assert test_semaphore.locked() is False
await test_semaphore.acquire()
assert test_semaphore.locked()
job1 = ent_1.async_request_call(ent_1.testhelper(1))
job2 = ent_2.async_request_call(ent_2.testhelper(2))
hass.async_create_task(job1)
hass.async_create_task(job2)
assert len(updates) == 0
assert updates == []
assert test_semaphore._value == 0
test_semaphore.release()
while True:
if len(updates) >= 2:
break
await asyncio.sleep(0)
finally:
test_semaphore.release()
assert len(updates) == 2
updates.sort()
assert updates == [1, 2]
async def test_async_parallel_updates_with_zero(hass):
"""Test parallel updates with 0 (disabled)."""
updates = []