Add support for run_immediately to async_listen_once (#113020)

This commit is contained in:
J. Nick Koston 2024-03-11 13:51:03 -10:00 committed by GitHub
parent 0c877339ca
commit 53c3e27ed9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 17 additions and 1 deletions

View File

@ -1465,6 +1465,7 @@ class EventBus:
self,
event_type: str,
listener: Callable[[Event[Any]], Coroutine[Any, Any, None] | None],
run_immediately: bool = False,
) -> CALLBACK_TYPE:
"""Listen once for event of a specific type.
@ -1485,7 +1486,7 @@ class EventBus:
job_type=HassJobType.Callback,
),
None,
False,
run_immediately,
),
)
one_time_listener.remove = remove

View File

@ -1169,6 +1169,21 @@ async def test_eventbus_run_immediately_coro(hass: HomeAssistant) -> None:
unsub()
async def test_eventbus_listen_once_run_immediately_coro(hass: HomeAssistant) -> None:
"""Test we can call events immediately with a coro."""
calls = []
async def listener(event):
"""Mock listener."""
calls.append(event)
hass.bus.async_listen_once("test", listener, run_immediately=True)
hass.bus.async_fire("test", {"event": True})
# No async_block_till_done here
assert len(calls) == 1
async def test_eventbus_unsubscribe_listener(hass: HomeAssistant) -> None:
"""Test unsubscribe listener from returned function."""
calls = []