Add Axis MQTT support (#36015)

* Working PoC

* Store

* Handle subscribing to MQTT and stopping stream when first telegram arrives

* Improve naming

* Now with test

* Better strings

* Fix Martins comments

* Improve mock device patching

* Bump dependency to v27
Add MQTT as after dependency
This commit is contained in:
Robert Svensson 2020-05-25 23:13:34 +02:00 committed by GitHub
parent db92ffdf89
commit 376e0e0e93
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 115 additions and 22 deletions

View File

@ -5,8 +5,12 @@ import asyncio
import async_timeout
import axis
from axis.event_stream import OPERATION_INITIALIZED
from axis.streammanager import SIGNAL_PLAYING
from axis.mqtt import mqtt_json_to_event
from axis.streammanager import SIGNAL_PLAYING, STATE_STOPPED
from homeassistant.components import mqtt
from homeassistant.components.mqtt import DOMAIN as MQTT_DOMAIN
from homeassistant.components.mqtt.models import Message
from homeassistant.const import (
CONF_HOST,
CONF_NAME,
@ -15,10 +19,11 @@ from homeassistant.const import (
CONF_TRIGGER_TIME,
CONF_USERNAME,
)
from homeassistant.core import callback
from homeassistant.core import HomeAssistant, callback
from homeassistant.exceptions import ConfigEntryNotReady
from homeassistant.helpers.device_registry import CONNECTION_NETWORK_MAC
from homeassistant.helpers.dispatcher import async_dispatcher_send
from homeassistant.setup import async_when_setup
from .const import (
ATTR_MANUFACTURER,
@ -122,6 +127,7 @@ class AxisNetworkDevice:
async def async_new_address_callback(hass, entry):
"""Handle signals of device getting new address.
Called when config entry is updated.
This is a static method because a class method (bound method),
can not be used with weak references.
"""
@ -142,6 +148,25 @@ class AxisNetworkDevice:
sw_version=self.fw_version,
)
async def use_mqtt(self, hass: HomeAssistant, component: str) -> None:
"""Set up to use MQTT."""
status = await hass.async_add_executor_job(
self.api.vapix.mqtt.get_client_status
)
if status.get("data", {}).get("status", {}).get("state") == "active":
self.listeners.append(
await mqtt.async_subscribe(hass, f"{self.serial}/#", self.mqtt_message)
)
@callback
def mqtt_message(self, message: Message) -> None:
"""Receive Axis MQTT message."""
self.disconnect_from_stream()
event = mqtt_json_to_event(message.payload)
self.api.event.process_event(event)
async def async_setup(self):
"""Set up the device."""
try:
@ -173,11 +198,14 @@ class AxisNetworkDevice:
]
)
if self.option_events:
self.api.stream.connection_status_callback = (
self.api.stream.connection_status_callback.append(
self.async_connection_status_callback
)
self.api.enable_events(event_callback=self.async_event_callback)
self.api.start()
self.api.stream.start()
if self.api.vapix.mqtt:
async_when_setup(self.hass, MQTT_DOMAIN, self.use_mqtt)
self.hass.async_create_task(start_platforms())
@ -185,14 +213,23 @@ class AxisNetworkDevice:
return True
@callback
def disconnect_from_stream(self):
"""Stop stream."""
if self.api.stream.state != STATE_STOPPED:
self.api.stream.connection_status_callback.remove(
self.async_connection_status_callback
)
self.api.stream.stop()
@callback
def shutdown(self, event):
"""Stop the event stream."""
self.api.stop()
self.disconnect_from_stream()
async def async_reset(self):
"""Reset this device to default state."""
self.api.stop()
self.disconnect_from_stream()
unload_ok = all(
await asyncio.gather(
@ -226,11 +263,13 @@ async def get_device(hass, host, port, username, password):
try:
with async_timeout.timeout(15):
await asyncio.gather(
hass.async_add_executor_job(device.vapix.params.update_brand),
hass.async_add_executor_job(device.vapix.params.update_properties),
hass.async_add_executor_job(device.vapix.ports.update),
)
for vapix_call in (
device.vapix.initialize_api_discovery,
device.vapix.params.update_brand,
device.vapix.params.update_properties,
device.vapix.ports.update,
):
await hass.async_add_executor_job(vapix_call)
return device

View File

@ -3,7 +3,8 @@
"name": "Axis",
"config_flow": true,
"documentation": "https://www.home-assistant.io/integrations/axis",
"requirements": ["axis==26"],
"requirements": ["axis==27"],
"zeroconf": ["_axis-video._tcp.local."],
"after_dependencies": ["mqtt"],
"codeowners": ["@Kane610"]
}

View File

@ -300,7 +300,7 @@ avea==1.4
avri-api==0.1.7
# homeassistant.components.axis
axis==26
axis==27
# homeassistant.components.azure_event_hub
azure-eventhub==1.3.1

View File

@ -141,7 +141,7 @@ async-upnp-client==0.14.13
av==8.0.1
# homeassistant.components.axis
axis==26
axis==27
# homeassistant.components.homekit
base36==0.1.1

View File

@ -1,5 +1,6 @@
"""Test Axis device."""
from copy import deepcopy
from unittest import mock
import axis as axislib
from axis.event_stream import OPERATION_INITIALIZED
@ -13,6 +14,7 @@ from homeassistant.components.axis.const import (
CONF_MODEL,
DOMAIN as AXIS_DOMAIN,
)
from homeassistant.components.binary_sensor import DOMAIN as BINARY_SENSOR_DOMAIN
from homeassistant.const import (
CONF_HOST,
CONF_MAC,
@ -23,7 +25,11 @@ from homeassistant.const import (
)
from tests.async_mock import Mock, patch
from tests.common import MockConfigEntry
from tests.common import (
MockConfigEntry,
async_fire_mqtt_message,
async_mock_mqtt_component,
)
MAC = "00408C12345"
MODEL = "model"
@ -41,6 +47,17 @@ ENTRY_CONFIG = {
CONF_NAME: NAME,
}
DEFAULT_API_DISCOVERY = {
"method": "getApiList",
"apiVersion": "1.0",
"data": {
"apiList": [
{"id": "api-discovery", "version": "1.0", "name": "API Discovery Service"},
{"id": "param-cgi", "version": "1.0", "name": "Legacy Parameter Handling"},
]
},
}
DEFAULT_BRAND = """root.Brand.Brand=AXIS
root.Brand.ProdFullName=AXIS M1065-LW Network Camera
root.Brand.ProdNbr=M1065-LW
@ -76,6 +93,7 @@ async def setup_axis_integration(
hass,
config=ENTRY_CONFIG,
options=ENTRY_OPTIONS,
api_discovery=DEFAULT_API_DISCOVERY,
brand=DEFAULT_BRAND,
ports=DEFAULT_PORTS,
properties=DEFAULT_PROPERTIES,
@ -91,6 +109,9 @@ async def setup_axis_integration(
)
config_entry.add_to_hass(hass)
def mock_update_api_discovery(self):
self.process_raw(api_discovery)
def mock_update_brand(self):
self.process_raw(brand)
@ -100,15 +121,17 @@ async def setup_axis_integration(
def mock_update_properties(self):
self.process_raw(properties)
with patch("axis.param_cgi.Brand.update_brand", new=mock_update_brand), patch(
with patch(
"axis.api_discovery.ApiDiscovery.update", new=mock_update_api_discovery
), patch("axis.param_cgi.Brand.update_brand", new=mock_update_brand), patch(
"axis.param_cgi.Ports.update_ports", new=mock_update_ports
), patch(
"axis.param_cgi.Properties.update_properties", new=mock_update_properties
), patch(
"axis.AxisDevice.start", return_value=True
"axis.rtsp.RTSPClient.start", return_value=True,
):
await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
await hass.async_block_till_done()
return hass.data[AXIS_DOMAIN].get(config_entry.unique_id)
@ -134,6 +157,36 @@ async def test_device_setup(hass):
assert device.serial == ENTRY_CONFIG[CONF_MAC]
async def test_device_support_mqtt(hass):
"""Successful setup."""
api_discovery = deepcopy(DEFAULT_API_DISCOVERY)
api_discovery["data"]["apiList"].append(
{"id": "mqtt-client", "version": "1.0", "name": "MQTT Client API"}
)
get_client_status = {"data": {"status": {"state": "active"}}}
mock_mqtt = await async_mock_mqtt_component(hass)
with patch(
"axis.mqtt.MqttClient.get_client_status", return_value=get_client_status
):
await setup_axis_integration(hass, api_discovery=api_discovery)
mock_mqtt.async_subscribe.assert_called_with(f"{MAC}/#", mock.ANY, 0, "utf-8")
topic = f"{MAC}/event/tns:onvif/Device/tns:axis/Sensor/PIR/$source/sensor/0"
message = b'{"timestamp": 1590258472044, "topic": "onvif:Device/axis:Sensor/PIR", "message": {"source": {"sensor": "0"}, "key": {}, "data": {"state": "1"}}}'
assert len(hass.states.async_entity_ids(BINARY_SENSOR_DOMAIN)) == 0
async_fire_mqtt_message(hass, topic, message)
await hass.async_block_till_done()
assert len(hass.states.async_entity_ids(BINARY_SENSOR_DOMAIN)) == 1
pir = hass.states.get(f"binary_sensor.{NAME}_pir_0")
assert pir.state == "on"
assert pir.name == f"{NAME} PIR 0"
async def test_update_address(hass):
"""Test update address works."""
device = await setup_axis_integration(hass)
@ -208,13 +261,13 @@ async def test_shutdown():
axis_device.shutdown(None)
assert len(axis_device.api.stop.mock_calls) == 1
assert len(axis_device.api.stream.stop.mock_calls) == 1
async def test_get_device_fails(hass):
"""Device unauthorized yields authentication required error."""
with patch(
"axis.param_cgi.Params.update_brand", side_effect=axislib.Unauthorized
"axis.api_discovery.ApiDiscovery.update", side_effect=axislib.Unauthorized
), pytest.raises(axis.errors.AuthenticationRequired):
await axis.device.get_device(hass, host="", port="", username="", password="")
@ -222,7 +275,7 @@ async def test_get_device_fails(hass):
async def test_get_device_device_unavailable(hass):
"""Device unavailable yields cannot connect error."""
with patch(
"axis.param_cgi.Params.update_brand", side_effect=axislib.RequestError
"axis.api_discovery.ApiDiscovery.update", side_effect=axislib.RequestError
), pytest.raises(axis.errors.CannotConnect):
await axis.device.get_device(hass, host="", port="", username="", password="")
@ -230,6 +283,6 @@ async def test_get_device_device_unavailable(hass):
async def test_get_device_unknown_error(hass):
"""Device yield unknown error."""
with patch(
"axis.param_cgi.Params.update_brand", side_effect=axislib.AxisException
"axis.api_discovery.ApiDiscovery.update", side_effect=axislib.AxisException
), pytest.raises(axis.errors.AuthenticationRequired):
await axis.device.get_device(hass, host="", port="", username="", password="")