Refactor statistics to avoid creating tasks (#116743)

This commit is contained in:
J. Nick Koston 2024-05-05 15:39:45 -05:00 committed by GitHub
parent d970c19342
commit c8e6292cb7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -285,6 +285,9 @@ async def async_setup_platform(
class StatisticsSensor(SensorEntity):
"""Representation of a Statistics sensor."""
_attr_should_poll = False
_attr_icon = ICON
def __init__(
self,
source_entity_id: str,
@ -298,9 +301,7 @@ class StatisticsSensor(SensorEntity):
percentile: int,
) -> None:
"""Initialize the Statistics sensor."""
self._attr_icon: str = ICON
self._attr_name: str = name
self._attr_should_poll: bool = False
self._attr_unique_id: str | None = unique_id
self._source_entity_id: str = source_entity_id
self.is_binary: bool = (
@ -326,35 +327,37 @@ class StatisticsSensor(SensorEntity):
self._update_listener: CALLBACK_TYPE | None = None
async def async_added_to_hass(self) -> None:
"""Register callbacks."""
@callback
def async_stats_sensor_state_listener(
def _async_stats_sensor_state_listener(
self,
event: Event[EventStateChangedData],
) -> None:
"""Handle the sensor state changes."""
if (new_state := event.data["new_state"]) is None:
return
self._add_state_to_queue(new_state)
self.async_schedule_update_ha_state(True)
self._async_purge_update_and_schedule()
self.async_write_ha_state()
async def async_stats_sensor_startup(_: HomeAssistant) -> None:
@callback
def _async_stats_sensor_startup(self, _: HomeAssistant) -> None:
"""Add listener and get recorded state."""
_LOGGER.debug("Startup for %s", self.entity_id)
self.async_on_remove(
async_track_state_change_event(
self.hass,
[self._source_entity_id],
async_stats_sensor_state_listener,
self._async_stats_sensor_state_listener,
)
)
if "recorder" in self.hass.config.components:
self.hass.async_create_task(self._initialize_from_database())
self.async_on_remove(async_at_start(self.hass, async_stats_sensor_startup))
async def async_added_to_hass(self) -> None:
"""Register callbacks."""
self.async_on_remove(
async_at_start(self.hass, self._async_stats_sensor_startup)
)
def _add_state_to_queue(self, new_state: State) -> None:
"""Add the state to the queue."""
@ -499,7 +502,8 @@ class StatisticsSensor(SensorEntity):
self.ages.popleft()
self.states.popleft()
def _next_to_purge_timestamp(self) -> datetime | None:
@callback
def _async_next_to_purge_timestamp(self) -> datetime | None:
"""Find the timestamp when the next purge would occur."""
if self.ages and self._samples_max_age:
if self.samples_keep_last and len(self.ages) == 1:
@ -521,6 +525,10 @@ class StatisticsSensor(SensorEntity):
async def async_update(self) -> None:
"""Get the latest data and updates the states."""
self._async_purge_update_and_schedule()
def _async_purge_update_and_schedule(self) -> None:
"""Purge old states, update the sensor and schedule the next update."""
_LOGGER.debug("%s: updating statistics", self.entity_id)
if self._samples_max_age is not None:
self._purge_old_states(self._samples_max_age)
@ -531,22 +539,27 @@ class StatisticsSensor(SensorEntity):
# If max_age is set, ensure to update again after the defined interval.
# By basing updates off the timestamps of sampled data we avoid updating
# when none of the observed entities change.
if timestamp := self._next_to_purge_timestamp():
if timestamp := self._async_next_to_purge_timestamp():
_LOGGER.debug("%s: scheduling update at %s", self.entity_id, timestamp)
self._async_cancel_update_listener()
self._update_listener = async_track_point_in_utc_time(
self.hass, self._async_scheduled_update, timestamp
)
@callback
def _async_cancel_update_listener(self) -> None:
"""Cancel the scheduled update listener."""
if self._update_listener:
self._update_listener()
self._update_listener = None
@callback
def _scheduled_update(now: datetime) -> None:
def _async_scheduled_update(self, now: datetime) -> None:
"""Timer callback for sensor update."""
_LOGGER.debug("%s: executing scheduled update", self.entity_id)
self.async_schedule_update_ha_state(True)
self._update_listener = None
self._update_listener = async_track_point_in_utc_time(
self.hass, _scheduled_update, timestamp
)
self._async_cancel_update_listener()
self._async_purge_update_and_schedule()
self.async_write_ha_state()
def _fetch_states_from_database(self) -> list[State]:
"""Fetch the states from the database."""
@ -589,8 +602,8 @@ class StatisticsSensor(SensorEntity):
for state in reversed(states):
self._add_state_to_queue(state)
self.async_schedule_update_ha_state(True)
self._async_purge_update_and_schedule()
self.async_write_ha_state()
_LOGGER.debug("%s: initializing from database completed", self.entity_id)
def _update_attributes(self) -> None: