Pass a helpful name when creating common asyncio tasks in core (#89171)

This commit is contained in:
J. Nick Koston 2023-03-05 01:46:02 -10:00 committed by GitHub
parent 927b43626c
commit 11681f3f31
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 169 additions and 61 deletions

View File

@ -508,7 +508,9 @@ async def async_setup_multi_components(
) -> None:
"""Set up multiple domains. Log on failure."""
futures = {
domain: hass.async_create_task(async_setup_component(hass, domain, config))
domain: hass.async_create_task(
async_setup_component(hass, domain, config), f"setup component {domain}"
)
for domain in domains
}
await asyncio.wait(futures.values())

View File

@ -27,7 +27,7 @@ async def async_attach_trigger(
"""Listen for events based on configuration."""
trigger_data = trigger_info["trigger_data"]
event = config.get(CONF_EVENT)
job = HassJob(action)
job = HassJob(action, f"homeassistant trigger {trigger_info}")
if event == EVENT_SHUTDOWN:

View File

@ -100,7 +100,7 @@ async def async_attach_trigger(
armed_entities = set()
period: dict = {}
attribute = config.get(CONF_ATTRIBUTE)
job = HassJob(action)
job = HassJob(action, f"numeric state trigger {trigger_info}")
trigger_data = trigger_info["trigger_data"]
_variables = trigger_info["variables"] or {}

View File

@ -123,7 +123,7 @@ async def async_attach_trigger(
unsub_track_same = {}
period: dict[str, timedelta] = {}
attribute = config.get(CONF_ATTRIBUTE)
job = HassJob(action)
job = HassJob(action, f"state trigger {trigger_info}")
trigger_data = trigger_info["trigger_data"]
_variables = trigger_info["variables"] or {}

View File

@ -49,7 +49,7 @@ async def async_attach_trigger(
trigger_data = trigger_info["trigger_data"]
entities: dict[str, CALLBACK_TYPE] = {}
removes = []
job = HassJob(action)
job = HassJob(action, f"time trigger {trigger_info}")
@callback
def time_automation_listener(description, now, *, entity_id=None):

View File

@ -66,7 +66,7 @@ async def async_attach_trigger(
hours = config.get(CONF_HOURS)
minutes = config.get(CONF_MINUTES)
seconds = config.get(CONF_SECONDS)
job = HassJob(action)
job = HassJob(action, f"time pattern trigger {trigger_info}")
# If larger units are specified, default the smaller units to zero
if minutes is None and hours is not None:

View File

@ -729,7 +729,8 @@ class ConfigEntry:
}
| (context or {}),
data=self.data | (data or {}),
)
),
f"config entry reauth {self.title} {self.domain} {self.entry_id}",
)
@callback
@ -746,7 +747,10 @@ class ConfigEntry:
@callback
def async_create_task(
self, hass: HomeAssistant, target: Coroutine[Any, Any, _R]
self,
hass: HomeAssistant,
target: Coroutine[Any, Any, _R],
name: str | None = None,
) -> asyncio.Task[_R]:
"""Create a task from within the eventloop.
@ -754,7 +758,9 @@ class ConfigEntry:
target: target to call.
"""
task = hass.async_create_task(target)
task = hass.async_create_task(
target, f"{name} {self.title} {self.domain} {self.entry_id}"
)
self._tasks.add(task)
task.add_done_callback(self._tasks.remove)
@ -824,7 +830,10 @@ class ConfigEntriesFlowManager(data_entry_flow.FlowManager):
init_done: asyncio.Future[None] = asyncio.Future()
self._pending_import_flows.setdefault(handler, {})[flow_id] = init_done
task = asyncio.create_task(self._async_init(flow_id, handler, context, data))
task = asyncio.create_task(
self._async_init(flow_id, handler, context, data),
name=f"config entry flow {handler} {flow_id}",
)
self._initialize_tasks.setdefault(handler, []).append(task)
try:
@ -1112,7 +1121,8 @@ class ConfigEntries:
entry.domain,
context={"source": SOURCE_UNIGNORE},
data={"unique_id": entry.unique_id},
)
),
f"config entry unignore {entry.title} {entry.domain} {entry.unique_id}",
)
self._async_dispatch(ConfigEntryChange.REMOVED, entry)
@ -1337,7 +1347,10 @@ class ConfigEntries:
for listener_ref in entry.update_listeners:
if (listener := listener_ref()) is not None:
self.hass.async_create_task(listener(self.hass, entry))
self.hass.async_create_task(
listener(self.hass, entry),
f"config entry update listener {entry.title} {entry.domain} {entry.domain}",
)
self._async_schedule_save()
self._async_dispatch(ConfigEntryChange.UPDATED, entry)
@ -1367,7 +1380,10 @@ class ConfigEntries:
error_if_core=False,
)
for platform in platforms:
self.hass.async_create_task(self.async_forward_entry_setup(entry, platform))
self.hass.async_create_task(
self.async_forward_entry_setup(entry, platform),
f"config entry forward setup {entry.title} {entry.domain} {entry.entry_id} {platform}",
)
async def async_forward_entry_setups(
self, entry: ConfigEntry, platforms: Iterable[Platform | str]
@ -1549,7 +1565,8 @@ class ConfigFlow(data_entry_flow.FlowHandler):
continue
if should_reload:
self.hass.async_create_task(
self.hass.config_entries.async_reload(entry.entry_id)
self.hass.config_entries.async_reload(entry.entry_id),
f"config entry reload {entry.title} {entry.domain} {entry.entry_id}",
)
raise data_entry_flow.AbortFlow(error)

View File

@ -217,16 +217,17 @@ class HassJob(Generic[_P, _R_co]):
we run the job.
"""
__slots__ = ("job_type", "target")
__slots__ = ("job_type", "target", "name")
def __init__(self, target: Callable[_P, _R_co]) -> None:
def __init__(self, target: Callable[_P, _R_co], name: str | None = None) -> None:
"""Create a job object."""
self.target = target
self.name = name
self.job_type = _get_hassjob_callable_job_type(target)
def __repr__(self) -> str:
"""Return the job."""
return f"<Job {self.job_type} {self.target}>"
return f"<Job {self.name} {self.job_type} {self.target}>"
def _get_hassjob_callable_job_type(target: Callable[..., Any]) -> HassJobType:
@ -488,7 +489,7 @@ class HomeAssistant:
hassjob.target = cast(
Callable[..., Coroutine[Any, Any, _R]], hassjob.target
)
task = self.loop.create_task(hassjob.target(*args))
task = self.loop.create_task(hassjob.target(*args), name=hassjob.name)
elif hassjob.job_type == HassJobType.Callback:
if TYPE_CHECKING:
hassjob.target = cast(Callable[..., _R], hassjob.target)
@ -512,7 +513,9 @@ class HomeAssistant:
self.loop.call_soon_threadsafe(self.async_create_task, target)
@callback
def async_create_task(self, target: Coroutine[Any, Any, _R]) -> asyncio.Task[_R]:
def async_create_task(
self, target: Coroutine[Any, Any, _R], name: str | None = None
) -> asyncio.Task[_R]:
"""Create a task from within the eventloop.
This method must be run in the event loop. If you are using this in your
@ -520,7 +523,7 @@ class HomeAssistant:
target: target to call.
"""
task = self.loop.create_task(target)
task = self.loop.create_task(target, name=name)
self._tasks.add(task)
task.add_done_callback(self._tasks.remove)
return task
@ -1037,7 +1040,10 @@ class EventBus:
if run_immediately and not is_callback(listener):
raise HomeAssistantError(f"Event listener {listener} is not a callback")
return self._async_listen_filterable_job(
event_type, _FilterableJob(HassJob(listener), event_filter, run_immediately)
event_type,
_FilterableJob(
HassJob(listener, "listen {event_type}"), event_filter, run_immediately
),
)
@callback
@ -1111,7 +1117,11 @@ class EventBus:
_onetime_listener, listener, ("__name__", "__qualname__", "__module__"), []
)
filterable_job = _FilterableJob(HassJob(_onetime_listener), None, False)
filterable_job = _FilterableJob(
HassJob(_onetime_listener, "onetime listen {event_type} {listener}"),
None,
False,
)
return self._async_listen_filterable_job(event_type, filterable_job)
@ -1558,16 +1568,18 @@ class StateMachine:
class Service:
"""Representation of a callable service."""
__slots__ = ["job", "schema"]
__slots__ = ["job", "schema", "domain", "service"]
def __init__(
self,
func: Callable[[ServiceCall], Coroutine[Any, Any, None] | None],
schema: vol.Schema | None,
domain: str,
service: str,
context: Context | None = None,
) -> None:
"""Initialize a service."""
self.job = HassJob(func)
self.job = HassJob(func, f"service {domain}.{service}")
self.schema = schema
@ -1659,7 +1671,7 @@ class ServiceRegistry:
"""
domain = domain.lower()
service = service.lower()
service_obj = Service(service_func, schema)
service_obj = Service(service_func, schema, domain, service)
if domain in self._services:
self._services[domain][service] = service_obj

View File

@ -38,7 +38,11 @@ class Debouncer(Generic[_R_co]):
self._execute_at_end_of_timer: bool = False
self._execute_lock = asyncio.Lock()
self._job: HassJob[[], _R_co] | None = (
None if function is None else HassJob(function)
None
if function is None
else HassJob(
function, f"debouncer cooldown={cooldown}, immediate={immediate}"
)
)
@property
@ -51,7 +55,10 @@ class Debouncer(Generic[_R_co]):
"""Update the function being wrapped by the Debouncer."""
self._function = function
if self._job is None or function != self._job.target:
self._job = HassJob(function)
self._job = HassJob(
function,
f"debouncer cooldown={self.cooldown}, immediate={self.immediate}",
)
async def async_call(self) -> None:
"""Call the function."""
@ -126,5 +133,8 @@ class Debouncer(Generic[_R_co]):
"""Schedule a timer."""
self._timer_task = self.hass.loop.call_later(
self.cooldown,
lambda: self.hass.async_create_task(self._handle_timer_finish()),
lambda: self.hass.async_create_task(
self._handle_timer_finish(),
f"debouncer {self._job} finish cooldown={self.cooldown}, immediate={self.immediate}",
),
)

View File

@ -44,7 +44,7 @@ def async_listen(
Service can be a string or a list/tuple.
"""
job = core.HassJob(callback)
job = core.HassJob(callback, f"discovery listener {service}")
async def discovery_event_listener(discovered: DiscoveryDict) -> None:
"""Listen for discovery events."""
@ -103,7 +103,7 @@ def async_listen_platform(
This method must be run in the event loop.
"""
service = EVENT_LOAD_PLATFORM.format(component)
job = core.HassJob(callback)
job = core.HassJob(callback, f"platform loaded {component}")
async def discovery_platform_listener(discovered: DiscoveryDict) -> None:
"""Listen for platform discovery events."""

View File

@ -29,7 +29,7 @@ def async_create_flow(
if not dispatcher or dispatcher.started:
if init_coro := _async_init_flow(hass, domain, context, data):
hass.async_create_task(init_coro)
hass.async_create_task(init_coro, f"discovery flow {domain} {context}")
return
return dispatcher.async_create(domain, context, data)

View File

@ -75,7 +75,8 @@ def _generate_job(
signal,
args,
),
)
),
f"dispatcher {signal}",
)

