mirror of
https://github.com/home-assistant/core.git
synced 2025-07-09 22:37:11 +00:00
Convert amcrest binary sensors from poll to stream (#32818)
* Convert amcrest binary sensors from poll to stream - Bump amcrest package to 1.6.0. - For online binary sensor poll camera periodically to test communications in case configuration & usage results in no other communication to camera. - Start a separate thread to call camera's event_stream method since it never returns. - Convert all received events into signals that cause corresponding sensors to update. - Use camera's generic event_channels_happened method to update sensors at startup, and whenever camera comes back online after being unavailable. * Changes per review * Changes per review 2 * Changes per review 3 - Move event stream decoding to amcrest package. - Change name of event processing threads so global counter is no longer required. - Bump amcrest package to 1.7.0.
This commit is contained in:
parent
d8e3e9abaa
commit
0ed7bc3b8e
@ -42,6 +42,8 @@ from .const import (
|
|||||||
DATA_AMCREST,
|
DATA_AMCREST,
|
||||||
DEVICES,
|
DEVICES,
|
||||||
DOMAIN,
|
DOMAIN,
|
||||||
|
SENSOR_EVENT_CODE,
|
||||||
|
SERVICE_EVENT,
|
||||||
SERVICE_UPDATE,
|
SERVICE_UPDATE,
|
||||||
)
|
)
|
||||||
from .helpers import service_signal
|
from .helpers import service_signal
|
||||||
@ -96,9 +98,11 @@ AMCREST_SCHEMA = vol.Schema(
|
|||||||
vol.Optional(CONF_FFMPEG_ARGUMENTS, default=DEFAULT_ARGUMENTS): cv.string,
|
vol.Optional(CONF_FFMPEG_ARGUMENTS, default=DEFAULT_ARGUMENTS): cv.string,
|
||||||
vol.Optional(CONF_SCAN_INTERVAL, default=SCAN_INTERVAL): cv.time_period,
|
vol.Optional(CONF_SCAN_INTERVAL, default=SCAN_INTERVAL): cv.time_period,
|
||||||
vol.Optional(CONF_BINARY_SENSORS): vol.All(
|
vol.Optional(CONF_BINARY_SENSORS): vol.All(
|
||||||
cv.ensure_list, [vol.In(BINARY_SENSORS)]
|
cv.ensure_list, [vol.In(BINARY_SENSORS)], vol.Unique()
|
||||||
|
),
|
||||||
|
vol.Optional(CONF_SENSORS): vol.All(
|
||||||
|
cv.ensure_list, [vol.In(SENSORS)], vol.Unique()
|
||||||
),
|
),
|
||||||
vol.Optional(CONF_SENSORS): vol.All(cv.ensure_list, [vol.In(SENSORS)]),
|
|
||||||
vol.Optional(CONF_CONTROL_LIGHT, default=True): cv.boolean,
|
vol.Optional(CONF_CONTROL_LIGHT, default=True): cv.boolean,
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
@ -119,6 +123,8 @@ class AmcrestChecker(Http):
|
|||||||
self._wrap_errors = 0
|
self._wrap_errors = 0
|
||||||
self._wrap_lock = threading.Lock()
|
self._wrap_lock = threading.Lock()
|
||||||
self._wrap_login_err = False
|
self._wrap_login_err = False
|
||||||
|
self._wrap_event_flag = threading.Event()
|
||||||
|
self._wrap_event_flag.set()
|
||||||
self._unsub_recheck = None
|
self._unsub_recheck = None
|
||||||
super().__init__(
|
super().__init__(
|
||||||
host,
|
host,
|
||||||
@ -134,16 +140,22 @@ class AmcrestChecker(Http):
|
|||||||
"""Return if camera's API is responding."""
|
"""Return if camera's API is responding."""
|
||||||
return self._wrap_errors <= MAX_ERRORS and not self._wrap_login_err
|
return self._wrap_errors <= MAX_ERRORS and not self._wrap_login_err
|
||||||
|
|
||||||
|
@property
|
||||||
|
def available_flag(self):
|
||||||
|
"""Return threading event flag that indicates if camera's API is responding."""
|
||||||
|
return self._wrap_event_flag
|
||||||
|
|
||||||
def _start_recovery(self):
|
def _start_recovery(self):
|
||||||
|
self._wrap_event_flag.clear()
|
||||||
dispatcher_send(self._hass, service_signal(SERVICE_UPDATE, self._wrap_name))
|
dispatcher_send(self._hass, service_signal(SERVICE_UPDATE, self._wrap_name))
|
||||||
self._unsub_recheck = track_time_interval(
|
self._unsub_recheck = track_time_interval(
|
||||||
self._hass, self._wrap_test_online, RECHECK_INTERVAL
|
self._hass, self._wrap_test_online, RECHECK_INTERVAL
|
||||||
)
|
)
|
||||||
|
|
||||||
def command(self, cmd, retries=None, timeout_cmd=None, stream=False):
|
def command(self, *args, **kwargs):
|
||||||
"""amcrest.Http.command wrapper to catch errors."""
|
"""amcrest.Http.command wrapper to catch errors."""
|
||||||
try:
|
try:
|
||||||
ret = super().command(cmd, retries, timeout_cmd, stream)
|
ret = super().command(*args, **kwargs)
|
||||||
except LoginError as ex:
|
except LoginError as ex:
|
||||||
with self._wrap_lock:
|
with self._wrap_lock:
|
||||||
was_online = self.available
|
was_online = self.available
|
||||||
@ -172,6 +184,7 @@ class AmcrestChecker(Http):
|
|||||||
self._unsub_recheck()
|
self._unsub_recheck()
|
||||||
self._unsub_recheck = None
|
self._unsub_recheck = None
|
||||||
_LOGGER.error("%s camera back online", self._wrap_name)
|
_LOGGER.error("%s camera back online", self._wrap_name)
|
||||||
|
self._wrap_event_flag.set()
|
||||||
dispatcher_send(self._hass, service_signal(SERVICE_UPDATE, self._wrap_name))
|
dispatcher_send(self._hass, service_signal(SERVICE_UPDATE, self._wrap_name))
|
||||||
return ret
|
return ret
|
||||||
|
|
||||||
@ -184,6 +197,31 @@ class AmcrestChecker(Http):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def _monitor_events(hass, name, api, event_codes):
|
||||||
|
event_codes = ",".join(event_codes)
|
||||||
|
while True:
|
||||||
|
api.available_flag.wait()
|
||||||
|
try:
|
||||||
|
for code, start in api.event_actions(event_codes, retries=5):
|
||||||
|
signal = service_signal(SERVICE_EVENT, name, code)
|
||||||
|
_LOGGER.debug("Sending signal: '%s': %s", signal, start)
|
||||||
|
dispatcher_send(hass, signal, start)
|
||||||
|
except AmcrestError as error:
|
||||||
|
_LOGGER.warning(
|
||||||
|
"Error while processing events from %s camera: %r", name, error
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _start_event_monitor(hass, name, api, event_codes):
|
||||||
|
thread = threading.Thread(
|
||||||
|
target=_monitor_events,
|
||||||
|
name=f"Amcrest {name}",
|
||||||
|
args=(hass, name, api, event_codes),
|
||||||
|
daemon=True,
|
||||||
|
)
|
||||||
|
thread.start()
|
||||||
|
|
||||||
|
|
||||||
def setup(hass, config):
|
def setup(hass, config):
|
||||||
"""Set up the Amcrest IP Camera component."""
|
"""Set up the Amcrest IP Camera component."""
|
||||||
hass.data.setdefault(DATA_AMCREST, {DEVICES: {}, CAMERAS: []})
|
hass.data.setdefault(DATA_AMCREST, {DEVICES: {}, CAMERAS: []})
|
||||||
@ -230,6 +268,13 @@ def setup(hass, config):
|
|||||||
{CONF_NAME: name, CONF_BINARY_SENSORS: binary_sensors},
|
{CONF_NAME: name, CONF_BINARY_SENSORS: binary_sensors},
|
||||||
config,
|
config,
|
||||||
)
|
)
|
||||||
|
event_codes = [
|
||||||
|
BINARY_SENSORS[sensor_type][SENSOR_EVENT_CODE]
|
||||||
|
for sensor_type in binary_sensors
|
||||||
|
if BINARY_SENSORS[sensor_type][SENSOR_EVENT_CODE] is not None
|
||||||
|
]
|
||||||
|
if event_codes:
|
||||||
|
_start_event_monitor(hass, name, api, event_codes)
|
||||||
|
|
||||||
if sensors:
|
if sensors:
|
||||||
discovery.load_platform(
|
discovery.load_platform(
|
||||||
|
@ -10,12 +10,17 @@ from homeassistant.components.binary_sensor import (
|
|||||||
BinarySensorDevice,
|
BinarySensorDevice,
|
||||||
)
|
)
|
||||||
from homeassistant.const import CONF_BINARY_SENSORS, CONF_NAME
|
from homeassistant.const import CONF_BINARY_SENSORS, CONF_NAME
|
||||||
|
from homeassistant.core import callback
|
||||||
from homeassistant.helpers.dispatcher import async_dispatcher_connect
|
from homeassistant.helpers.dispatcher import async_dispatcher_connect
|
||||||
|
|
||||||
from .const import (
|
from .const import (
|
||||||
BINARY_SENSOR_SCAN_INTERVAL_SECS,
|
BINARY_SENSOR_SCAN_INTERVAL_SECS,
|
||||||
DATA_AMCREST,
|
DATA_AMCREST,
|
||||||
DEVICES,
|
DEVICES,
|
||||||
|
SENSOR_DEVICE_CLASS,
|
||||||
|
SENSOR_EVENT_CODE,
|
||||||
|
SENSOR_NAME,
|
||||||
|
SERVICE_EVENT,
|
||||||
SERVICE_UPDATE,
|
SERVICE_UPDATE,
|
||||||
)
|
)
|
||||||
from .helpers import log_update_error, service_signal
|
from .helpers import log_update_error, service_signal
|
||||||
@ -26,11 +31,20 @@ SCAN_INTERVAL = timedelta(seconds=BINARY_SENSOR_SCAN_INTERVAL_SECS)
|
|||||||
|
|
||||||
BINARY_SENSOR_MOTION_DETECTED = "motion_detected"
|
BINARY_SENSOR_MOTION_DETECTED = "motion_detected"
|
||||||
BINARY_SENSOR_ONLINE = "online"
|
BINARY_SENSOR_ONLINE = "online"
|
||||||
# Binary sensor types are defined like: Name, device class
|
|
||||||
BINARY_SENSORS = {
|
BINARY_SENSORS = {
|
||||||
BINARY_SENSOR_MOTION_DETECTED: ("Motion Detected", DEVICE_CLASS_MOTION),
|
BINARY_SENSOR_MOTION_DETECTED: (
|
||||||
BINARY_SENSOR_ONLINE: ("Online", DEVICE_CLASS_CONNECTIVITY),
|
"Motion Detected",
|
||||||
|
DEVICE_CLASS_MOTION,
|
||||||
|
"VideoMotion",
|
||||||
|
),
|
||||||
|
BINARY_SENSOR_ONLINE: ("Online", DEVICE_CLASS_CONNECTIVITY, None),
|
||||||
}
|
}
|
||||||
|
BINARY_SENSORS = {
|
||||||
|
k: dict(zip((SENSOR_NAME, SENSOR_DEVICE_CLASS, SENSOR_EVENT_CODE), v))
|
||||||
|
for k, v in BINARY_SENSORS.items()
|
||||||
|
}
|
||||||
|
|
||||||
|
_UPDATE_MSG = "Updating %s binary sensor"
|
||||||
|
|
||||||
|
|
||||||
async def async_setup_platform(hass, config, async_add_entities, discovery_info=None):
|
async def async_setup_platform(hass, config, async_add_entities, discovery_info=None):
|
||||||
@ -54,18 +68,19 @@ class AmcrestBinarySensor(BinarySensorDevice):
|
|||||||
|
|
||||||
def __init__(self, name, device, sensor_type):
|
def __init__(self, name, device, sensor_type):
|
||||||
"""Initialize entity."""
|
"""Initialize entity."""
|
||||||
self._name = f"{name} {BINARY_SENSORS[sensor_type][0]}"
|
self._name = f"{name} {BINARY_SENSORS[sensor_type][SENSOR_NAME]}"
|
||||||
self._signal_name = name
|
self._signal_name = name
|
||||||
self._api = device.api
|
self._api = device.api
|
||||||
self._sensor_type = sensor_type
|
self._sensor_type = sensor_type
|
||||||
self._state = None
|
self._state = None
|
||||||
self._device_class = BINARY_SENSORS[sensor_type][1]
|
self._device_class = BINARY_SENSORS[sensor_type][SENSOR_DEVICE_CLASS]
|
||||||
self._unsub_dispatcher = None
|
self._event_code = BINARY_SENSORS[sensor_type][SENSOR_EVENT_CODE]
|
||||||
|
self._unsub_dispatcher = []
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def should_poll(self):
|
def should_poll(self):
|
||||||
"""Return True if entity has to be polled for state."""
|
"""Return True if entity has to be polled for state."""
|
||||||
return self._sensor_type != BINARY_SENSOR_ONLINE
|
return self._sensor_type == BINARY_SENSOR_ONLINE
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def name(self):
|
def name(self):
|
||||||
@ -89,16 +104,34 @@ class AmcrestBinarySensor(BinarySensorDevice):
|
|||||||
|
|
||||||
def update(self):
|
def update(self):
|
||||||
"""Update entity."""
|
"""Update entity."""
|
||||||
|
if self._sensor_type == BINARY_SENSOR_ONLINE:
|
||||||
|
self._update_online()
|
||||||
|
else:
|
||||||
|
self._update_others()
|
||||||
|
|
||||||
|
def _update_online(self):
|
||||||
|
if not (self._api.available or self.is_on):
|
||||||
|
return
|
||||||
|
_LOGGER.debug(_UPDATE_MSG, self._name)
|
||||||
|
if self._api.available:
|
||||||
|
# Send a command to the camera to test if we can still communicate with it.
|
||||||
|
# Override of Http.command() in __init__.py will set self._api.available
|
||||||
|
# accordingly.
|
||||||
|
try:
|
||||||
|
self._api.current_time
|
||||||
|
except AmcrestError:
|
||||||
|
pass
|
||||||
|
self._state = self._api.available
|
||||||
|
|
||||||
|
def _update_others(self):
|
||||||
if not self.available:
|
if not self.available:
|
||||||
return
|
return
|
||||||
_LOGGER.debug("Updating %s binary sensor", self._name)
|
_LOGGER.debug(_UPDATE_MSG, self._name)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
if self._sensor_type == BINARY_SENSOR_MOTION_DETECTED:
|
self._state = "channels" in self._api.event_channels_happened(
|
||||||
self._state = self._api.is_motion_detected
|
self._event_code
|
||||||
|
)
|
||||||
elif self._sensor_type == BINARY_SENSOR_ONLINE:
|
|
||||||
self._state = self._api.available
|
|
||||||
except AmcrestError as error:
|
except AmcrestError as error:
|
||||||
log_update_error(_LOGGER, "update", self.name, "binary sensor", error)
|
log_update_error(_LOGGER, "update", self.name, "binary sensor", error)
|
||||||
|
|
||||||
@ -106,14 +139,32 @@ class AmcrestBinarySensor(BinarySensorDevice):
|
|||||||
"""Update state."""
|
"""Update state."""
|
||||||
self.async_schedule_update_ha_state(True)
|
self.async_schedule_update_ha_state(True)
|
||||||
|
|
||||||
|
@callback
|
||||||
|
def async_event_received(self, start):
|
||||||
|
"""Update state from received event."""
|
||||||
|
_LOGGER.debug(_UPDATE_MSG, self._name)
|
||||||
|
self._state = start
|
||||||
|
self.async_write_ha_state()
|
||||||
|
|
||||||
async def async_added_to_hass(self):
|
async def async_added_to_hass(self):
|
||||||
"""Subscribe to update signal."""
|
"""Subscribe to signals."""
|
||||||
self._unsub_dispatcher = async_dispatcher_connect(
|
self._unsub_dispatcher.append(
|
||||||
self.hass,
|
async_dispatcher_connect(
|
||||||
service_signal(SERVICE_UPDATE, self._signal_name),
|
self.hass,
|
||||||
self.async_on_demand_update,
|
service_signal(SERVICE_UPDATE, self._signal_name),
|
||||||
|
self.async_on_demand_update,
|
||||||
|
)
|
||||||
)
|
)
|
||||||
|
if self._event_code:
|
||||||
|
self._unsub_dispatcher.append(
|
||||||
|
async_dispatcher_connect(
|
||||||
|
self.hass,
|
||||||
|
service_signal(SERVICE_EVENT, self._signal_name, self._event_code),
|
||||||
|
self.async_event_received,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
async def async_will_remove_from_hass(self):
|
async def async_will_remove_from_hass(self):
|
||||||
"""Disconnect from update signal."""
|
"""Disconnect from update signal."""
|
||||||
self._unsub_dispatcher()
|
for unsub_dispatcher in self._unsub_dispatcher:
|
||||||
|
unsub_dispatcher()
|
||||||
|
@ -4,11 +4,16 @@ DATA_AMCREST = DOMAIN
|
|||||||
CAMERAS = "cameras"
|
CAMERAS = "cameras"
|
||||||
DEVICES = "devices"
|
DEVICES = "devices"
|
||||||
|
|
||||||
BINARY_SENSOR_SCAN_INTERVAL_SECS = 5
|
BINARY_SENSOR_SCAN_INTERVAL_SECS = 60
|
||||||
CAMERA_WEB_SESSION_TIMEOUT = 10
|
CAMERA_WEB_SESSION_TIMEOUT = 10
|
||||||
COMM_RETRIES = 1
|
COMM_RETRIES = 1
|
||||||
COMM_TIMEOUT = 6.05
|
COMM_TIMEOUT = 6.05
|
||||||
SENSOR_SCAN_INTERVAL_SECS = 10
|
SENSOR_SCAN_INTERVAL_SECS = 10
|
||||||
SNAPSHOT_TIMEOUT = 20
|
SNAPSHOT_TIMEOUT = 20
|
||||||
|
|
||||||
|
SERVICE_EVENT = "event"
|
||||||
SERVICE_UPDATE = "update"
|
SERVICE_UPDATE = "update"
|
||||||
|
|
||||||
|
SENSOR_DEVICE_CLASS = "class"
|
||||||
|
SENSOR_EVENT_CODE = "code"
|
||||||
|
SENSOR_NAME = "name"
|
||||||
|
@ -2,12 +2,9 @@
|
|||||||
from .const import DOMAIN
|
from .const import DOMAIN
|
||||||
|
|
||||||
|
|
||||||
def service_signal(service, ident=None):
|
def service_signal(service, *args):
|
||||||
"""Encode service and identifier into signal."""
|
"""Encode signal."""
|
||||||
signal = f"{DOMAIN}_{service}"
|
return "_".join([DOMAIN, service, *args])
|
||||||
if ident:
|
|
||||||
signal += f"_{ident.replace('.', '_')}"
|
|
||||||
return signal
|
|
||||||
|
|
||||||
|
|
||||||
def log_update_error(logger, action, name, entity_type, error):
|
def log_update_error(logger, action, name, entity_type, error):
|
||||||
|
@ -2,7 +2,7 @@
|
|||||||
"domain": "amcrest",
|
"domain": "amcrest",
|
||||||
"name": "Amcrest",
|
"name": "Amcrest",
|
||||||
"documentation": "https://www.home-assistant.io/integrations/amcrest",
|
"documentation": "https://www.home-assistant.io/integrations/amcrest",
|
||||||
"requirements": ["amcrest==1.5.6"],
|
"requirements": ["amcrest==1.7.0"],
|
||||||
"dependencies": ["ffmpeg"],
|
"dependencies": ["ffmpeg"],
|
||||||
"codeowners": ["@pnbruckner"]
|
"codeowners": ["@pnbruckner"]
|
||||||
}
|
}
|
||||||
|
@ -224,7 +224,7 @@ alpha_vantage==2.1.3
|
|||||||
ambiclimate==0.2.1
|
ambiclimate==0.2.1
|
||||||
|
|
||||||
# homeassistant.components.amcrest
|
# homeassistant.components.amcrest
|
||||||
amcrest==1.5.6
|
amcrest==1.7.0
|
||||||
|
|
||||||
# homeassistant.components.androidtv
|
# homeassistant.components.androidtv
|
||||||
androidtv==0.0.39
|
androidtv==0.0.39
|
||||||
|
Loading…
x
Reference in New Issue
Block a user