Move and rename alert base entity to separate module (#126030)

Move alert base entity to separate module
This commit is contained in:
epenet 2024-09-16 20:21:04 +02:00 committed by GitHub
parent b73be2df6e
commit 351de1ca72
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 212 additions and 205 deletions

View File

@ -2,18 +2,8 @@
from __future__ import annotations
from collections.abc import Callable
from datetime import timedelta
from typing import Any
import voluptuous as vol
from homeassistant.components.notify import (
ATTR_DATA,
ATTR_MESSAGE,
ATTR_TITLE,
DOMAIN as DOMAIN_NOTIFY,
)
from homeassistant.const import (
CONF_ENTITY_ID,
CONF_NAME,
@ -22,22 +12,12 @@ from homeassistant.const import (
SERVICE_TOGGLE,
SERVICE_TURN_OFF,
SERVICE_TURN_ON,
STATE_IDLE,
STATE_OFF,
STATE_ON,
)
from homeassistant.core import Event, EventStateChangedData, HassJob, HomeAssistant
from homeassistant.exceptions import ServiceNotFound
from homeassistant.core import HomeAssistant
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entity import Entity
from homeassistant.helpers.entity_component import EntityComponent
from homeassistant.helpers.event import (
async_track_point_in_time,
async_track_state_change_event,
)
from homeassistant.helpers.template import Template
from homeassistant.helpers.typing import ConfigType
from homeassistant.util.dt import now
from .const import (
CONF_ALERT_MESSAGE,
@ -52,6 +32,7 @@ from .const import (
DOMAIN,
LOGGER,
)
from .entity import AlertEntity
ALERT_SCHEMA = vol.Schema(
{
@ -83,9 +64,9 @@ CONFIG_SCHEMA = vol.Schema(
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Set up the Alert component."""
component = EntityComponent[Alert](LOGGER, DOMAIN, hass)
component = EntityComponent[AlertEntity](LOGGER, DOMAIN, hass)
entities: list[Alert] = []
entities: list[AlertEntity] = []
for object_id, cfg in config[DOMAIN].items():
if not cfg:
@ -104,7 +85,7 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
data = cfg.get(CONF_DATA)
entities.append(
Alert(
AlertEntity(
hass,
object_id,
name,
@ -131,183 +112,3 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
await component.async_add_entities(entities)
return True
class Alert(Entity):
"""Representation of an alert."""
_attr_should_poll = False
def __init__(
self,
hass: HomeAssistant,
entity_id: str,
name: str,
watched_entity_id: str,
state: str,
repeat: list[float],
skip_first: bool,
message_template: Template | None,
done_message_template: Template | None,
notifiers: list[str],
can_ack: bool,
title_template: Template | None,
data: dict[Any, Any],
) -> None:
"""Initialize the alert."""
self.hass = hass
self._attr_name = name
self._alert_state = state
self._skip_first = skip_first
self._data = data
self._message_template = message_template
self._done_message_template = done_message_template
self._title_template = title_template
self._notifiers = notifiers
self._can_ack = can_ack
self._delay = [timedelta(minutes=val) for val in repeat]
self._next_delay = 0
self._firing = False
self._ack = False
self._cancel: Callable[[], None] | None = None
self._send_done_message = False
self.entity_id = f"{DOMAIN}.{entity_id}"
async_track_state_change_event(
hass, [watched_entity_id], self.watched_entity_change
)
@property
def state(self) -> str:
"""Return the alert status."""
if self._firing:
if self._ack:
return STATE_OFF
return STATE_ON
return STATE_IDLE
async def watched_entity_change(self, event: Event[EventStateChangedData]) -> None:
"""Determine if the alert should start or stop."""
if (to_state := event.data["new_state"]) is None:
return
LOGGER.debug("Watched entity (%s) has changed", event.data["entity_id"])
if to_state.state == self._alert_state and not self._firing:
await self.begin_alerting()
if to_state.state != self._alert_state and self._firing:
await self.end_alerting()
async def begin_alerting(self) -> None:
"""Begin the alert procedures."""
LOGGER.debug("Beginning Alert: %s", self._attr_name)
self._ack = False
self._firing = True
self._next_delay = 0
if not self._skip_first:
await self._notify()
else:
await self._schedule_notify()
self.async_write_ha_state()
async def end_alerting(self) -> None:
"""End the alert procedures."""
LOGGER.debug("Ending Alert: %s", self._attr_name)
if self._cancel is not None:
self._cancel()
self._cancel = None
self._ack = False
self._firing = False
if self._send_done_message:
await self._notify_done_message()
self.async_write_ha_state()
async def _schedule_notify(self) -> None:
"""Schedule a notification."""
delay = self._delay[self._next_delay]
next_msg = now() + delay
self._cancel = async_track_point_in_time(
self.hass,
HassJob(
self._notify, name="Schedule notify alert", cancel_on_shutdown=True
),
next_msg,
)
self._next_delay = min(self._next_delay + 1, len(self._delay) - 1)
async def _notify(self, *args: Any) -> None:
"""Send the alert notification."""
if not self._firing:
return
if not self._ack:
LOGGER.info("Alerting: %s", self._attr_name)
self._send_done_message = True
if self._message_template is not None:
message = self._message_template.async_render(parse_result=False)
else:
message = self._attr_name
await self._send_notification_message(message)
await self._schedule_notify()
async def _notify_done_message(self) -> None:
"""Send notification of complete alert."""
LOGGER.info("Alerting: %s", self._done_message_template)
self._send_done_message = False
if self._done_message_template is None:
return
message = self._done_message_template.async_render(parse_result=False)
await self._send_notification_message(message)
async def _send_notification_message(self, message: Any) -> None:
if not self._notifiers:
return
msg_payload = {ATTR_MESSAGE: message}
if self._title_template is not None:
title = self._title_template.async_render(parse_result=False)
msg_payload[ATTR_TITLE] = title
if self._data:
msg_payload[ATTR_DATA] = self._data
LOGGER.debug(msg_payload)
for target in self._notifiers:
try:
await self.hass.services.async_call(
DOMAIN_NOTIFY, target, msg_payload, context=self._context
)
except ServiceNotFound:
LOGGER.error(
"Failed to call notify.%s, retrying at next notification interval",
target,
)
async def async_turn_on(self, **kwargs: Any) -> None:
"""Async Unacknowledge alert."""
LOGGER.debug("Reset Alert: %s", self._attr_name)
self._ack = False
self.async_write_ha_state()
async def async_turn_off(self, **kwargs: Any) -> None:
"""Async Acknowledge alert."""
LOGGER.debug("Acknowledged Alert: %s", self._attr_name)
self._ack = True
self.async_write_ha_state()
async def async_toggle(self, **kwargs: Any) -> None:
"""Async toggle alert."""
if self._ack:
return await self.async_turn_on()
return await self.async_turn_off()

View File

@ -0,0 +1,206 @@
"""Support for repeating alerts when conditions are met."""
from __future__ import annotations
from collections.abc import Callable
from datetime import timedelta
from typing import Any
from homeassistant.components.notify import (
ATTR_DATA,
ATTR_MESSAGE,
ATTR_TITLE,
DOMAIN as DOMAIN_NOTIFY,
)
from homeassistant.const import STATE_IDLE, STATE_OFF, STATE_ON
from homeassistant.core import Event, EventStateChangedData, HassJob, HomeAssistant
from homeassistant.exceptions import ServiceNotFound
from homeassistant.helpers.entity import Entity
from homeassistant.helpers.event import (
async_track_point_in_time,
async_track_state_change_event,
)
from homeassistant.helpers.template import Template
from homeassistant.util.dt import now
from .const import DOMAIN, LOGGER
class AlertEntity(Entity):
"""Representation of an alert."""
_attr_should_poll = False
def __init__(
self,
hass: HomeAssistant,
entity_id: str,
name: str,
watched_entity_id: str,
state: str,
repeat: list[float],
skip_first: bool,
message_template: Template | None,
done_message_template: Template | None,
notifiers: list[str],
can_ack: bool,
title_template: Template | None,
data: dict[Any, Any],
) -> None:
"""Initialize the alert."""
self.hass = hass
self._attr_name = name
self._alert_state = state
self._skip_first = skip_first
self._data = data
self._message_template = message_template
self._done_message_template = done_message_template
self._title_template = title_template
self._notifiers = notifiers
self._can_ack = can_ack
self._delay = [timedelta(minutes=val) for val in repeat]
self._next_delay = 0
self._firing = False
self._ack = False
self._cancel: Callable[[], None] | None = None
self._send_done_message = False
self.entity_id = f"{DOMAIN}.{entity_id}"
async_track_state_change_event(
hass, [watched_entity_id], self.watched_entity_change
)
@property
def state(self) -> str:
"""Return the alert status."""
if self._firing:
if self._ack:
return STATE_OFF
return STATE_ON
return STATE_IDLE
async def watched_entity_change(self, event: Event[EventStateChangedData]) -> None:
"""Determine if the alert should start or stop."""
if (to_state := event.data["new_state"]) is None:
return
LOGGER.debug("Watched entity (%s) has changed", event.data["entity_id"])
if to_state.state == self._alert_state and not self._firing:
await self.begin_alerting()
if to_state.state != self._alert_state and self._firing:
await self.end_alerting()
async def begin_alerting(self) -> None:
"""Begin the alert procedures."""
LOGGER.debug("Beginning Alert: %s", self._attr_name)
self._ack = False
self._firing = True
self._next_delay = 0
if not self._skip_first:
await self._notify()
else:
await self._schedule_notify()
self.async_write_ha_state()
async def end_alerting(self) -> None:
"""End the alert procedures."""
LOGGER.debug("Ending Alert: %s", self._attr_name)
if self._cancel is not None:
self._cancel()
self._cancel = None
self._ack = False
self._firing = False
if self._send_done_message:
await self._notify_done_message()
self.async_write_ha_state()
async def _schedule_notify(self) -> None:
"""Schedule a notification."""
delay = self._delay[self._next_delay]
next_msg = now() + delay
self._cancel = async_track_point_in_time(
self.hass,
HassJob(
self._notify, name="Schedule notify alert", cancel_on_shutdown=True
),
next_msg,
)
self._next_delay = min(self._next_delay + 1, len(self._delay) - 1)
async def _notify(self, *args: Any) -> None:
"""Send the alert notification."""
if not self._firing:
return
if not self._ack:
LOGGER.info("Alerting: %s", self._attr_name)
self._send_done_message = True
if self._message_template is not None:
message = self._message_template.async_render(parse_result=False)
else:
message = self._attr_name
await self._send_notification_message(message)
await self._schedule_notify()
async def _notify_done_message(self) -> None:
"""Send notification of complete alert."""
LOGGER.info("Alerting: %s", self._done_message_template)
self._send_done_message = False
if self._done_message_template is None:
return
message = self._done_message_template.async_render(parse_result=False)
await self._send_notification_message(message)
async def _send_notification_message(self, message: Any) -> None:
if not self._notifiers:
return
msg_payload = {ATTR_MESSAGE: message}
if self._title_template is not None:
title = self._title_template.async_render(parse_result=False)
msg_payload[ATTR_TITLE] = title
if self._data:
msg_payload[ATTR_DATA] = self._data
LOGGER.debug(msg_payload)
for target in self._notifiers:
try:
await self.hass.services.async_call(
DOMAIN_NOTIFY, target, msg_payload, context=self._context
)
except ServiceNotFound:
LOGGER.error(
"Failed to call notify.%s, retrying at next notification interval",
target,
)
async def async_turn_on(self, **kwargs: Any) -> None:
"""Async Unacknowledge alert."""
LOGGER.debug("Reset Alert: %s", self._attr_name)
self._ack = False
self.async_write_ha_state()
async def async_turn_off(self, **kwargs: Any) -> None:
"""Async Acknowledge alert."""
LOGGER.debug("Acknowledged Alert: %s", self._attr_name)
self._ack = True
self.async_write_ha_state()
async def async_toggle(self, **kwargs: Any) -> None:
"""Async toggle alert."""
if self._ack:
return await self.async_turn_on()
return await self.async_turn_off()

View File

@ -337,7 +337,7 @@ async def test_skipfirst(hass: HomeAssistant, mock_notifier: list[ServiceCall])
async def test_done_message_state_tracker_reset_on_cancel(hass: HomeAssistant) -> None:
"""Test that the done message is reset when canceled."""
entity = alert.Alert(hass, *TEST_NOACK)
entity = alert.AlertEntity(hass, *TEST_NOACK)
entity._cancel = lambda *args: None
assert entity._send_done_message is False
entity._send_done_message = True