Add onvif parser support for reolink package and hikvision alarm (#140669)

This commit is contained in:
Jeff Terrace 2025-03-15 15:03:40 -04:00 committed by GitHub
parent c1c8deed0c
commit 02a75edf1d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 113 additions and 0 deletions

View File

@ -49,6 +49,7 @@ def local_datetime_or_none(value: str) -> datetime.datetime | None:
@PARSERS.register("tns1:VideoSource/MotionAlarm")
@PARSERS.register("tns1:Device/Trigger/tnshik:AlarmIn")
async def async_parse_motion_alarm(uid: str, msg) -> Event | None:
"""Handle parsing event message.
@ -475,6 +476,28 @@ async def async_parse_visitor_detector(uid: str, msg) -> Event | None:
)
@PARSERS.register("tns1:RuleEngine/MyRuleDetector/Package")
async def async_parse_package_detector(uid: str, msg) -> Event | None:
"""Handle parsing event message.
Topic: tns1:RuleEngine/MyRuleDetector/Package
"""
video_source = ""
topic, payload = extract_message(msg)
for source in payload.Source.SimpleItem:
if source.Name == "Source":
video_source = _normalize_video_source(source.Value)
return Event(
f"{uid}_{topic}_{video_source}",
"Package Detection",
"binary_sensor",
"occupancy",
None,
payload.Data.SimpleItem[0].Value == "true",
)
@PARSERS.register("tns1:Device/Trigger/DigitalInput")
async def async_parse_digital_input(uid: str, msg) -> Event | None:
"""Handle parsing event message.

View File

@ -789,3 +789,93 @@ async def test_tapo_unknown_type(hass: HomeAssistant) -> None:
)
assert event is None
async def test_reolink_package(hass: HomeAssistant) -> None:
"""Tests reolink package event."""
event = await get_event(
{
"SubscriptionReference": None,
"Topic": {
"_value_1": "tns1:RuleEngine/MyRuleDetector/Package",
"Dialect": "http://www.onvif.org/ver10/tev/topicExpression/ConcreteSet",
"_attr_1": {},
},
"ProducerReference": None,
"Message": {
"_value_1": {
"Source": {
"SimpleItem": [{"Name": "Source", "Value": "000"}],
"ElementItem": [],
"Extension": None,
"_attr_1": None,
},
"Key": None,
"Data": {
"SimpleItem": [{"Name": "State", "Value": "true"}],
"ElementItem": [],
"Extension": None,
"_attr_1": None,
},
"Extension": None,
"UtcTime": datetime.datetime(
2025, 3, 12, 9, 54, 27, tzinfo=datetime.UTC
),
"PropertyOperation": "Initialized",
"_attr_1": {},
}
},
}
)
assert event is not None
assert event.name == "Package Detection"
assert event.platform == "binary_sensor"
assert event.device_class == "occupancy"
assert event.value
assert event.uid == (f"{TEST_UID}_tns1:RuleEngine/MyRuleDetector/Package_000")
async def test_hikvision_alarm(hass: HomeAssistant) -> None:
"""Tests hikvision camera alarm event."""
event = await get_event(
{
"SubscriptionReference": None,
"Topic": {
"_value_1": "tns1:Device/Trigger/tnshik:AlarmIn",
"Dialect": "http://www.onvif.org/ver10/tev/topicExpression/ConcreteSet",
"_attr_1": {},
},
"ProducerReference": None,
"Message": {
"_value_1": {
"Source": {
"SimpleItem": [{"Name": "AlarmInToken", "Value": "AlarmIn_1"}],
"ElementItem": [],
"Extension": None,
"_attr_1": None,
},
"Key": None,
"Data": {
"SimpleItem": [{"Name": "State", "Value": "true"}],
"ElementItem": [],
"Extension": None,
"_attr_1": None,
},
"Extension": None,
"UtcTime": datetime.datetime(
2025, 3, 13, 22, 57, 26, tzinfo=datetime.UTC
),
"PropertyOperation": "Initialized",
"_attr_1": {},
}
},
}
)
assert event is not None
assert event.name == "Motion Alarm"
assert event.platform == "binary_sensor"
assert event.device_class == "motion"
assert event.value
assert event.uid == (f"{TEST_UID}_tns1:Device/Trigger/tnshik:AlarmIn_AlarmIn_1")