From f8c38fad0024f38ecd99524427fb3d6054b53708 Mon Sep 17 00:00:00 2001 From: Robert Svensson Date: Wed, 24 Apr 2024 20:47:22 +0200 Subject: [PATCH] Split out event handling from Axis hub (#113837) * Split out event handling from Axis hub * Improve test coverage * Mark internal methods with '_' * Rename to event source --- .../components/axis/hub/event_source.py | 93 +++++++++++++++++++ homeassistant/components/axis/hub/hub.py | 77 +++------------ tests/components/axis/conftest.py | 9 +- tests/components/axis/test_hub.py | 15 ++- 4 files changed, 126 insertions(+), 68 deletions(-) create mode 100644 homeassistant/components/axis/hub/event_source.py diff --git a/homeassistant/components/axis/hub/event_source.py b/homeassistant/components/axis/hub/event_source.py new file mode 100644 index 00000000000..7f2bfe7c982 --- /dev/null +++ b/homeassistant/components/axis/hub/event_source.py @@ -0,0 +1,93 @@ +"""Axis network device abstraction.""" + +from __future__ import annotations + +import axis +from axis.errors import Unauthorized +from axis.interfaces.mqtt import mqtt_json_to_event +from axis.models.mqtt import ClientState +from axis.stream_manager import Signal, State + +from homeassistant.components import mqtt +from homeassistant.components.mqtt import DOMAIN as MQTT_DOMAIN +from homeassistant.components.mqtt.models import ReceiveMessage +from homeassistant.config_entries import ConfigEntry +from homeassistant.core import HomeAssistant, callback +from homeassistant.helpers.dispatcher import async_dispatcher_send +from homeassistant.setup import async_when_setup + + +class AxisEventSource: + """Manage connection to event sources from an Axis device.""" + + def __init__( + self, hass: HomeAssistant, config_entry: ConfigEntry, api: axis.AxisDevice + ) -> None: + """Initialize the device.""" + self.hass = hass + self.config_entry = config_entry + self.api = api + + self.signal_reachable = f"axis_reachable_{config_entry.entry_id}" + + self.available = True + + @callback + def setup(self) -> None: + """Set up the device events.""" + self.api.stream.connection_status_callback.append(self._connection_status_cb) + self.api.enable_events() + self.api.stream.start() + + if self.api.vapix.mqtt.supported: + async_when_setup(self.hass, MQTT_DOMAIN, self._async_use_mqtt) + + @callback + def teardown(self) -> None: + """Tear down connections.""" + self._disconnect_from_stream() + + @callback + def _disconnect_from_stream(self) -> None: + """Stop stream.""" + if self.api.stream.state != State.STOPPED: + self.api.stream.connection_status_callback.clear() + self.api.stream.stop() + + async def _async_use_mqtt(self, hass: HomeAssistant, component: str) -> None: + """Set up to use MQTT.""" + try: + status = await self.api.vapix.mqtt.get_client_status() + except Unauthorized: + # This means the user has too low privileges + return + + if status.status.state == ClientState.ACTIVE: + self.config_entry.async_on_unload( + await mqtt.async_subscribe( + hass, f"{status.config.device_topic_prefix}/#", self._mqtt_message + ) + ) + + @callback + def _mqtt_message(self, message: ReceiveMessage) -> None: + """Receive Axis MQTT message.""" + self._disconnect_from_stream() + + if message.topic.endswith("event/connection"): + return + + event = mqtt_json_to_event(message.payload) + self.api.event.handler(event) + + @callback + def _connection_status_cb(self, status: Signal) -> None: + """Handle signals of device connection status. + + This is called on every RTSP keep-alive message. + Only signal state change if state change is true. + """ + + if self.available != (status == Signal.PLAYING): + self.available = not self.available + async_dispatcher_send(self.hass, self.signal_reachable) diff --git a/homeassistant/components/axis/hub/hub.py b/homeassistant/components/axis/hub/hub.py index 4abd1358417..4e58e3be7c6 100644 --- a/homeassistant/components/axis/hub/hub.py +++ b/homeassistant/components/axis/hub/hub.py @@ -5,24 +5,17 @@ from __future__ import annotations from typing import Any import axis -from axis.errors import Unauthorized -from axis.interfaces.mqtt import mqtt_json_to_event -from axis.models.mqtt import ClientState -from axis.stream_manager import Signal, State -from homeassistant.components import mqtt -from homeassistant.components.mqtt import DOMAIN as MQTT_DOMAIN -from homeassistant.components.mqtt.models import ReceiveMessage from homeassistant.config_entries import ConfigEntry from homeassistant.core import Event, HomeAssistant, callback from homeassistant.helpers import device_registry as dr from homeassistant.helpers.device_registry import CONNECTION_NETWORK_MAC, format_mac from homeassistant.helpers.dispatcher import async_dispatcher_send -from homeassistant.setup import async_when_setup from ..const import ATTR_MANUFACTURER, DOMAIN as AXIS_DOMAIN from .config import AxisConfig from .entity_loader import AxisEntityLoader +from .event_source import AxisEventSource class AxisHub: @@ -35,9 +28,9 @@ class AxisHub: self.hass = hass self.config = AxisConfig.from_config_entry(config_entry) self.entity_loader = AxisEntityLoader(self) + self.event_source = AxisEventSource(hass, config_entry, api) self.api = api - self.available = True self.fw_version = api.vapix.firmware_version self.product_type = api.vapix.product_type self.unique_id = format_mac(api.vapix.serial_number) @@ -51,32 +44,23 @@ class AxisHub: hub: AxisHub = hass.data[AXIS_DOMAIN][config_entry.entry_id] return hub + @property + def available(self) -> bool: + """Connection state to the device.""" + return self.event_source.available + # Signals @property def signal_reachable(self) -> str: """Device specific event to signal a change in connection status.""" - return f"axis_reachable_{self.config.entry.entry_id}" + return self.event_source.signal_reachable @property def signal_new_address(self) -> str: """Device specific event to signal a change in device address.""" return f"axis_new_address_{self.config.entry.entry_id}" - # Callbacks - - @callback - def connection_status_callback(self, status: Signal) -> None: - """Handle signals of device connection status. - - This is called on every RTSP keep-alive message. - Only signal state change if state change is true. - """ - - if self.available != (status == Signal.PLAYING): - self.available = not self.available - async_dispatcher_send(self.hass, self.signal_reachable) - @staticmethod async def async_new_address_callback( hass: HomeAssistant, config_entry: ConfigEntry @@ -89,6 +73,7 @@ class AxisHub: """ hub = AxisHub.get_hub(hass, config_entry) hub.config = AxisConfig.from_config_entry(config_entry) + hub.event_source.config_entry = config_entry hub.api.config.host = hub.config.host async_dispatcher_send(hass, hub.signal_new_address) @@ -106,57 +91,19 @@ class AxisHub: sw_version=self.fw_version, ) - async def async_use_mqtt(self, hass: HomeAssistant, component: str) -> None: - """Set up to use MQTT.""" - try: - status = await self.api.vapix.mqtt.get_client_status() - except Unauthorized: - # This means the user has too low privileges - return - if status.status.state == ClientState.ACTIVE: - self.config.entry.async_on_unload( - await mqtt.async_subscribe( - hass, f"{status.config.device_topic_prefix}/#", self.mqtt_message - ) - ) - - @callback - def mqtt_message(self, message: ReceiveMessage) -> None: - """Receive Axis MQTT message.""" - self.disconnect_from_stream() - if message.topic.endswith("event/connection"): - return - event = mqtt_json_to_event(message.payload) - self.api.event.handler(event) - # Setup and teardown methods @callback def setup(self) -> None: """Set up the device events.""" self.entity_loader.initialize_platforms() - - self.api.stream.connection_status_callback.append( - self.connection_status_callback - ) - self.api.enable_events() - self.api.stream.start() - - if self.api.vapix.mqtt.supported: - async_when_setup(self.hass, MQTT_DOMAIN, self.async_use_mqtt) - - @callback - def disconnect_from_stream(self) -> None: - """Stop stream.""" - if self.api.stream.state != State.STOPPED: - self.api.stream.connection_status_callback.clear() - self.api.stream.stop() + self.event_source.setup() async def shutdown(self, event: Event) -> None: """Stop the event stream.""" - self.disconnect_from_stream() + self.event_source.teardown() @callback def teardown(self) -> None: """Reset this device to default state.""" - self.disconnect_from_stream() + self.event_source.teardown() diff --git a/tests/components/axis/conftest.py b/tests/components/axis/conftest.py index b50a28df49f..7a4e446a0cc 100644 --- a/tests/components/axis/conftest.py +++ b/tests/components/axis/conftest.py @@ -114,6 +114,7 @@ def default_request_fixture( port_management_payload: dict[str, Any], param_properties_payload: dict[str, Any], param_ports_payload: dict[str, Any], + mqtt_status_code: int, ) -> Callable[[str], None]: """Mock default Vapix requests responses.""" @@ -131,7 +132,7 @@ def default_request_fixture( json=port_management_payload, ) respx.post("/axis-cgi/mqtt/client.cgi").respond( - json=MQTT_CLIENT_RESPONSE, + json=MQTT_CLIENT_RESPONSE, status_code=mqtt_status_code ) respx.post("/axis-cgi/streamprofile.cgi").respond( json=STREAM_PROFILES_RESPONSE, @@ -239,6 +240,12 @@ def param_ports_data_fixture() -> dict[str, Any]: return PORTS_RESPONSE +@pytest.fixture(name="mqtt_status_code") +def mqtt_status_code_fixture(): + """Property parameter data.""" + return 200 + + @pytest.fixture(name="setup_default_vapix_requests") def default_vapix_requests_fixture(mock_vapix_requests: Callable[[str], None]) -> None: """Mock default Vapix requests responses.""" diff --git a/tests/components/axis/test_hub.py b/tests/components/axis/test_hub.py index 1ae6db05427..5948874f0bf 100644 --- a/tests/components/axis/test_hub.py +++ b/tests/components/axis/test_hub.py @@ -2,7 +2,7 @@ from ipaddress import ip_address from unittest import mock -from unittest.mock import Mock, patch +from unittest.mock import Mock, call, patch import axis as axislib import pytest @@ -91,7 +91,8 @@ async def test_device_support_mqtt( hass: HomeAssistant, mqtt_mock: MqttMockHAClient, setup_config_entry ) -> None: """Successful setup.""" - mqtt_mock.async_subscribe.assert_called_with(f"axis/{MAC}/#", mock.ANY, 0, "utf-8") + mqtt_call = call(f"axis/{MAC}/#", mock.ANY, 0, "utf-8") + assert mqtt_call in mqtt_mock.async_subscribe.call_args_list topic = f"axis/{MAC}/event/tns:onvif/Device/tns:axis/Sensor/PIR/$source/sensor/0" message = ( @@ -109,6 +110,16 @@ async def test_device_support_mqtt( assert pir.name == f"{NAME} PIR 0" +@pytest.mark.parametrize("api_discovery_items", [API_DISCOVERY_MQTT]) +@pytest.mark.parametrize("mqtt_status_code", [401]) +async def test_device_support_mqtt_low_privilege( + hass: HomeAssistant, mqtt_mock: MqttMockHAClient, setup_config_entry +) -> None: + """Successful setup.""" + mqtt_call = call(f"{MAC}/#", mock.ANY, 0, "utf-8") + assert mqtt_call not in mqtt_mock.async_subscribe.call_args_list + + async def test_update_address( hass: HomeAssistant, setup_config_entry, mock_vapix_requests ) -> None: