From 4e32b6569450128252f89eaa011ca58203a6896a Mon Sep 17 00:00:00 2001 From: Jorgen Evens Date: Tue, 21 Feb 2023 16:48:53 +0100 Subject: [PATCH] Improve pjlink reliability (#80745) Co-authored-by: Martin Hjelmare --- .coveragerc | 1 - .../components/pjlink/media_player.py | 87 +++-- requirements_test_all.txt | 3 + tests/components/pjlink/__init__.py | 1 + tests/components/pjlink/test_media_player.py | 365 ++++++++++++++++++ 5 files changed, 427 insertions(+), 30 deletions(-) create mode 100644 tests/components/pjlink/__init__.py create mode 100644 tests/components/pjlink/test_media_player.py diff --git a/.coveragerc b/.coveragerc index 52dfe089522..871f798a653 100644 --- a/.coveragerc +++ b/.coveragerc @@ -898,7 +898,6 @@ omit = homeassistant/components/ping/binary_sensor.py homeassistant/components/ping/device_tracker.py homeassistant/components/pioneer/media_player.py - homeassistant/components/pjlink/media_player.py homeassistant/components/plaato/__init__.py homeassistant/components/plaato/binary_sensor.py homeassistant/components/plaato/entity.py diff --git a/homeassistant/components/pjlink/media_player.py b/homeassistant/components/pjlink/media_player.py index e7c4eeea1ee..3fb5facac5e 100644 --- a/homeassistant/components/pjlink/media_player.py +++ b/homeassistant/components/pjlink/media_player.py @@ -1,6 +1,8 @@ """Support for controlling projector via the PJLink protocol.""" from __future__ import annotations +import socket + from pypjlink import MUTE_AUDIO, Projector from pypjlink.projector import ProjectorError import voluptuous as vol @@ -23,6 +25,8 @@ DEFAULT_PORT = 4352 DEFAULT_ENCODING = "utf-8" DEFAULT_TIMEOUT = 10 +ERR_PROJECTOR_UNAVAILABLE = "projector unavailable" + PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend( { vol.Required(CONF_HOST): cv.string, @@ -79,55 +83,80 @@ class PjLinkDevice(MediaPlayerEntity): """Iinitialize the PJLink device.""" self._host = host self._port = port - self._attr_name = name self._password = password self._encoding = encoding + self._source_name_mapping = {} + + self._attr_name = name self._attr_is_volume_muted = False self._attr_state = MediaPlayerState.OFF - with self.projector() as projector: - if not self._attr_name: - self._attr_name = projector.get_name() - inputs = projector.get_inputs() + self._attr_source = None + self._attr_source_list = [] + self._attr_available = False + + def _force_off(self): + self._attr_state = MediaPlayerState.OFF + self._attr_is_volume_muted = False + self._attr_source = None + + def _setup_projector(self): + try: + with self.projector() as projector: + if not self._attr_name: + self._attr_name = projector.get_name() + inputs = projector.get_inputs() + except ProjectorError as err: + if str(err) == ERR_PROJECTOR_UNAVAILABLE: + return False + raise + self._source_name_mapping = {format_input_source(*x): x for x in inputs} - self._attr_source_list = sorted(self._source_name_mapping.keys()) + self._attr_source_list = sorted(self._source_name_mapping) + return True def projector(self): """Create PJLink Projector instance.""" - projector = Projector.from_address( - self._host, self._port, self._encoding, DEFAULT_TIMEOUT - ) - projector.authenticate(self._password) + try: + projector = Projector.from_address(self._host, self._port) + projector.authenticate(self._password) + except (socket.timeout, OSError) as err: + self._attr_available = False + raise ProjectorError(ERR_PROJECTOR_UNAVAILABLE) from err + return projector def update(self) -> None: """Get the latest state from the device.""" - with self.projector() as projector: - try: + if not self._attr_available: + self._attr_available = self._setup_projector() + + if not self._attr_available: + self._force_off() + return + + try: + with self.projector() as projector: pwstate = projector.get_power() if pwstate in ("on", "warm-up"): self._attr_state = MediaPlayerState.ON self._attr_is_volume_muted = projector.get_mute()[1] self._attr_source = format_input_source(*projector.get_input()) else: - self._attr_state = MediaPlayerState.OFF - self._attr_is_volume_muted = False - self._attr_source = None - except KeyError as err: - if str(err) == "'OK'": - self._attr_state = MediaPlayerState.OFF - self._attr_is_volume_muted = False - self._attr_source = None - else: - raise - except ProjectorError as err: - if str(err) == "unavailable time": - self._attr_state = MediaPlayerState.OFF - self._attr_is_volume_muted = False - self._attr_source = None - else: - raise + self._force_off() + except KeyError as err: + if str(err) == "'OK'": + self._force_off() + else: + raise + except ProjectorError as err: + if str(err) == "unavailable time": + self._force_off() + elif str(err) == ERR_PROJECTOR_UNAVAILABLE: + self._attr_available = False + else: + raise def turn_off(self) -> None: """Turn projector off.""" diff --git a/requirements_test_all.txt b/requirements_test_all.txt index aa5ebe9ca3e..3abfe3267df 100644 --- a/requirements_test_all.txt +++ b/requirements_test_all.txt @@ -1351,6 +1351,9 @@ pyownet==0.10.0.post1 # homeassistant.components.lcn pypck==0.7.16 +# homeassistant.components.pjlink +pypjlink2==1.2.1 + # homeassistant.components.plaato pyplaato==0.0.18 diff --git a/tests/components/pjlink/__init__.py b/tests/components/pjlink/__init__.py new file mode 100644 index 00000000000..4a52b3c434d --- /dev/null +++ b/tests/components/pjlink/__init__.py @@ -0,0 +1 @@ +"""Test the pjlink integration.""" diff --git a/tests/components/pjlink/test_media_player.py b/tests/components/pjlink/test_media_player.py new file mode 100644 index 00000000000..c4a923c16ee --- /dev/null +++ b/tests/components/pjlink/test_media_player.py @@ -0,0 +1,365 @@ +"""Test the pjlink media player platform.""" + +from datetime import timedelta +import socket +from unittest.mock import create_autospec, patch + +import pypjlink +from pypjlink import MUTE_AUDIO +from pypjlink.projector import ProjectorError +import pytest + +import homeassistant.components.media_player as media_player +from homeassistant.const import ATTR_ENTITY_ID +from homeassistant.setup import async_setup_component +from homeassistant.util import dt + +from tests.common import assert_setup_component, async_fire_time_changed + + +@pytest.fixture(name="projector_from_address") +def projector_from_address(): + """Create pjlink Projector mock.""" + + with patch("pypjlink.Projector.from_address") as from_address: + constructor = create_autospec(pypjlink.Projector) + from_address.return_value = constructor.return_value + yield from_address + + +@pytest.fixture(name="mocked_projector") +def mocked_projector(projector_from_address): + """Create pjlink Projector instance mock.""" + + instance = projector_from_address.return_value + + with instance as mocked_instance: + mocked_instance.get_name.return_value = "Test" + mocked_instance.get_power.return_value = "on" + mocked_instance.get_mute.return_value = [0, True] + mocked_instance.get_input.return_value = [0, 1] + mocked_instance.get_inputs.return_value = ( + ("HDMI", 1), + ("HDMI", 2), + ("VGA", 1), + ) + + yield mocked_instance + + +@pytest.mark.parametrize("side_effect", [socket.timeout, OSError]) +async def test_offline_initialization(projector_from_address, hass, side_effect): + """Test initialization of a device that is offline.""" + + with assert_setup_component(1, media_player.DOMAIN): + projector_from_address.side_effect = side_effect + + assert await async_setup_component( + hass, + media_player.DOMAIN, + { + media_player.DOMAIN: { + "platform": "pjlink", + "name": "test_offline", + "host": "127.0.0.1", + } + }, + ) + await hass.async_block_till_done() + + state = hass.states.get("media_player.test_offline") + assert state.state == "unavailable" + + +async def test_initialization(projector_from_address, hass): + """Test a device that is available.""" + + with assert_setup_component(1, media_player.DOMAIN): + instance = projector_from_address.return_value + + with instance as mocked_instance: + mocked_instance.get_name.return_value = "Test" + mocked_instance.get_inputs.return_value = ( + ("HDMI", 1), + ("HDMI", 2), + ("VGA", 1), + ) + + assert await async_setup_component( + hass, + media_player.DOMAIN, + { + media_player.DOMAIN: { + "platform": "pjlink", + "host": "127.0.0.1", + } + }, + ) + + await hass.async_block_till_done() + + state = hass.states.get("media_player.test") + assert state.state == "off" + + assert "source_list" in state.attributes + source_list = state.attributes["source_list"] + + assert set(source_list) == {"HDMI 1", "HDMI 2", "VGA 1"} + + +@pytest.mark.parametrize("power_state", ["on", "warm-up"]) +async def test_on_state_init(projector_from_address, hass, power_state): + """Test a device that is available.""" + + with assert_setup_component(1, media_player.DOMAIN): + instance = projector_from_address.return_value + + with instance as mocked_instance: + mocked_instance.get_name.return_value = "Test" + mocked_instance.get_power.return_value = power_state + mocked_instance.get_inputs.return_value = (("HDMI", 1),) + mocked_instance.get_input.return_value = ("HDMI", 1) + + assert await async_setup_component( + hass, + media_player.DOMAIN, + { + media_player.DOMAIN: { + "platform": "pjlink", + "host": "127.0.0.1", + } + }, + ) + + await hass.async_block_till_done() + + state = hass.states.get("media_player.test") + assert state.state == "on" + + assert state.attributes["source"] == "HDMI 1" + + +async def test_api_error(projector_from_address, hass): + """Test invalid api responses.""" + + with assert_setup_component(1, media_player.DOMAIN): + instance = projector_from_address.return_value + + with instance as mocked_instance: + mocked_instance.get_name.return_value = "Test" + mocked_instance.get_inputs.return_value = ( + ("HDMI", 1), + ("HDMI", 2), + ("VGA", 1), + ) + mocked_instance.get_power.side_effect = KeyError("OK") + + assert await async_setup_component( + hass, + media_player.DOMAIN, + { + media_player.DOMAIN: { + "platform": "pjlink", + "host": "127.0.0.1", + } + }, + ) + + await hass.async_block_till_done() + + state = hass.states.get("media_player.test") + assert state.state == "off" + + +async def test_update_unavailable(projector_from_address, hass): + """Test update to a device that is unavailable.""" + + with assert_setup_component(1, media_player.DOMAIN): + instance = projector_from_address.return_value + + with instance as mocked_instance: + mocked_instance.get_name.return_value = "Test" + mocked_instance.get_inputs.return_value = ( + ("HDMI", 1), + ("HDMI", 2), + ("VGA", 1), + ) + + assert await async_setup_component( + hass, + media_player.DOMAIN, + { + media_player.DOMAIN: { + "platform": "pjlink", + "host": "127.0.0.1", + } + }, + ) + + await hass.async_block_till_done() + + state = hass.states.get("media_player.test") + assert state.state == "off" + + projector_from_address.side_effect = socket.timeout + async_fire_time_changed(hass, dt.utcnow() + timedelta(seconds=10)) + await hass.async_block_till_done() + + state = hass.states.get("media_player.test") + assert state.state == "unavailable" + + +async def test_unavailable_time(mocked_projector, hass): + """Test unavailable time projector error.""" + + assert await async_setup_component( + hass, + media_player.DOMAIN, + { + media_player.DOMAIN: { + "platform": "pjlink", + "host": "127.0.0.1", + } + }, + ) + + await hass.async_block_till_done() + + state = hass.states.get("media_player.test") + assert state.state == "on" + assert state.attributes["source"] is not None + assert state.attributes["is_volume_muted"] is not False + + mocked_projector.get_power.side_effect = ProjectorError("unavailable time") + async_fire_time_changed(hass, dt.utcnow() + timedelta(seconds=10)) + await hass.async_block_till_done() + + state = hass.states.get("media_player.test") + assert state.state == "off" + assert "source" not in state.attributes + assert "is_volume_muted" not in state.attributes + + +async def test_turn_off(mocked_projector, hass): + """Test turning off beamer.""" + + assert await async_setup_component( + hass, + media_player.DOMAIN, + { + media_player.DOMAIN: { + "platform": "pjlink", + "host": "127.0.0.1", + } + }, + ) + + await hass.async_block_till_done() + await hass.services.async_call( + domain=media_player.DOMAIN, + service="turn_off", + service_data={ATTR_ENTITY_ID: "media_player.test"}, + blocking=True, + ) + + mocked_projector.set_power.assert_called_with("off") + + +async def test_turn_on(mocked_projector, hass): + """Test turning on beamer.""" + + assert await async_setup_component( + hass, + media_player.DOMAIN, + { + media_player.DOMAIN: { + "platform": "pjlink", + "host": "127.0.0.1", + } + }, + ) + + await hass.async_block_till_done() + await hass.services.async_call( + domain=media_player.DOMAIN, + service="turn_on", + service_data={ATTR_ENTITY_ID: "media_player.test"}, + blocking=True, + ) + + mocked_projector.set_power.assert_called_with("on") + + +async def test_mute(mocked_projector, hass): + """Test muting beamer.""" + + assert await async_setup_component( + hass, + media_player.DOMAIN, + { + media_player.DOMAIN: { + "platform": "pjlink", + "host": "127.0.0.1", + } + }, + ) + + await hass.async_block_till_done() + await hass.services.async_call( + domain=media_player.DOMAIN, + service="volume_mute", + service_data={ATTR_ENTITY_ID: "media_player.test", "is_volume_muted": True}, + blocking=True, + ) + + mocked_projector.set_mute.assert_called_with(MUTE_AUDIO, True) + + +async def test_unmute(mocked_projector, hass): + """Test unmuting beamer.""" + + assert await async_setup_component( + hass, + media_player.DOMAIN, + { + media_player.DOMAIN: { + "platform": "pjlink", + "host": "127.0.0.1", + } + }, + ) + + await hass.async_block_till_done() + await hass.services.async_call( + domain=media_player.DOMAIN, + service="volume_mute", + service_data={ATTR_ENTITY_ID: "media_player.test", "is_volume_muted": False}, + blocking=True, + ) + + mocked_projector.set_mute.assert_called_with(MUTE_AUDIO, False) + + +async def test_select_source(mocked_projector, hass): + """Test selecting source.""" + + assert await async_setup_component( + hass, + media_player.DOMAIN, + { + media_player.DOMAIN: { + "platform": "pjlink", + "host": "127.0.0.1", + } + }, + ) + + await hass.async_block_till_done() + await hass.services.async_call( + domain=media_player.DOMAIN, + service="select_source", + service_data={ATTR_ENTITY_ID: "media_player.test", "source": "VGA 1"}, + blocking=True, + ) + + mocked_projector.set_input.assert_called_with("VGA", 1)