Update amcrest to use binary sensor entity description (#55092)

This commit is contained in:
Sean Vig 2021-08-24 08:06:19 -04:00 committed by GitHub
parent 828d862339
commit 5cbb217318
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 117 additions and 88 deletions

View File

@ -34,7 +34,7 @@ from homeassistant.helpers.dispatcher import async_dispatcher_send, dispatcher_s
from homeassistant.helpers.event import track_time_interval from homeassistant.helpers.event import track_time_interval
from homeassistant.helpers.service import async_extract_entity_ids from homeassistant.helpers.service import async_extract_entity_ids
from .binary_sensor import BINARY_POLLED_SENSORS, BINARY_SENSORS, check_binary_sensors from .binary_sensor import BINARY_SENSOR_KEYS, BINARY_SENSORS, check_binary_sensors
from .camera import CAMERA_SERVICES, STREAM_SOURCE_LIST from .camera import CAMERA_SERVICES, STREAM_SOURCE_LIST
from .const import ( from .const import (
CAMERAS, CAMERAS,
@ -43,7 +43,6 @@ from .const import (
DATA_AMCREST, DATA_AMCREST,
DEVICES, DEVICES,
DOMAIN, DOMAIN,
SENSOR_EVENT_CODE,
SERVICE_EVENT, SERVICE_EVENT,
SERVICE_UPDATE, SERVICE_UPDATE,
) )
@ -99,7 +98,10 @@ AMCREST_SCHEMA = vol.Schema(
vol.Optional(CONF_FFMPEG_ARGUMENTS, default=DEFAULT_ARGUMENTS): cv.string, vol.Optional(CONF_FFMPEG_ARGUMENTS, default=DEFAULT_ARGUMENTS): cv.string,
vol.Optional(CONF_SCAN_INTERVAL, default=SCAN_INTERVAL): cv.time_period, vol.Optional(CONF_SCAN_INTERVAL, default=SCAN_INTERVAL): cv.time_period,
vol.Optional(CONF_BINARY_SENSORS): vol.All( vol.Optional(CONF_BINARY_SENSORS): vol.All(
cv.ensure_list, [vol.In(BINARY_SENSORS)], vol.Unique(), check_binary_sensors cv.ensure_list,
[vol.In(BINARY_SENSOR_KEYS)],
vol.Unique(),
check_binary_sensors,
), ),
vol.Optional(CONF_SENSORS): vol.All( vol.Optional(CONF_SENSORS): vol.All(
cv.ensure_list, [vol.In(SENSOR_KEYS)], vol.Unique() cv.ensure_list, [vol.In(SENSOR_KEYS)], vol.Unique()
@ -276,9 +278,9 @@ def setup(hass, config):
config, config,
) )
event_codes = [ event_codes = [
BINARY_SENSORS[sensor_type][SENSOR_EVENT_CODE] sensor.event_code
for sensor_type in binary_sensors for sensor in BINARY_SENSORS
if sensor_type not in BINARY_POLLED_SENSORS if sensor.key in binary_sensors and not sensor.should_poll
] ]
_start_event_monitor(hass, name, api, event_codes) _start_event_monitor(hass, name, api, event_codes)

View File

@ -1,5 +1,8 @@
"""Support for Amcrest IP camera binary sensors.""" """Support for Amcrest IP camera binary sensors."""
from __future__ import annotations
from contextlib import suppress from contextlib import suppress
from dataclasses import dataclass
from datetime import timedelta from datetime import timedelta
import logging import logging
@ -11,6 +14,7 @@ from homeassistant.components.binary_sensor import (
DEVICE_CLASS_MOTION, DEVICE_CLASS_MOTION,
DEVICE_CLASS_SOUND, DEVICE_CLASS_SOUND,
BinarySensorEntity, BinarySensorEntity,
BinarySensorEntityDescription,
) )
from homeassistant.const import CONF_BINARY_SENSORS, CONF_NAME from homeassistant.const import CONF_BINARY_SENSORS, CONF_NAME
from homeassistant.core import callback from homeassistant.core import callback
@ -21,54 +25,93 @@ from .const import (
BINARY_SENSOR_SCAN_INTERVAL_SECS, BINARY_SENSOR_SCAN_INTERVAL_SECS,
DATA_AMCREST, DATA_AMCREST,
DEVICES, DEVICES,
SENSOR_DEVICE_CLASS,
SENSOR_EVENT_CODE,
SENSOR_NAME,
SERVICE_EVENT, SERVICE_EVENT,
SERVICE_UPDATE, SERVICE_UPDATE,
) )
from .helpers import log_update_error, service_signal from .helpers import log_update_error, service_signal
@dataclass
class AmcrestSensorEntityDescription(BinarySensorEntityDescription):
"""Describe Amcrest sensor entity."""
event_code: str | None = None
should_poll: bool = False
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
SCAN_INTERVAL = timedelta(seconds=BINARY_SENSOR_SCAN_INTERVAL_SECS) SCAN_INTERVAL = timedelta(seconds=BINARY_SENSOR_SCAN_INTERVAL_SECS)
_ONLINE_SCAN_INTERVAL = timedelta(seconds=60 - BINARY_SENSOR_SCAN_INTERVAL_SECS) _ONLINE_SCAN_INTERVAL = timedelta(seconds=60 - BINARY_SENSOR_SCAN_INTERVAL_SECS)
BINARY_SENSOR_AUDIO_DETECTED = "audio_detected" _AUDIO_DETECTED_KEY = "audio_detected"
BINARY_SENSOR_AUDIO_DETECTED_POLLED = "audio_detected_polled" _AUDIO_DETECTED_POLLED_KEY = "audio_detected_polled"
BINARY_SENSOR_MOTION_DETECTED = "motion_detected" _AUDIO_DETECTED_NAME = "Audio Detected"
BINARY_SENSOR_MOTION_DETECTED_POLLED = "motion_detected_polled" _AUDIO_DETECTED_EVENT_CODE = "AudioMutation"
BINARY_SENSOR_ONLINE = "online"
BINARY_SENSOR_CROSSLINE_DETECTED = "crossline_detected" _CROSSLINE_DETECTED_KEY = "crossline_detected"
BINARY_SENSOR_CROSSLINE_DETECTED_POLLED = "crossline_detected_polled" _CROSSLINE_DETECTED_POLLED_KEY = "crossline_detected_polled"
BINARY_POLLED_SENSORS = [ _CROSSLINE_DETECTED_NAME = "CrossLine Detected"
BINARY_SENSOR_AUDIO_DETECTED_POLLED, _CROSSLINE_DETECTED_EVENT_CODE = "CrossLineDetection"
BINARY_SENSOR_MOTION_DETECTED_POLLED,
BINARY_SENSOR_ONLINE, _MOTION_DETECTED_KEY = "motion_detected"
] _MOTION_DETECTED_POLLED_KEY = "motion_detected_polled"
_AUDIO_DETECTED_PARAMS = ("Audio Detected", DEVICE_CLASS_SOUND, "AudioMutation") _MOTION_DETECTED_NAME = "Motion Detected"
_MOTION_DETECTED_PARAMS = ("Motion Detected", DEVICE_CLASS_MOTION, "VideoMotion") _MOTION_DETECTED_EVENT_CODE = "VideoMotion"
_CROSSLINE_DETECTED_PARAMS = (
"CrossLine Detected", _ONLINE_KEY = "online"
DEVICE_CLASS_MOTION,
"CrossLineDetection", BINARY_SENSORS: tuple[AmcrestSensorEntityDescription, ...] = (
AmcrestSensorEntityDescription(
key=_AUDIO_DETECTED_KEY,
name=_AUDIO_DETECTED_NAME,
device_class=DEVICE_CLASS_SOUND,
event_code=_AUDIO_DETECTED_EVENT_CODE,
),
AmcrestSensorEntityDescription(
key=_AUDIO_DETECTED_POLLED_KEY,
name=_AUDIO_DETECTED_NAME,
device_class=DEVICE_CLASS_SOUND,
event_code=_AUDIO_DETECTED_EVENT_CODE,
should_poll=True,
),
AmcrestSensorEntityDescription(
key=_CROSSLINE_DETECTED_KEY,
name=_CROSSLINE_DETECTED_NAME,
device_class=DEVICE_CLASS_MOTION,
event_code=_CROSSLINE_DETECTED_EVENT_CODE,
),
AmcrestSensorEntityDescription(
key=_CROSSLINE_DETECTED_POLLED_KEY,
name=_CROSSLINE_DETECTED_NAME,
device_class=DEVICE_CLASS_MOTION,
event_code=_CROSSLINE_DETECTED_EVENT_CODE,
should_poll=True,
),
AmcrestSensorEntityDescription(
key=_MOTION_DETECTED_KEY,
name=_MOTION_DETECTED_NAME,
device_class=DEVICE_CLASS_MOTION,
event_code=_MOTION_DETECTED_EVENT_CODE,
),
AmcrestSensorEntityDescription(
key=_MOTION_DETECTED_POLLED_KEY,
name=_MOTION_DETECTED_NAME,
device_class=DEVICE_CLASS_MOTION,
event_code=_MOTION_DETECTED_EVENT_CODE,
should_poll=True,
),
AmcrestSensorEntityDescription(
key=_ONLINE_KEY,
name="Online",
device_class=DEVICE_CLASS_CONNECTIVITY,
),
) )
RAW_BINARY_SENSORS = { BINARY_SENSOR_KEYS = [description.key for description in BINARY_SENSORS]
BINARY_SENSOR_AUDIO_DETECTED: _AUDIO_DETECTED_PARAMS,
BINARY_SENSOR_AUDIO_DETECTED_POLLED: _AUDIO_DETECTED_PARAMS,
BINARY_SENSOR_MOTION_DETECTED: _MOTION_DETECTED_PARAMS,
BINARY_SENSOR_MOTION_DETECTED_POLLED: _MOTION_DETECTED_PARAMS,
BINARY_SENSOR_CROSSLINE_DETECTED: _CROSSLINE_DETECTED_PARAMS,
BINARY_SENSOR_CROSSLINE_DETECTED_POLLED: _CROSSLINE_DETECTED_PARAMS,
BINARY_SENSOR_ONLINE: ("Online", DEVICE_CLASS_CONNECTIVITY, None),
}
BINARY_SENSORS = {
k: dict(zip((SENSOR_NAME, SENSOR_DEVICE_CLASS, SENSOR_EVENT_CODE), v))
for k, v in RAW_BINARY_SENSORS.items()
}
_EXCLUSIVE_OPTIONS = [ _EXCLUSIVE_OPTIONS = [
{BINARY_SENSOR_MOTION_DETECTED, BINARY_SENSOR_MOTION_DETECTED_POLLED}, {_AUDIO_DETECTED_KEY, _AUDIO_DETECTED_POLLED_KEY},
{BINARY_SENSOR_CROSSLINE_DETECTED, BINARY_SENSOR_CROSSLINE_DETECTED_POLLED}, {_MOTION_DETECTED_KEY, _MOTION_DETECTED_POLLED_KEY},
{_CROSSLINE_DETECTED_KEY, _CROSSLINE_DETECTED_POLLED_KEY},
] ]
_UPDATE_MSG = "Updating %s binary sensor" _UPDATE_MSG = "Updating %s binary sensor"
@ -91,10 +134,12 @@ async def async_setup_platform(hass, config, async_add_entities, discovery_info=
name = discovery_info[CONF_NAME] name = discovery_info[CONF_NAME]
device = hass.data[DATA_AMCREST][DEVICES][name] device = hass.data[DATA_AMCREST][DEVICES][name]
binary_sensors = discovery_info[CONF_BINARY_SENSORS]
async_add_entities( async_add_entities(
[ [
AmcrestBinarySensor(name, device, sensor_type) AmcrestBinarySensor(name, device, entity_description)
for sensor_type in discovery_info[CONF_BINARY_SENSORS] for entity_description in BINARY_SENSORS
if entity_description.key in binary_sensors
], ],
True, True,
) )
@ -103,45 +148,23 @@ async def async_setup_platform(hass, config, async_add_entities, discovery_info=
class AmcrestBinarySensor(BinarySensorEntity): class AmcrestBinarySensor(BinarySensorEntity):
"""Binary sensor for Amcrest camera.""" """Binary sensor for Amcrest camera."""
def __init__(self, name, device, sensor_type): def __init__(self, name, device, entity_description):
"""Initialize entity.""" """Initialize entity."""
self._name = f"{name} {BINARY_SENSORS[sensor_type][SENSOR_NAME]}"
self._signal_name = name self._signal_name = name
self._api = device.api self._api = device.api
self._sensor_type = sensor_type self.entity_description = entity_description
self._state = None self._attr_name = f"{name} {entity_description.name}"
self._device_class = BINARY_SENSORS[sensor_type][SENSOR_DEVICE_CLASS] self._attr_should_poll = entity_description.should_poll
self._event_code = BINARY_SENSORS[sensor_type][SENSOR_EVENT_CODE]
self._unsub_dispatcher = [] self._unsub_dispatcher = []
@property
def should_poll(self):
"""Return True if entity has to be polled for state."""
return self._sensor_type in BINARY_POLLED_SENSORS
@property
def name(self):
"""Return entity name."""
return self._name
@property
def is_on(self):
"""Return if entity is on."""
return self._state
@property
def device_class(self):
"""Return device class."""
return self._device_class
@property @property
def available(self): def available(self):
"""Return True if entity is available.""" """Return True if entity is available."""
return self._sensor_type == BINARY_SENSOR_ONLINE or self._api.available return self.entity_description.key == _ONLINE_KEY or self._api.available
def update(self): def update(self):
"""Update entity.""" """Update entity."""
if self._sensor_type == BINARY_SENSOR_ONLINE: if self.entity_description.key == _ONLINE_KEY:
self._update_online() self._update_online()
else: else:
self._update_others() self._update_others()
@ -150,32 +173,33 @@ class AmcrestBinarySensor(BinarySensorEntity):
def _update_online(self): def _update_online(self):
if not (self._api.available or self.is_on): if not (self._api.available or self.is_on):
return return
_LOGGER.debug(_UPDATE_MSG, self._name) _LOGGER.debug(_UPDATE_MSG, self.name)
if self._api.available: if self._api.available:
# Send a command to the camera to test if we can still communicate with it. # Send a command to the camera to test if we can still communicate with it.
# Override of Http.command() in __init__.py will set self._api.available # Override of Http.command() in __init__.py will set self._api.available
# accordingly. # accordingly.
with suppress(AmcrestError): with suppress(AmcrestError):
self._api.current_time # pylint: disable=pointless-statement self._api.current_time # pylint: disable=pointless-statement
self._state = self._api.available self._attr_is_on = self._api.available
def _update_others(self): def _update_others(self):
if not self.available: if not self.available:
return return
_LOGGER.debug(_UPDATE_MSG, self._name) _LOGGER.debug(_UPDATE_MSG, self.name)
event_code = self.entity_description.event_code
try: try:
self._state = "channels" in self._api.event_channels_happened( self._attr_is_on = "channels" in self._api.event_channels_happened(
self._event_code event_code
) )
except AmcrestError as error: except AmcrestError as error:
log_update_error(_LOGGER, "update", self.name, "binary sensor", error) log_update_error(_LOGGER, "update", self.name, "binary sensor", error)
async def async_on_demand_update(self): async def async_on_demand_update(self):
"""Update state.""" """Update state."""
if self._sensor_type == BINARY_SENSOR_ONLINE: if self.entity_description.key == _ONLINE_KEY:
_LOGGER.debug(_UPDATE_MSG, self._name) _LOGGER.debug(_UPDATE_MSG, self.name)
self._state = self._api.available self._attr_is_on = self._api.available
self.async_write_ha_state() self.async_write_ha_state()
return return
self.async_schedule_update_ha_state(True) self.async_schedule_update_ha_state(True)
@ -183,8 +207,8 @@ class AmcrestBinarySensor(BinarySensorEntity):
@callback @callback
def async_event_received(self, start): def async_event_received(self, start):
"""Update state from received event.""" """Update state from received event."""
_LOGGER.debug(_UPDATE_MSG, self._name) _LOGGER.debug(_UPDATE_MSG, self.name)
self._state = start self._attr_is_on = start
self.async_write_ha_state() self.async_write_ha_state()
async def async_added_to_hass(self): async def async_added_to_hass(self):
@ -196,11 +220,18 @@ class AmcrestBinarySensor(BinarySensorEntity):
self.async_on_demand_update, self.async_on_demand_update,
) )
) )
if self._event_code and self._sensor_type not in BINARY_POLLED_SENSORS: if (
self.entity_description.event_code
and not self.entity_description.should_poll
):
self._unsub_dispatcher.append( self._unsub_dispatcher.append(
async_dispatcher_connect( async_dispatcher_connect(
self.hass, self.hass,
service_signal(SERVICE_EVENT, self._signal_name, self._event_code), service_signal(
SERVICE_EVENT,
self._signal_name,
self.entity_description.event_code,
),
self.async_event_received, self.async_event_received,
) )
) )

View File

@ -13,7 +13,3 @@ SNAPSHOT_TIMEOUT = 20
SERVICE_EVENT = "event" SERVICE_EVENT = "event"
SERVICE_UPDATE = "update" SERVICE_UPDATE = "update"
SENSOR_DEVICE_CLASS = "class"
SENSOR_EVENT_CODE = "code"
SENSOR_NAME = "name"