Make SSDP tasks background HassJob to avoid delaying startup (#112668)

This commit is contained in:
J. Nick Koston 2024-03-08 23:11:00 -10:00 committed by GitHub
parent b7d9f26cee
commit 2b0b3c238a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 60 additions and 40 deletions

View File

@ -3,10 +3,11 @@
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
from collections.abc import Awaitable, Callable, Mapping from collections.abc import Callable, Coroutine, Mapping
from dataclasses import dataclass, field from dataclasses import dataclass, field
from datetime import timedelta from datetime import timedelta
from enum import Enum from enum import Enum
from functools import partial
from ipaddress import IPv4Address, IPv6Address from ipaddress import IPv4Address, IPv6Address
import logging import logging
import socket import socket
@ -42,7 +43,7 @@ from homeassistant.const import (
MATCH_ALL, MATCH_ALL,
__version__ as current_version, __version__ as current_version,
) )
from homeassistant.core import Event, HomeAssistant, callback as core_callback from homeassistant.core import Event, HassJob, HomeAssistant, callback as core_callback
from homeassistant.data_entry_flow import BaseServiceInfo from homeassistant.data_entry_flow import BaseServiceInfo
from homeassistant.helpers import config_validation as cv, discovery_flow from homeassistant.helpers import config_validation as cv, discovery_flow
from homeassistant.helpers.aiohttp_client import async_get_clientsession from homeassistant.helpers.aiohttp_client import async_get_clientsession
@ -53,6 +54,7 @@ from homeassistant.helpers.system_info import async_get_system_info
from homeassistant.helpers.typing import ConfigType from homeassistant.helpers.typing import ConfigType
from homeassistant.loader import async_get_ssdp, bind_hass from homeassistant.loader import async_get_ssdp, bind_hass
from homeassistant.util.async_ import create_eager_task from homeassistant.util.async_ import create_eager_task
from homeassistant.util.logging import catch_log_exception
DOMAIN = "ssdp" DOMAIN = "ssdp"
SSDP_SCANNER = "scanner" SSDP_SCANNER = "scanner"
@ -124,7 +126,9 @@ class SsdpServiceInfo(BaseServiceInfo):
SsdpChange = Enum("SsdpChange", "ALIVE BYEBYE UPDATE") SsdpChange = Enum("SsdpChange", "ALIVE BYEBYE UPDATE")
SsdpCallback = Callable[[SsdpServiceInfo, SsdpChange], Awaitable] SsdpHassJobCallback = HassJob[
[SsdpServiceInfo, SsdpChange], Coroutine[Any, Any, None] | None
]
SSDP_SOURCE_SSDP_CHANGE_MAPPING: Mapping[SsdpSource, SsdpChange] = { SSDP_SOURCE_SSDP_CHANGE_MAPPING: Mapping[SsdpSource, SsdpChange] = {
SsdpSource.SEARCH_ALIVE: SsdpChange.ALIVE, SsdpSource.SEARCH_ALIVE: SsdpChange.ALIVE,
@ -135,10 +139,15 @@ SSDP_SOURCE_SSDP_CHANGE_MAPPING: Mapping[SsdpSource, SsdpChange] = {
} }
def _format_err(name: str, *args: Any) -> str:
"""Format error message."""
return f"Exception in SSDP callback {name}: {args}"
@bind_hass @bind_hass
async def async_register_callback( async def async_register_callback(
hass: HomeAssistant, hass: HomeAssistant,
callback: SsdpCallback, callback: Callable[[SsdpServiceInfo, SsdpChange], Coroutine[Any, Any, None] | None],
match_dict: None | dict[str, str] = None, match_dict: None | dict[str, str] = None,
) -> Callable[[], None]: ) -> Callable[[], None]:
"""Register to receive a callback on ssdp broadcast. """Register to receive a callback on ssdp broadcast.
@ -146,7 +155,14 @@ async def async_register_callback(
Returns a callback that can be used to cancel the registration. Returns a callback that can be used to cancel the registration.
""" """
scanner: Scanner = hass.data[DOMAIN][SSDP_SCANNER] scanner: Scanner = hass.data[DOMAIN][SSDP_SCANNER]
return await scanner.async_register_callback(callback, match_dict) job = HassJob(
catch_log_exception(
callback,
partial(_format_err, str(callback)),
),
f"ssdp callback {match_dict}",
)
return await scanner.async_register_callback(job, match_dict)
@bind_hass @bind_hass
@ -206,14 +222,18 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
return True return True
async def _async_process_callbacks( @core_callback
callbacks: list[SsdpCallback], def _async_process_callbacks(
hass: HomeAssistant,
callbacks: list[SsdpHassJobCallback],
discovery_info: SsdpServiceInfo, discovery_info: SsdpServiceInfo,
ssdp_change: SsdpChange, ssdp_change: SsdpChange,
) -> None: ) -> None:
for callback in callbacks: for callback in callbacks:
try: try:
await callback(discovery_info, ssdp_change) hass.async_run_hass_job(
callback, discovery_info, ssdp_change, eager_start=True, background=True
)
except Exception: # pylint: disable=broad-except except Exception: # pylint: disable=broad-except
_LOGGER.exception("Failed to callback info: %s", discovery_info) _LOGGER.exception("Failed to callback info: %s", discovery_info)
@ -287,7 +307,7 @@ class Scanner:
self._cancel_scan: Callable[[], None] | None = None self._cancel_scan: Callable[[], None] | None = None
self._ssdp_listeners: list[SsdpListener] = [] self._ssdp_listeners: list[SsdpListener] = []
self._device_tracker = SsdpDeviceTracker() self._device_tracker = SsdpDeviceTracker()
self._callbacks: list[tuple[SsdpCallback, dict[str, str]]] = [] self._callbacks: list[tuple[SsdpHassJobCallback, dict[str, str]]] = []
self._description_cache: DescriptionCache | None = None self._description_cache: DescriptionCache | None = None
self.integration_matchers = integration_matchers self.integration_matchers = integration_matchers
@ -297,7 +317,7 @@ class Scanner:
return list(self._device_tracker.devices.values()) return list(self._device_tracker.devices.values())
async def async_register_callback( async def async_register_callback(
self, callback: SsdpCallback, match_dict: None | dict[str, str] = None self, callback: SsdpHassJobCallback, match_dict: None | dict[str, str] = None
) -> Callable[[], None]: ) -> Callable[[], None]:
"""Register a callback.""" """Register a callback."""
if match_dict is None: if match_dict is None:
@ -310,7 +330,8 @@ class Scanner:
for ssdp_device in self._ssdp_devices: for ssdp_device in self._ssdp_devices:
for headers in ssdp_device.all_combined_headers.values(): for headers in ssdp_device.all_combined_headers.values():
if _async_headers_match(headers, lower_match_dict): if _async_headers_match(headers, lower_match_dict):
await _async_process_callbacks( _async_process_callbacks(
self.hass,
[callback], [callback],
await self._async_headers_to_discovery_info( await self._async_headers_to_discovery_info(
ssdp_device, headers ssdp_device, headers
@ -426,7 +447,7 @@ class Scanner:
def _async_get_matching_callbacks( def _async_get_matching_callbacks(
self, self,
combined_headers: CaseInsensitiveDict, combined_headers: CaseInsensitiveDict,
) -> list[SsdpCallback]: ) -> list[SsdpHassJobCallback]:
"""Return a list of callbacks that match.""" """Return a list of callbacks that match."""
return [ return [
callback callback
@ -451,10 +472,11 @@ class Scanner:
_, info_desc = self._description_cache.peek_description_dict(location) _, info_desc = self._description_cache.peek_description_dict(location)
if info_desc is None: if info_desc is None:
# Fetch info desc in separate task and process from there. # Fetch info desc in separate task and process from there.
self.hass.async_create_task( self.hass.async_create_background_task(
self._ssdp_listener_process_callback_with_lookup( self._ssdp_listener_process_callback_with_lookup(
ssdp_device, dst, source ssdp_device, dst, source
), ),
name=f"ssdp_info_desc_lookup_{location}",
eager_start=True, eager_start=True,
) )
return return
@ -509,10 +531,7 @@ class Scanner:
if callbacks: if callbacks:
ssdp_change = SSDP_SOURCE_SSDP_CHANGE_MAPPING[source] ssdp_change = SSDP_SOURCE_SSDP_CHANGE_MAPPING[source]
self.hass.async_create_task( _async_process_callbacks(self.hass, callbacks, discovery_info, ssdp_change)
_async_process_callbacks(callbacks, discovery_info, ssdp_change),
eager_start=True,
)
# Config flows should only be created for alive/update messages from alive devices # Config flows should only be created for alive/update messages from alive devices
if source == SsdpSource.ADVERTISEMENT_BYEBYE: if source == SsdpSource.ADVERTISEMENT_BYEBYE:

View File

@ -1425,7 +1425,7 @@ async def test_become_available(
domain_data_mock.upnp_factory.async_create_device.reset_mock() domain_data_mock.upnp_factory.async_create_device.reset_mock()
# Send an SSDP notification from the now alive device # Send an SSDP notification from the now alive device
ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0] ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0].target
await ssdp_callback( await ssdp_callback(
ssdp.SsdpServiceInfo( ssdp.SsdpServiceInfo(
ssdp_usn=MOCK_DEVICE_USN, ssdp_usn=MOCK_DEVICE_USN,
@ -1498,7 +1498,7 @@ async def test_alive_but_gone(
domain_data_mock.upnp_factory.async_create_device.side_effect = UpnpError domain_data_mock.upnp_factory.async_create_device.side_effect = UpnpError
# Send an SSDP notification from the still missing device # Send an SSDP notification from the still missing device
ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0] ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0].target
await ssdp_callback( await ssdp_callback(
ssdp.SsdpServiceInfo( ssdp.SsdpServiceInfo(
ssdp_usn=MOCK_DEVICE_USN, ssdp_usn=MOCK_DEVICE_USN,
@ -1611,7 +1611,7 @@ async def test_multiple_ssdp_alive(
) )
# Send two SSDP notifications with the new device URL # Send two SSDP notifications with the new device URL
ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0] ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0].target
await ssdp_callback( await ssdp_callback(
ssdp.SsdpServiceInfo( ssdp.SsdpServiceInfo(
ssdp_usn=MOCK_DEVICE_USN, ssdp_usn=MOCK_DEVICE_USN,
@ -1651,7 +1651,7 @@ async def test_ssdp_byebye(
) -> None: ) -> None:
"""Test device is disconnected when byebye is received.""" """Test device is disconnected when byebye is received."""
# First byebye will cause a disconnect # First byebye will cause a disconnect
ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0] ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0].target
await ssdp_callback( await ssdp_callback(
ssdp.SsdpServiceInfo( ssdp.SsdpServiceInfo(
ssdp_usn=MOCK_DEVICE_USN, ssdp_usn=MOCK_DEVICE_USN,
@ -1703,7 +1703,7 @@ async def test_ssdp_update_seen_bootid(
domain_data_mock.upnp_factory.async_create_device.side_effect = None domain_data_mock.upnp_factory.async_create_device.side_effect = None
# Send SSDP alive with boot ID # Send SSDP alive with boot ID
ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0] ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0].target
await ssdp_callback( await ssdp_callback(
ssdp.SsdpServiceInfo( ssdp.SsdpServiceInfo(
ssdp_usn=MOCK_DEVICE_USN, ssdp_usn=MOCK_DEVICE_USN,
@ -1830,7 +1830,7 @@ async def test_ssdp_update_missed_bootid(
domain_data_mock.upnp_factory.async_create_device.side_effect = None domain_data_mock.upnp_factory.async_create_device.side_effect = None
# Send SSDP alive with boot ID # Send SSDP alive with boot ID
ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0] ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0].target
await ssdp_callback( await ssdp_callback(
ssdp.SsdpServiceInfo( ssdp.SsdpServiceInfo(
ssdp_usn=MOCK_DEVICE_USN, ssdp_usn=MOCK_DEVICE_USN,
@ -1907,7 +1907,7 @@ async def test_ssdp_bootid(
domain_data_mock.upnp_factory.async_create_device.side_effect = None domain_data_mock.upnp_factory.async_create_device.side_effect = None
# Send SSDP alive with boot ID # Send SSDP alive with boot ID
ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0] ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0].target
await ssdp_callback( await ssdp_callback(
ssdp.SsdpServiceInfo( ssdp.SsdpServiceInfo(
ssdp_usn=MOCK_DEVICE_USN, ssdp_usn=MOCK_DEVICE_USN,
@ -2367,7 +2367,7 @@ async def test_connections_restored(
domain_data_mock.upnp_factory.async_create_device.reset_mock() domain_data_mock.upnp_factory.async_create_device.reset_mock()
# Send an SSDP notification from the now alive device # Send an SSDP notification from the now alive device
ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0] ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0].target
await ssdp_callback( await ssdp_callback(
ssdp.SsdpServiceInfo( ssdp.SsdpServiceInfo(
ssdp_usn=MOCK_DEVICE_USN, ssdp_usn=MOCK_DEVICE_USN,

View File

@ -177,7 +177,7 @@ async def test_become_available(
upnp_factory_mock.async_create_device.reset_mock() upnp_factory_mock.async_create_device.reset_mock()
# Send an SSDP notification from the now alive device # Send an SSDP notification from the now alive device
ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0] ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0].target
await ssdp_callback( await ssdp_callback(
ssdp.SsdpServiceInfo( ssdp.SsdpServiceInfo(
ssdp_usn=MOCK_DEVICE_USN, ssdp_usn=MOCK_DEVICE_USN,
@ -205,7 +205,7 @@ async def test_alive_but_gone(
upnp_factory_mock.async_create_device.side_effect = UpnpError upnp_factory_mock.async_create_device.side_effect = UpnpError
# Send an SSDP notification from the still missing device # Send an SSDP notification from the still missing device
ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0] ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0].target
await ssdp_callback( await ssdp_callback(
ssdp.SsdpServiceInfo( ssdp.SsdpServiceInfo(
ssdp_usn=MOCK_DEVICE_USN, ssdp_usn=MOCK_DEVICE_USN,
@ -308,7 +308,7 @@ async def test_multiple_ssdp_alive(
upnp_factory_mock.async_create_device.side_effect = create_device_delayed upnp_factory_mock.async_create_device.side_effect = create_device_delayed
# Send two SSDP notifications with the new device URL # Send two SSDP notifications with the new device URL
ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0] ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0].target
await ssdp_callback( await ssdp_callback(
ssdp.SsdpServiceInfo( ssdp.SsdpServiceInfo(
ssdp_usn=MOCK_DEVICE_USN, ssdp_usn=MOCK_DEVICE_USN,
@ -343,7 +343,7 @@ async def test_ssdp_byebye(
) -> None: ) -> None:
"""Test device is disconnected when byebye is received.""" """Test device is disconnected when byebye is received."""
# First byebye will cause a disconnect # First byebye will cause a disconnect
ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0] ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0].target
await ssdp_callback( await ssdp_callback(
ssdp.SsdpServiceInfo( ssdp.SsdpServiceInfo(
ssdp_usn=MOCK_DEVICE_USN, ssdp_usn=MOCK_DEVICE_USN,
@ -386,7 +386,7 @@ async def test_ssdp_update_seen_bootid(
upnp_factory_mock.async_create_device.side_effect = None upnp_factory_mock.async_create_device.side_effect = None
# Send SSDP alive with boot ID # Send SSDP alive with boot ID
ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0] ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0].target
await ssdp_callback( await ssdp_callback(
ssdp.SsdpServiceInfo( ssdp.SsdpServiceInfo(
ssdp_usn=MOCK_DEVICE_USN, ssdp_usn=MOCK_DEVICE_USN,
@ -498,7 +498,7 @@ async def test_ssdp_update_missed_bootid(
upnp_factory_mock.async_create_device.side_effect = None upnp_factory_mock.async_create_device.side_effect = None
# Send SSDP alive with boot ID # Send SSDP alive with boot ID
ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0] ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0].target
await ssdp_callback( await ssdp_callback(
ssdp.SsdpServiceInfo( ssdp.SsdpServiceInfo(
ssdp_usn=MOCK_DEVICE_USN, ssdp_usn=MOCK_DEVICE_USN,
@ -568,7 +568,7 @@ async def test_ssdp_bootid(
upnp_factory_mock.async_create_device.reset_mock() upnp_factory_mock.async_create_device.reset_mock()
# Send SSDP alive with boot ID # Send SSDP alive with boot ID
ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0] ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0].target
await ssdp_callback( await ssdp_callback(
ssdp.SsdpServiceInfo( ssdp.SsdpServiceInfo(
ssdp_usn=MOCK_DEVICE_USN, ssdp_usn=MOCK_DEVICE_USN,

View File

@ -67,7 +67,7 @@ async def test_catch_request_error_unavailable(
) -> None: ) -> None:
"""Test the device is checked for availability before trying requests.""" """Test the device is checked for availability before trying requests."""
# DmsDevice notifies of disconnect via SSDP # DmsDevice notifies of disconnect via SSDP
ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0] ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0].target
await ssdp_callback( await ssdp_callback(
ssdp.SsdpServiceInfo( ssdp.SsdpServiceInfo(
ssdp_usn=MOCK_DEVICE_USN, ssdp_usn=MOCK_DEVICE_USN,

View File

@ -343,7 +343,7 @@ async def test_flow_start_only_alive(
} }
) )
ssdp_listener._on_search(mock_ssdp_search_response) ssdp_listener._on_search(mock_ssdp_search_response)
await hass.async_block_till_done() await hass.async_block_till_done(wait_background_tasks=True)
mock_flow_init.assert_awaited_once_with( mock_flow_init.assert_awaited_once_with(
"mock-domain", context={"source": config_entries.SOURCE_SSDP}, data=ANY "mock-domain", context={"source": config_entries.SOURCE_SSDP}, data=ANY
@ -464,6 +464,7 @@ async def test_start_stop_scanner(mock_source_set, hass: HomeAssistant) -> None:
@pytest.mark.usefixtures("mock_get_source_ip") @pytest.mark.usefixtures("mock_get_source_ip")
@pytest.mark.no_fail_on_log_exception
@patch("homeassistant.components.ssdp.async_get_ssdp", return_value={}) @patch("homeassistant.components.ssdp.async_get_ssdp", return_value={})
async def test_scan_with_registered_callback( async def test_scan_with_registered_callback(
mock_get_ssdp, mock_get_ssdp,
@ -523,9 +524,9 @@ async def test_scan_with_registered_callback(
async_match_any_callback = AsyncMock() async_match_any_callback = AsyncMock()
await ssdp.async_register_callback(hass, async_match_any_callback) await ssdp.async_register_callback(hass, async_match_any_callback)
await hass.async_block_till_done() await hass.async_block_till_done(wait_background_tasks=True)
ssdp_listener._on_search(mock_ssdp_search_response) ssdp_listener._on_search(mock_ssdp_search_response)
await hass.async_block_till_done() await hass.async_block_till_done(wait_background_tasks=True)
assert async_integration_callback.call_count == 1 assert async_integration_callback.call_count == 1
assert async_integration_match_all_callback.call_count == 1 assert async_integration_match_all_callback.call_count == 1
@ -549,7 +550,7 @@ async def test_scan_with_registered_callback(
ssdp.ATTR_UPNP_DEVICE_TYPE: "Paulus", ssdp.ATTR_UPNP_DEVICE_TYPE: "Paulus",
ssdp.ATTR_UPNP_UDN: "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL", ssdp.ATTR_UPNP_UDN: "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL",
} }
assert "Failed to callback info" in caplog.text assert "Exception in SSDP callback" in caplog.text
async_integration_callback_from_cache = AsyncMock() async_integration_callback_from_cache = AsyncMock()
await ssdp.async_register_callback( await ssdp.async_register_callback(
@ -835,7 +836,7 @@ async def test_flow_dismiss_on_byebye(
} }
) )
ssdp_listener._on_search(mock_ssdp_search_response) ssdp_listener._on_search(mock_ssdp_search_response)
await hass.async_block_till_done() await hass.async_block_till_done(wait_background_tasks=True)
mock_flow_init.assert_awaited_once_with( mock_flow_init.assert_awaited_once_with(
"mock-domain", context={"source": config_entries.SOURCE_SSDP}, data=ANY "mock-domain", context={"source": config_entries.SOURCE_SSDP}, data=ANY
@ -853,7 +854,7 @@ async def test_flow_dismiss_on_byebye(
} }
) )
ssdp_listener._on_alive(mock_ssdp_advertisement) ssdp_listener._on_alive(mock_ssdp_advertisement)
await hass.async_block_till_done() await hass.async_block_till_done(wait_background_tasks=True)
mock_flow_init.assert_awaited_once_with( mock_flow_init.assert_awaited_once_with(
"mock-domain", context={"source": config_entries.SOURCE_SSDP}, data=ANY "mock-domain", context={"source": config_entries.SOURCE_SSDP}, data=ANY
) )
@ -868,7 +869,7 @@ async def test_flow_dismiss_on_byebye(
hass.config_entries.flow, "async_abort" hass.config_entries.flow, "async_abort"
) as mock_async_abort: ) as mock_async_abort:
ssdp_listener._on_byebye(mock_ssdp_advertisement) ssdp_listener._on_byebye(mock_ssdp_advertisement)
await hass.async_block_till_done() await hass.async_block_till_done(wait_background_tasks=True)
assert len(mock_async_progress_by_init_data_type.mock_calls) == 1 assert len(mock_async_progress_by_init_data_type.mock_calls) == 1
assert mock_async_abort.mock_calls[0][1][0] == "mock_flow_id" assert mock_async_abort.mock_calls[0][1][0] == "mock_flow_id"