View File

@ -702,7 +702,10 @@ class Entity(ABC):
been executed, the intermediate state transitions will be missed.
"""
if force_refresh:
self.hass.async_create_task(self.async_update_ha_state(force_refresh))
self.hass.async_create_task(
self.async_update_ha_state(force_refresh),
f"Entity schedule update ha state {self.entity_id}",
)
else:
self.async_write_ha_state()
@ -722,7 +725,9 @@ class Entity(ABC):
try:
task: asyncio.Future[None]
if hasattr(self, "async_update"):
task = self.hass.async_create_task(self.async_update())
task = self.hass.async_create_task(
self.async_update(), f"Entity async update {self.entity_id}"
)
elif hasattr(self, "update"):
task = self.hass.async_add_executor_job(self.update)
else:

View File

@ -131,7 +131,10 @@ class EntityComponent(Generic[_EntityT]):
# Look in config for Domain, Domain 2, Domain 3 etc and load them
for p_type, p_config in config_per_platform(config, self.domain):
if p_type is not None:
self.hass.async_create_task(self.async_setup_platform(p_type, p_config))
self.hass.async_create_task(
self.async_setup_platform(p_type, p_config),
f"EntityComponent setup platform {p_type} {self.domain}",
)
# Generic discovery listener for loading platform dynamically
# Refer to: homeassistant.helpers.discovery.async_load_platform()

View File

@ -375,6 +375,7 @@ class EntityPlatform:
"""Schedule adding entities for a single platform async."""
task = self.hass.async_create_task(
self.async_add_entities(new_entities, update_before_add=update_before_add),
f"EntityPlatform async_add_entities {self.domain}.{self.platform_name}",
)
if not self._setup_complete:
@ -389,6 +390,7 @@ class EntityPlatform:
task = self.config_entry.async_create_task(
self.hass,
self.async_add_entities(new_entities, update_before_add=update_before_add),
f"EntityPlatform async_add_entities_for_entry {self.domain}.{self.platform_name}",
)
if not self._setup_complete:

View File

@ -176,7 +176,7 @@ def async_track_state_change(
else:
entity_ids = tuple(entity_id.lower() for entity_id in entity_ids)
job = HassJob(action)
job = HassJob(action, f"track state change {entity_ids} {from_state} {to_state}")
@callback
def state_change_filter(event: Event) -> bool:
@ -296,7 +296,7 @@ def _async_track_state_change_event(
event_filter=_async_state_change_filter,
)
job = HassJob(action)
job = HassJob(action, f"track state change event {entity_ids}")
for entity_id in entity_ids:
entity_callbacks.setdefault(entity_id, []).append(job)
@ -393,7 +393,7 @@ def async_track_entity_registry_updated_event(
event_filter=_async_entity_registry_updated_filter,
)
job = HassJob(action)
job = HassJob(action, f"track entity registry updated event {entity_ids}")
for entity_id in entity_ids:
entity_callbacks.setdefault(entity_id, []).append(job)
@ -476,7 +476,7 @@ def _async_track_state_added_domain(
event_filter=_async_state_change_filter,
)
job = HassJob(action)
job = HassJob(action, f"track state added domain event {domains}")
for domain in domains:
domain_callbacks.setdefault(domain, []).append(job)
@ -530,7 +530,7 @@ def async_track_state_removed_domain(
event_filter=_async_state_change_filter,
)
job = HassJob(action)
job = HassJob(action, f"track state removed domain event {domains}")
for domain in domains:
domain_callbacks.setdefault(domain, []).append(job)
@ -569,7 +569,9 @@ class _TrackStateChangeFiltered:
"""Handle removal / refresh of tracker init."""
self.hass = hass
self._action = action
self._action_as_hassjob = HassJob(action)
self._action_as_hassjob = HassJob(
action, f"track state change filtered {track_states}"
)
self._listeners: dict[str, Callable[[], None]] = {}
self._last_track_states: TrackStates = track_states
@ -764,7 +766,7 @@ def async_track_template(
Callable to unregister the listener.
"""
job = HassJob(action)
job = HassJob(action, f"track template {template}")
@callback
def _template_changed_listener(
@ -821,7 +823,7 @@ class TrackTemplateResultInfo:
) -> None:
"""Handle removal / refresh of tracker init."""
self.hass = hass
self._job = HassJob(action)
self._job = HassJob(action, f"track template result {track_templates}")
for track_template_ in track_templates:
track_template_.template.hass = hass
@ -1215,7 +1217,7 @@ def async_track_same_state(
async_remove_state_for_cancel: CALLBACK_TYPE | None = None
async_remove_state_for_listener: CALLBACK_TYPE | None = None
job = HassJob(action)
job = HassJob(action, f"track same state {period} {entity_ids}")
@callback
def clear_listener() -> None:
@ -1277,7 +1279,11 @@ def async_track_point_in_time(
point_in_time: datetime,
) -> CALLBACK_TYPE:
"""Add a listener that fires once after a specific point in time."""
job = action if isinstance(action, HassJob) else HassJob(action)
job = (
action
if isinstance(action, HassJob)
else HassJob(action, f"track point in time {point_in_time}")
)
@callback
def utc_converter(utc_now: datetime) -> None:
@ -1324,7 +1330,11 @@ def async_track_point_in_utc_time(
hass.async_run_hass_job(job, utc_point_in_time)
job = action if isinstance(action, HassJob) else HassJob(action)
job = (
action
if isinstance(action, HassJob)
else HassJob(action, f"track point in utc time {utc_point_in_time}")
)
delta = expected_fire_timestamp - time.time()
cancel_callback = hass.loop.call_later(delta, run_action, job)
@ -1357,7 +1367,11 @@ def async_call_later(
"""Call the action."""
hass.async_run_hass_job(job, time_tracker_utcnow())
job = action if isinstance(action, HassJob) else HassJob(action)
job = (
action
if isinstance(action, HassJob)
else HassJob(action, f"call_later {delay}")
)
cancel_callback = hass.loop.call_later(delay, run_action, job)
@callback
@ -1383,7 +1397,7 @@ def async_track_time_interval(
remove: CALLBACK_TYPE
interval_listener_job: HassJob[[datetime], None]
job = HassJob(action)
job = HassJob(action, f"track time interval {interval}")
def next_interval() -> datetime:
"""Return the next interval."""
@ -1400,7 +1414,9 @@ def async_track_time_interval(
)
hass.async_run_hass_job(job, now)
interval_listener_job = HassJob(interval_listener)
interval_listener_job = HassJob(
interval_listener, f"track time interval listener {interval}"
)
remove = async_track_point_in_utc_time(hass, interval_listener_job, next_interval())
def remove_listener() -> None:
@ -1479,7 +1495,9 @@ def async_track_sunrise(
hass: HomeAssistant, action: Callable[[], None], offset: timedelta | None = None
) -> CALLBACK_TYPE:
"""Add a listener that will fire a specified offset from sunrise daily."""
listener = SunListener(hass, HassJob(action), SUN_EVENT_SUNRISE, offset)
listener = SunListener(
hass, HassJob(action, "track sunrise"), SUN_EVENT_SUNRISE, offset
)
listener.async_attach()
return listener.async_detach
@ -1493,7 +1511,9 @@ def async_track_sunset(
hass: HomeAssistant, action: Callable[[], None], offset: timedelta | None = None
) -> CALLBACK_TYPE:
"""Add a listener that will fire a specified offset from sunset daily."""
listener = SunListener(hass, HassJob(action), SUN_EVENT_SUNSET, offset)
listener = SunListener(
hass, HassJob(action, "track sunset"), SUN_EVENT_SUNSET, offset
)
listener.async_attach()
return listener.async_detach
@ -1526,7 +1546,7 @@ def async_track_utc_time_change(
# misalignment we use async_track_time_interval here
return async_track_time_interval(hass, action, timedelta(seconds=1))
job = HassJob(action)
job = HassJob(action, f"track time change {hour}:{minute}:{second} local={local}")
matching_seconds = dt_util.parse_time_expression(second, 0, 59)
matching_minutes = dt_util.parse_time_expression(minute, 0, 59)
matching_hours = dt_util.parse_time_expression(hour, 0, 23)

View File

@ -212,7 +212,7 @@ class RestoreStateData:
# Dump the initial states now. This helps minimize the risk of having
# old states loaded by overwriting the last states once Home Assistant
# has started and the old states have been read.
self.hass.async_create_task(_async_dump_states())
self.hass.async_create_task(_async_dump_states(), "RestoreStateData dump")
# Dump states periodically
cancel_interval = async_track_time_interval(

View File

@ -115,7 +115,9 @@ class Store(Generic[_T]):
the second call will wait and return the result of the first call.
"""
if self._load_task is None:
self._load_task = self.hass.async_create_task(self._async_load())
self._load_task = self.hass.async_create_task(
self._async_load(), f"Storage load {self.key}"
)
return await self._load_task

View File

@ -169,7 +169,7 @@ class PluggableAction:
if not entry.actions and not entry.plugs:
del reg[key]
job = HassJob(action)
job = HassJob(action, f"trigger {trigger} {variables}")
entry.actions[_remove] = (job, variables)
_update()

View File

@ -87,7 +87,14 @@ class DataUpdateCoordinator(BaseDataUpdateCoordinatorProtocol, Generic[_T]):
)
self._listeners: dict[CALLBACK_TYPE, tuple[CALLBACK_TYPE, object | None]] = {}
self._job = HassJob(self._handle_refresh_interval)
job_name = "DataUpdateCoordinator"
type_name = type(self).__name__
if type_name != job_name:
job_name += f" {type_name}"
job_name += f" {name}"
if entry := self.config_entry:
job_name += f" {entry.title} {entry.domain} {entry.entry_id}"
self._job = HassJob(self._handle_refresh_interval, job_name)
self._unsub_refresh: CALLBACK_TYPE | None = None
self._request_refresh_task: asyncio.TimerHandle | None = None
self.last_update_success = True

View File

@ -93,7 +93,7 @@ async def async_setup_component(
return await setup_tasks[domain]
task = setup_tasks[domain] = hass.async_create_task(
_async_setup_component(hass, domain, config)
_async_setup_component(hass, domain, config), f"setup component {domain}"
)
try:
@ -426,7 +426,7 @@ def _async_when_setup(
_LOGGER.exception("Error handling when_setup callback for %s", component)
if component in hass.config.components:
hass.async_create_task(when_setup())
hass.async_create_task(when_setup(), f"when setup {component}")
return
listeners: list[CALLBACK_TYPE] = []

View File

@ -210,14 +210,14 @@ async def async_test_home_assistant(event_loop, load_registries=True):
return orig_async_add_executor_job(target, *args)
def async_create_task(coroutine):
def async_create_task(coroutine, name=None):
"""Create task."""
if isinstance(coroutine, Mock) and not isinstance(coroutine, AsyncMock):
fut = asyncio.Future()
fut.set_result(None)
return fut
return orig_async_create_task(coroutine)
return orig_async_create_task(coroutine, name)
hass.async_add_job = async_add_job
hass.async_add_executor_job = async_add_executor_job

View File

@ -65,7 +65,7 @@ def test_split_entity_id() -> None:
def test_async_add_hass_job_schedule_callback() -> None:
"""Test that we schedule coroutines and add jobs to the job pool."""
"""Test that we schedule callbacks and add jobs to the job pool."""
hass = MagicMock()
job = MagicMock()
@ -75,6 +75,19 @@ def test_async_add_hass_job_schedule_callback() -> None:
assert len(hass.add_job.mock_calls) == 0
def test_async_add_hass_job_coro_named(hass) -> None:
"""Test that we schedule coroutines and add jobs to the job pool with a name."""
async def mycoro():
pass
job = ha.HassJob(mycoro, "named coro")
assert "named coro" in str(job)
assert job.name == "named coro"
task = ha.HomeAssistant.async_add_hass_job(hass, job)
assert "named coro" in str(task)
def test_async_add_hass_job_schedule_partial_callback() -> None:
"""Test that we schedule partial coros and add jobs to the job pool."""
hass = MagicMock()
@ -141,6 +154,20 @@ def test_async_create_task_schedule_coroutine(event_loop) -> None:
assert len(hass.add_job.mock_calls) == 0
def test_async_create_task_schedule_coroutine_with_name(event_loop) -> None:
"""Test that we schedule coroutines and add jobs to the job pool with a name."""
hass = MagicMock(loop=MagicMock(wraps=event_loop))
async def job():
pass
task = ha.HomeAssistant.async_create_task(hass, job(), "named task")
assert len(hass.loop.call_soon.mock_calls) == 0
assert len(hass.loop.create_task.mock_calls) == 1
assert len(hass.add_job.mock_calls) == 0
assert "named task" in str(task)
def test_async_run_hass_job_calls_callback() -> None:
"""Test that the callback annotation is respected."""
hass = MagicMock()