From 40252763d702aa710a3249f95947a1572374bdf8 Mon Sep 17 00:00:00 2001 From: Artur Pragacz <49985303+arturpragacz@users.noreply.github.com> Date: Mon, 21 Jul 2025 16:31:28 +0200 Subject: [PATCH] Switch to a new library in Onkyo (#148613) --- homeassistant/components/onkyo/__init__.py | 26 +- homeassistant/components/onkyo/config_flow.py | 49 +- homeassistant/components/onkyo/const.py | 234 +------- homeassistant/components/onkyo/manifest.json | 5 +- .../components/onkyo/media_player.py | 510 +++++++----------- .../components/onkyo/quality_scale.yaml | 5 +- homeassistant/components/onkyo/receiver.py | 202 +++---- homeassistant/components/onkyo/services.py | 18 +- homeassistant/components/onkyo/util.py | 8 + requirements_all.txt | 6 +- requirements_test_all.txt | 6 +- tests/components/onkyo/__init__.py | 125 ++--- tests/components/onkyo/conftest.py | 227 +++++--- .../onkyo/snapshots/test_media_player.ambr | 203 +++++++ tests/components/onkyo/test_config_flow.py | 321 +++++------ tests/components/onkyo/test_init.py | 84 ++- tests/components/onkyo/test_media_player.py | 230 ++++++++ 17 files changed, 1255 insertions(+), 1004 deletions(-) create mode 100644 homeassistant/components/onkyo/util.py create mode 100644 tests/components/onkyo/snapshots/test_media_player.ambr create mode 100644 tests/components/onkyo/test_media_player.py diff --git a/homeassistant/components/onkyo/__init__.py b/homeassistant/components/onkyo/__init__.py index d0f93012eb7..a4d1ec8f175 100644 --- a/homeassistant/components/onkyo/__init__.py +++ b/homeassistant/components/onkyo/__init__.py @@ -17,7 +17,7 @@ from .const import ( InputSource, ListeningMode, ) -from .receiver import Receiver, async_interview +from .receiver import ReceiverManager, async_interview from .services import DATA_MP_ENTITIES, async_setup_services _LOGGER = logging.getLogger(__name__) @@ -31,7 +31,7 @@ CONFIG_SCHEMA = cv.config_entry_only_config_schema(DOMAIN) class OnkyoData: """Config Entry data.""" - receiver: Receiver + manager: ReceiverManager sources: dict[InputSource, str] sound_modes: dict[ListeningMode, str] @@ -50,11 +50,14 @@ async def async_setup_entry(hass: HomeAssistant, entry: OnkyoConfigEntry) -> boo host = entry.data[CONF_HOST] - info = await async_interview(host) + try: + info = await async_interview(host) + except OSError as exc: + raise ConfigEntryNotReady(f"Unable to connect to: {host}") from exc if info is None: raise ConfigEntryNotReady(f"Unable to connect to: {host}") - receiver = await Receiver.async_create(info) + manager = ReceiverManager(hass, entry, info) sources_store: dict[str, str] = entry.options[OPTION_INPUT_SOURCES] sources = {InputSource(k): v for k, v in sources_store.items()} @@ -62,11 +65,15 @@ async def async_setup_entry(hass: HomeAssistant, entry: OnkyoConfigEntry) -> boo sound_modes_store: dict[str, str] = entry.options.get(OPTION_LISTENING_MODES, {}) sound_modes = {ListeningMode(k): v for k, v in sound_modes_store.items()} - entry.runtime_data = OnkyoData(receiver, sources, sound_modes) + entry.runtime_data = OnkyoData(manager, sources, sound_modes) await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS) - await receiver.conn.connect() + if error := await manager.start(): + try: + await error + except OSError as exc: + raise ConfigEntryNotReady(f"Unable to connect to: {host}") from exc return True @@ -75,9 +82,6 @@ async def async_unload_entry(hass: HomeAssistant, entry: OnkyoConfigEntry) -> bo """Unload Onkyo config entry.""" del hass.data[DATA_MP_ENTITIES][entry.entry_id] - unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS) + entry.runtime_data.manager.start_unloading() - receiver = entry.runtime_data.receiver - receiver.conn.close() - - return unload_ok + return await hass.config_entries.async_unload_platforms(entry, PLATFORMS) diff --git a/homeassistant/components/onkyo/config_flow.py b/homeassistant/components/onkyo/config_flow.py index 2b8f9981e4a..75b0f92043d 100644 --- a/homeassistant/components/onkyo/config_flow.py +++ b/homeassistant/components/onkyo/config_flow.py @@ -4,12 +4,12 @@ from collections.abc import Mapping import logging from typing import Any +from aioonkyo import ReceiverInfo import voluptuous as vol from yarl import URL from homeassistant.config_entries import ( SOURCE_RECONFIGURE, - ConfigEntry, ConfigFlow, ConfigFlowResult, OptionsFlowWithReload, @@ -29,6 +29,7 @@ from homeassistant.helpers.selector import ( ) from homeassistant.helpers.service_info.ssdp import SsdpServiceInfo +from . import OnkyoConfigEntry from .const import ( DOMAIN, OPTION_INPUT_SOURCES, @@ -41,19 +42,20 @@ from .const import ( InputSource, ListeningMode, ) -from .receiver import ReceiverInfo, async_discover, async_interview +from .receiver import async_discover, async_interview +from .util import get_meaning _LOGGER = logging.getLogger(__name__) CONF_DEVICE = "device" -INPUT_SOURCES_DEFAULT: dict[str, str] = {} -LISTENING_MODES_DEFAULT: dict[str, str] = {} +INPUT_SOURCES_DEFAULT: list[InputSource] = [] +LISTENING_MODES_DEFAULT: list[ListeningMode] = [] INPUT_SOURCES_ALL_MEANINGS = { - input_source.value_meaning: input_source for input_source in InputSource + get_meaning(input_source): input_source for input_source in InputSource } LISTENING_MODES_ALL_MEANINGS = { - listening_mode.value_meaning: listening_mode for listening_mode in ListeningMode + get_meaning(listening_mode): listening_mode for listening_mode in ListeningMode } STEP_MANUAL_SCHEMA = vol.Schema({vol.Required(CONF_HOST): str}) STEP_RECONFIGURE_SCHEMA = vol.Schema( @@ -91,6 +93,7 @@ class OnkyoConfigFlow(ConfigFlow, domain=DOMAIN): self, user_input: dict[str, Any] | None = None ) -> ConfigFlowResult: """Handle a flow initialized by the user.""" + _LOGGER.debug("Config flow start user") return self.async_show_menu( step_id="user", menu_options=["manual", "eiscp_discovery"] ) @@ -103,10 +106,10 @@ class OnkyoConfigFlow(ConfigFlow, domain=DOMAIN): if user_input is not None: host = user_input[CONF_HOST] - _LOGGER.debug("Config flow start manual: %s", host) + _LOGGER.debug("Config flow manual: %s", host) try: info = await async_interview(host) - except Exception: + except OSError: _LOGGER.exception("Unexpected exception") errors["base"] = "unknown" else: @@ -156,8 +159,8 @@ class OnkyoConfigFlow(ConfigFlow, domain=DOMAIN): _LOGGER.debug("Config flow start eiscp discovery") try: - infos = await async_discover() - except Exception: + infos = list(await async_discover(self.hass)) + except OSError: _LOGGER.exception("Unexpected exception") return self.async_abort(reason="unknown") @@ -303,8 +306,14 @@ class OnkyoConfigFlow(ConfigFlow, domain=DOMAIN): if reconfigure_entry is None: suggested_values = { OPTION_VOLUME_RESOLUTION: OPTION_VOLUME_RESOLUTION_DEFAULT, - OPTION_INPUT_SOURCES: INPUT_SOURCES_DEFAULT, - OPTION_LISTENING_MODES: LISTENING_MODES_DEFAULT, + OPTION_INPUT_SOURCES: [ + get_meaning(input_source) + for input_source in INPUT_SOURCES_DEFAULT + ], + OPTION_LISTENING_MODES: [ + get_meaning(listening_mode) + for listening_mode in LISTENING_MODES_DEFAULT + ], } else: entry_options = reconfigure_entry.options @@ -325,11 +334,12 @@ class OnkyoConfigFlow(ConfigFlow, domain=DOMAIN): self, user_input: dict[str, Any] | None = None ) -> ConfigFlowResult: """Handle reconfiguration of the receiver.""" + _LOGGER.debug("Config flow start reconfigure") return await self.async_step_manual() @staticmethod @callback - def async_get_options_flow(config_entry: ConfigEntry) -> OptionsFlowWithReload: + def async_get_options_flow(config_entry: OnkyoConfigEntry) -> OptionsFlowWithReload: """Return the options flow.""" return OnkyoOptionsFlowHandler() @@ -372,7 +382,10 @@ class OnkyoOptionsFlowHandler(OptionsFlowWithReload): entry_options: Mapping[str, Any] = self.config_entry.options entry_options = { - OPTION_LISTENING_MODES: LISTENING_MODES_DEFAULT, + OPTION_LISTENING_MODES: { + listening_mode.value: get_meaning(listening_mode) + for listening_mode in LISTENING_MODES_DEFAULT + }, **entry_options, } @@ -416,11 +429,11 @@ class OnkyoOptionsFlowHandler(OptionsFlowWithReload): suggested_values = { OPTION_MAX_VOLUME: entry_options[OPTION_MAX_VOLUME], OPTION_INPUT_SOURCES: [ - InputSource(input_source).value_meaning + get_meaning(InputSource(input_source)) for input_source in entry_options[OPTION_INPUT_SOURCES] ], OPTION_LISTENING_MODES: [ - ListeningMode(listening_mode).value_meaning + get_meaning(ListeningMode(listening_mode)) for listening_mode in entry_options[OPTION_LISTENING_MODES] ], } @@ -463,13 +476,13 @@ class OnkyoOptionsFlowHandler(OptionsFlowWithReload): input_sources_schema_dict: dict[Any, Selector] = {} for input_source, input_source_name in self._input_sources.items(): input_sources_schema_dict[ - vol.Required(input_source.value_meaning, default=input_source_name) + vol.Required(get_meaning(input_source), default=input_source_name) ] = TextSelector() listening_modes_schema_dict: dict[Any, Selector] = {} for listening_mode, listening_mode_name in self._listening_modes.items(): listening_modes_schema_dict[ - vol.Required(listening_mode.value_meaning, default=listening_mode_name) + vol.Required(get_meaning(listening_mode), default=listening_mode_name) ] = TextSelector() return self.async_show_form( diff --git a/homeassistant/components/onkyo/const.py b/homeassistant/components/onkyo/const.py index 851d80c5100..4f5be4238b4 100644 --- a/homeassistant/components/onkyo/const.py +++ b/homeassistant/components/onkyo/const.py @@ -1,10 +1,9 @@ """Constants for the Onkyo integration.""" -from enum import Enum import typing -from typing import Literal, Self +from typing import Literal -import pyeiscp +from aioonkyo import HDMIOutputParam, InputSourceParam, ListeningModeParam, Zone DOMAIN = "onkyo" @@ -21,214 +20,37 @@ VOLUME_RESOLUTION_ALLOWED: tuple[VolumeResolution, ...] = typing.get_args( OPTION_MAX_VOLUME = "max_volume" OPTION_MAX_VOLUME_DEFAULT = 100.0 - -class EnumWithMeaning(Enum): - """Enum with meaning.""" - - value_meaning: str - - def __new__(cls, value: str) -> Self: - """Create enum.""" - obj = object.__new__(cls) - obj._value_ = value - obj.value_meaning = cls._get_meanings()[value] - - return obj - - @staticmethod - def _get_meanings() -> dict[str, str]: - raise NotImplementedError - - OPTION_INPUT_SOURCES = "input_sources" OPTION_LISTENING_MODES = "listening_modes" -_INPUT_SOURCE_MEANINGS = { - "00": "VIDEO1 ··· VCR/DVR ··· STB/DVR", - "01": "VIDEO2 ··· CBL/SAT", - "02": "VIDEO3 ··· GAME/TV ··· GAME", - "03": "VIDEO4 ··· AUX", - "04": "VIDEO5 ··· AUX2 ··· GAME2", - "05": "VIDEO6 ··· PC", - "06": "VIDEO7", - "07": "HIDDEN1 ··· EXTRA1", - "08": "HIDDEN2 ··· EXTRA2", - "09": "HIDDEN3 ··· EXTRA3", - "10": "DVD ··· BD/DVD", - "11": "STRM BOX", - "12": "TV", - "20": "TAPE ··· TV/TAPE", - "21": "TAPE2", - "22": "PHONO", - "23": "CD ··· TV/CD", - "24": "FM", - "25": "AM", - "26": "TUNER", - "27": "MUSIC SERVER ··· P4S ··· DLNA", - "28": "INTERNET RADIO ··· IRADIO FAVORITE", - "29": "USB ··· USB(FRONT)", - "2A": "USB(REAR)", - "2B": "NETWORK ··· NET", - "2D": "AIRPLAY", - "2E": "BLUETOOTH", - "2F": "USB DAC IN", - "30": "MULTI CH", - "31": "XM", - "32": "SIRIUS", - "33": "DAB", - "40": "UNIVERSAL PORT", - "41": "LINE", - "42": "LINE2", - "44": "OPTICAL", - "45": "COAXIAL", - "55": "HDMI 5", - "56": "HDMI 6", - "57": "HDMI 7", - "80": "MAIN SOURCE", +InputSource = InputSourceParam +ListeningMode = ListeningModeParam +HDMIOutput = HDMIOutputParam + +ZONES = { + Zone.MAIN: "Main", + Zone.ZONE2: "Zone 2", + Zone.ZONE3: "Zone 3", + Zone.ZONE4: "Zone 4", } -class InputSource(EnumWithMeaning): - """Receiver input source.""" - - DVR = "00" - CBL = "01" - GAME = "02" - AUX = "03" - GAME2 = "04" - PC = "05" - VIDEO7 = "06" - EXTRA1 = "07" - EXTRA2 = "08" - EXTRA3 = "09" - DVD = "10" - STRM_BOX = "11" - TV = "12" - TAPE = "20" - TAPE2 = "21" - PHONO = "22" - CD = "23" - FM = "24" - AM = "25" - TUNER = "26" - MUSIC_SERVER = "27" - INTERNET_RADIO = "28" - USB = "29" - USB_REAR = "2A" - NETWORK = "2B" - AIRPLAY = "2D" - BLUETOOTH = "2E" - USB_DAC_IN = "2F" - MULTI_CH = "30" - XM = "31" - SIRIUS = "32" - DAB = "33" - UNIVERSAL_PORT = "40" - LINE = "41" - LINE2 = "42" - OPTICAL = "44" - COAXIAL = "45" - HDMI_5 = "55" - HDMI_6 = "56" - HDMI_7 = "57" - MAIN_SOURCE = "80" - - @staticmethod - def _get_meanings() -> dict[str, str]: - return _INPUT_SOURCE_MEANINGS - - -_LISTENING_MODE_MEANINGS = { - "00": "STEREO", - "01": "DIRECT", - "02": "SURROUND", - "03": "FILM ··· GAME RPG ··· ADVANCED GAME", - "04": "THX", - "05": "ACTION ··· GAME ACTION", - "06": "MUSICAL ··· GAME ROCK ··· ROCK/POP", - "07": "MONO MOVIE", - "08": "ORCHESTRA ··· CLASSICAL", - "09": "UNPLUGGED", - "0A": "STUDIO MIX ··· ENTERTAINMENT SHOW", - "0B": "TV LOGIC ··· DRAMA", - "0C": "ALL CH STEREO ··· EXTENDED STEREO", - "0D": "THEATER DIMENSIONAL ··· FRONT STAGE SURROUND", - "0E": "ENHANCED 7/ENHANCE ··· GAME SPORTS ··· SPORTS", - "0F": "MONO", - "11": "PURE AUDIO ··· PURE DIRECT", - "12": "MULTIPLEX", - "13": "FULL MONO ··· MONO MUSIC", - "14": "DOLBY VIRTUAL/SURROUND ENHANCER", - "15": "DTS SURROUND SENSATION", - "16": "AUDYSSEY DSX", - "17": "DTS VIRTUAL:X", - "1F": "WHOLE HOUSE MODE ··· MULTI ZONE MUSIC", - "23": "STAGE (JAPAN GENRE CONTROL)", - "25": "ACTION (JAPAN GENRE CONTROL)", - "26": "MUSIC (JAPAN GENRE CONTROL)", - "2E": "SPORTS (JAPAN GENRE CONTROL)", - "40": "STRAIGHT DECODE ··· 5.1 CH SURROUND", - "41": "DOLBY EX/DTS ES", - "42": "THX CINEMA", - "43": "THX SURROUND EX", - "44": "THX MUSIC", - "45": "THX GAMES", - "50": "THX U(2)/S(2)/I/S CINEMA", - "51": "THX U(2)/S(2)/I/S MUSIC", - "52": "THX U(2)/S(2)/I/S GAMES", - "80": "DOLBY ATMOS/DOLBY SURROUND ··· PLII/PLIIx MOVIE", - "81": "PLII/PLIIx MUSIC", - "82": "DTS:X/NEURAL:X ··· NEO:6/NEO:X CINEMA", - "83": "NEO:6/NEO:X MUSIC", - "84": "DOLBY SURROUND THX CINEMA ··· PLII/PLIIx THX CINEMA", - "85": "DTS NEURAL:X THX CINEMA ··· NEO:6/NEO:X THX CINEMA", - "86": "PLII/PLIIx GAME", - "87": "NEURAL SURR", - "88": "NEURAL THX/NEURAL SURROUND", - "89": "DOLBY SURROUND THX GAMES ··· PLII/PLIIx THX GAMES", - "8A": "DTS NEURAL:X THX GAMES ··· NEO:6/NEO:X THX GAMES", - "8B": "DOLBY SURROUND THX MUSIC ··· PLII/PLIIx THX MUSIC", - "8C": "DTS NEURAL:X THX MUSIC ··· NEO:6/NEO:X THX MUSIC", - "8D": "NEURAL THX CINEMA", - "8E": "NEURAL THX MUSIC", - "8F": "NEURAL THX GAMES", - "90": "PLIIz HEIGHT", - "91": "NEO:6 CINEMA DTS SURROUND SENSATION", - "92": "NEO:6 MUSIC DTS SURROUND SENSATION", - "93": "NEURAL DIGITAL MUSIC", - "94": "PLIIz HEIGHT + THX CINEMA", - "95": "PLIIz HEIGHT + THX MUSIC", - "96": "PLIIz HEIGHT + THX GAMES", - "97": "PLIIz HEIGHT + THX U2/S2 CINEMA", - "98": "PLIIz HEIGHT + THX U2/S2 MUSIC", - "99": "PLIIz HEIGHT + THX U2/S2 GAMES", - "9A": "NEO:X GAME", - "A0": "PLIIx/PLII Movie + AUDYSSEY DSX", - "A1": "PLIIx/PLII MUSIC + AUDYSSEY DSX", - "A2": "PLIIx/PLII GAME + AUDYSSEY DSX", - "A3": "NEO:6 CINEMA + AUDYSSEY DSX", - "A4": "NEO:6 MUSIC + AUDYSSEY DSX", - "A5": "NEURAL SURROUND + AUDYSSEY DSX", - "A6": "NEURAL DIGITAL MUSIC + AUDYSSEY DSX", - "A7": "DOLBY EX + AUDYSSEY DSX", - "FF": "AUTO SURROUND", +LEGACY_HDMI_OUTPUT_MAPPING = { + HDMIOutput.ANALOG: "no,analog", + HDMIOutput.MAIN: "yes,out", + HDMIOutput.SUB: "out-sub,sub,hdbaset", + HDMIOutput.BOTH: "both,sub", + HDMIOutput.BOTH_MAIN: "both", + HDMIOutput.BOTH_SUB: "both", } - -class ListeningMode(EnumWithMeaning): - """Receiver listening mode.""" - - _ignore_ = "ListeningMode _k _v _meaning" - - ListeningMode = vars() - for _k in _LISTENING_MODE_MEANINGS: - ListeningMode["I" + _k] = _k - - @staticmethod - def _get_meanings() -> dict[str, str]: - return _LISTENING_MODE_MEANINGS - - -ZONES = {"main": "Main", "zone2": "Zone 2", "zone3": "Zone 3", "zone4": "Zone 4"} - -PYEISCP_COMMANDS = pyeiscp.commands.COMMANDS +LEGACY_REV_HDMI_OUTPUT_MAPPING = { + "analog": HDMIOutput.ANALOG, + "both": HDMIOutput.BOTH_SUB, + "hdbaset": HDMIOutput.SUB, + "no": HDMIOutput.ANALOG, + "out": HDMIOutput.MAIN, + "out-sub": HDMIOutput.SUB, + "sub": HDMIOutput.BOTH, + "yes": HDMIOutput.MAIN, +} diff --git a/homeassistant/components/onkyo/manifest.json b/homeassistant/components/onkyo/manifest.json index 6f37fb61b44..07834d4cba1 100644 --- a/homeassistant/components/onkyo/manifest.json +++ b/homeassistant/components/onkyo/manifest.json @@ -3,11 +3,12 @@ "name": "Onkyo", "codeowners": ["@arturpragacz", "@eclair4151"], "config_flow": true, + "dependencies": ["network"], "documentation": "https://www.home-assistant.io/integrations/onkyo", "integration_type": "device", "iot_class": "local_push", - "loggers": ["pyeiscp"], - "requirements": ["pyeiscp==0.0.7"], + "loggers": ["aioonkyo"], + "requirements": ["aioonkyo==0.2.0"], "ssdp": [ { "manufacturer": "ONKYO", diff --git a/homeassistant/components/onkyo/media_player.py b/homeassistant/components/onkyo/media_player.py index aed7c51af80..2965388236d 100644 --- a/homeassistant/components/onkyo/media_player.py +++ b/homeassistant/components/onkyo/media_player.py @@ -1,12 +1,12 @@ -"""Support for Onkyo Receivers.""" +"""Media player platform.""" from __future__ import annotations import asyncio -from enum import Enum -from functools import cache import logging -from typing import Any, Literal +from typing import Any + +from aioonkyo import Code, Kind, Status, Zone, command, query, status from homeassistant.components.media_player import ( MediaPlayerEntity, @@ -14,23 +14,25 @@ from homeassistant.components.media_player import ( MediaPlayerState, MediaType, ) -from homeassistant.core import HomeAssistant, callback +from homeassistant.core import HomeAssistant from homeassistant.exceptions import ServiceValidationError from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback from . import OnkyoConfigEntry from .const import ( DOMAIN, + LEGACY_HDMI_OUTPUT_MAPPING, + LEGACY_REV_HDMI_OUTPUT_MAPPING, OPTION_MAX_VOLUME, OPTION_VOLUME_RESOLUTION, - PYEISCP_COMMANDS, ZONES, InputSource, ListeningMode, VolumeResolution, ) -from .receiver import Receiver +from .receiver import ReceiverManager from .services import DATA_MP_ENTITIES +from .util import get_meaning _LOGGER = logging.getLogger(__name__) @@ -86,64 +88,6 @@ VIDEO_INFORMATION_MAPPING = [ "input_hdr", ] -type LibValue = str | tuple[str, ...] - - -def _get_single_lib_value(value: LibValue) -> str: - if isinstance(value, str): - return value - return value[-1] - - -def _get_lib_mapping[T: Enum](cmds: Any, cls: type[T]) -> dict[T, LibValue]: - result: dict[T, LibValue] = {} - for k, v in cmds["values"].items(): - try: - key = cls(k) - except ValueError: - continue - result[key] = v["name"] - - return result - - -@cache -def _input_source_lib_mappings(zone: str) -> dict[InputSource, LibValue]: - match zone: - case "main": - cmds = PYEISCP_COMMANDS["main"]["SLI"] - case "zone2": - cmds = PYEISCP_COMMANDS["zone2"]["SLZ"] - case "zone3": - cmds = PYEISCP_COMMANDS["zone3"]["SL3"] - case "zone4": - cmds = PYEISCP_COMMANDS["zone4"]["SL4"] - - return _get_lib_mapping(cmds, InputSource) - - -@cache -def _rev_input_source_lib_mappings(zone: str) -> dict[LibValue, InputSource]: - return {value: key for key, value in _input_source_lib_mappings(zone).items()} - - -@cache -def _listening_mode_lib_mappings(zone: str) -> dict[ListeningMode, LibValue]: - match zone: - case "main": - cmds = PYEISCP_COMMANDS["main"]["LMD"] - case "zone2": - cmds = PYEISCP_COMMANDS["zone2"]["LMZ"] - case _: - return {} - - return _get_lib_mapping(cmds, ListeningMode) - - -@cache -def _rev_listening_mode_lib_mappings(zone: str) -> dict[LibValue, ListeningMode]: - return {value: key for key, value in _listening_mode_lib_mappings(zone).items()} - async def async_setup_entry( hass: HomeAssistant, @@ -153,10 +97,10 @@ async def async_setup_entry( """Set up MediaPlayer for config entry.""" data = entry.runtime_data - receiver = data.receiver + manager = data.manager all_entities = hass.data[DATA_MP_ENTITIES] - entities: dict[str, OnkyoMediaPlayer] = {} + entities: dict[Zone, OnkyoMediaPlayer] = {} all_entities[entry.entry_id] = entities volume_resolution: VolumeResolution = entry.options[OPTION_VOLUME_RESOLUTION] @@ -164,29 +108,33 @@ async def async_setup_entry( sources = data.sources sound_modes = data.sound_modes - def connect_callback(receiver: Receiver) -> None: - if not receiver.first_connect: + async def connect_callback(reconnect: bool) -> None: + if reconnect: for entity in entities.values(): if entity.enabled: - entity.backfill_state() + await entity.backfill_state() + + async def update_callback(message: Status) -> None: + if isinstance(message, status.Raw): + return + + zone = message.zone - def update_callback(receiver: Receiver, message: tuple[str, str, Any]) -> None: - zone, _, value = message entity = entities.get(zone) if entity is not None: if entity.enabled: entity.process_update(message) - elif zone in ZONES and value != "N/A": - # When we receive the status for a zone, and the value is not "N/A", - # then zone is available on the receiver, so we create the entity for it. + elif not isinstance(message, status.NotAvailable): + # When we receive a valid status for a zone, then that zone is available on the receiver, + # so we create the entity for it. _LOGGER.debug( "Discovered %s on %s (%s)", ZONES[zone], - receiver.model_name, - receiver.host, + manager.info.model_name, + manager.info.host, ) zone_entity = OnkyoMediaPlayer( - receiver, + manager, zone, volume_resolution=volume_resolution, max_volume=max_volume, @@ -196,25 +144,27 @@ async def async_setup_entry( entities[zone] = zone_entity async_add_entities([zone_entity]) - receiver.callbacks.connect.append(connect_callback) - receiver.callbacks.update.append(update_callback) + manager.callbacks.connect.append(connect_callback) + manager.callbacks.update.append(update_callback) class OnkyoMediaPlayer(MediaPlayerEntity): - """Representation of an Onkyo Receiver Media Player (one per each zone).""" + """Onkyo Receiver Media Player (one per each zone).""" _attr_should_poll = False _supports_volume: bool = False - _supports_sound_mode: bool = False + # None means no technical possibility of support + _supports_sound_mode: bool | None = None _supports_audio_info: bool = False _supports_video_info: bool = False - _query_timer: asyncio.TimerHandle | None = None + + _query_task: asyncio.Task | None = None def __init__( self, - receiver: Receiver, - zone: str, + manager: ReceiverManager, + zone: Zone, *, volume_resolution: VolumeResolution, max_volume: float, @@ -222,80 +172,88 @@ class OnkyoMediaPlayer(MediaPlayerEntity): sound_modes: dict[ListeningMode, str], ) -> None: """Initialize the Onkyo Receiver.""" - self._receiver = receiver - name = receiver.model_name - identifier = receiver.identifier - self._attr_name = f"{name}{' ' + ZONES[zone] if zone != 'main' else ''}" - self._attr_unique_id = f"{identifier}_{zone}" - + self._manager = manager self._zone = zone + name = manager.info.model_name + identifier = manager.info.identifier + self._attr_name = f"{name}{' ' + ZONES[zone] if zone != Zone.MAIN else ''}" + self._attr_unique_id = f"{identifier}_{zone.value}" + self._volume_resolution = volume_resolution self._max_volume = max_volume - self._options_sources = sources - self._source_lib_mapping = _input_source_lib_mappings(zone) - self._rev_source_lib_mapping = _rev_input_source_lib_mappings(zone) + zone_sources = InputSource.for_zone(zone) self._source_mapping = { - key: value - for key, value in sources.items() - if key in self._source_lib_mapping + key: value for key, value in sources.items() if key in zone_sources } self._rev_source_mapping = { value: key for key, value in self._source_mapping.items() } - self._options_sound_modes = sound_modes - self._sound_mode_lib_mapping = _listening_mode_lib_mappings(zone) - self._rev_sound_mode_lib_mapping = _rev_listening_mode_lib_mappings(zone) + zone_sound_modes = ListeningMode.for_zone(zone) self._sound_mode_mapping = { - key: value - for key, value in sound_modes.items() - if key in self._sound_mode_lib_mapping + key: value for key, value in sound_modes.items() if key in zone_sound_modes } self._rev_sound_mode_mapping = { value: key for key, value in self._sound_mode_mapping.items() } + self._hdmi_output_mapping = LEGACY_HDMI_OUTPUT_MAPPING + self._rev_hdmi_output_mapping = LEGACY_REV_HDMI_OUTPUT_MAPPING + self._attr_source_list = list(self._rev_source_mapping) self._attr_sound_mode_list = list(self._rev_sound_mode_mapping) self._attr_supported_features = SUPPORTED_FEATURES_BASE - if zone == "main": + if zone == Zone.MAIN: self._attr_supported_features |= SUPPORTED_FEATURES_VOLUME self._supports_volume = True self._attr_supported_features |= MediaPlayerEntityFeature.SELECT_SOUND_MODE self._supports_sound_mode = True + elif Code.get_from_kind_zone(Kind.LISTENING_MODE, zone) is not None: + # To be detected later: + self._supports_sound_mode = False self._attr_extra_state_attributes = {} async def async_added_to_hass(self) -> None: """Entity has been added to hass.""" - self.backfill_state() + await self.backfill_state() async def async_will_remove_from_hass(self) -> None: """Cancel the query timer when the entity is removed.""" - if self._query_timer: - self._query_timer.cancel() - self._query_timer = None + if self._query_task: + self._query_task.cancel() + self._query_task = None - @callback - def _update_receiver(self, propname: str, value: Any) -> None: - """Update a property in the receiver.""" - self._receiver.conn.update_property(self._zone, propname, value) + async def backfill_state(self) -> None: + """Get the receiver to send all the info we care about. - @callback - def _query_receiver(self, propname: str) -> None: - """Cause the receiver to send an update about a property.""" - self._receiver.conn.query_property(self._zone, propname) + Usually run only on connect, as we can otherwise rely on the + receiver to keep us informed of changes. + """ + await self._manager.write(query.Power(self._zone)) + await self._manager.write(query.Volume(self._zone)) + await self._manager.write(query.Muting(self._zone)) + await self._manager.write(query.InputSource(self._zone)) + await self._manager.write(query.TunerPreset(self._zone)) + if self._supports_sound_mode is not None: + await self._manager.write(query.ListeningMode(self._zone)) + if self._zone == Zone.MAIN: + await self._manager.write(query.HDMIOutput()) + await self._manager.write(query.AudioInformation()) + await self._manager.write(query.VideoInformation()) async def async_turn_on(self) -> None: """Turn the media player on.""" - self._update_receiver("power", "on") + message = command.Power(self._zone, command.Power.Param.ON) + await self._manager.write(message) async def async_turn_off(self) -> None: """Turn the media player off.""" - self._update_receiver("power", "standby") + message = command.Power(self._zone, command.Power.Param.STANDBY) + await self._manager.write(message) async def async_set_volume_level(self, volume: float) -> None: """Set volume level, range 0..1. @@ -307,28 +265,30 @@ class OnkyoMediaPlayer(MediaPlayerEntity): scale for the receiver. """ # HA_VOL * (MAX VOL / 100) * VOL_RESOLUTION - self._update_receiver( - "volume", round(volume * (self._max_volume / 100) * self._volume_resolution) - ) + value = round(volume * (self._max_volume / 100) * self._volume_resolution) + message = command.Volume(self._zone, value) + await self._manager.write(message) async def async_volume_up(self) -> None: """Increase volume by 1 step.""" - self._update_receiver("volume", "level-up") + message = command.Volume(self._zone, command.Volume.Param.UP) + await self._manager.write(message) async def async_volume_down(self) -> None: """Decrease volume by 1 step.""" - self._update_receiver("volume", "level-down") + message = command.Volume(self._zone, command.Volume.Param.DOWN) + await self._manager.write(message) async def async_mute_volume(self, mute: bool) -> None: """Mute the volume.""" - self._update_receiver( - "audio-muting" if self._zone == "main" else "muting", - "on" if mute else "off", + message = command.Muting( + self._zone, command.Muting.Param.ON if mute else command.Muting.Param.OFF ) + await self._manager.write(message) async def async_select_source(self, source: str) -> None: """Select input source.""" - if not self.source_list or source not in self.source_list: + if source not in self._rev_source_mapping: raise ServiceValidationError( translation_domain=DOMAIN, translation_key="invalid_source", @@ -338,15 +298,12 @@ class OnkyoMediaPlayer(MediaPlayerEntity): }, ) - source_lib = self._source_lib_mapping[self._rev_source_mapping[source]] - source_lib_single = _get_single_lib_value(source_lib) - self._update_receiver( - "input-selector" if self._zone == "main" else "selector", source_lib_single - ) + message = command.InputSource(self._zone, self._rev_source_mapping[source]) + await self._manager.write(message) async def async_select_sound_mode(self, sound_mode: str) -> None: """Select listening sound mode.""" - if not self.sound_mode_list or sound_mode not in self.sound_mode_list: + if sound_mode not in self._rev_sound_mode_mapping: raise ServiceValidationError( translation_domain=DOMAIN, translation_key="invalid_sound_mode", @@ -356,197 +313,138 @@ class OnkyoMediaPlayer(MediaPlayerEntity): }, ) - sound_mode_lib = self._sound_mode_lib_mapping[ - self._rev_sound_mode_mapping[sound_mode] - ] - sound_mode_lib_single = _get_single_lib_value(sound_mode_lib) - self._update_receiver("listening-mode", sound_mode_lib_single) + message = command.ListeningMode( + self._zone, self._rev_sound_mode_mapping[sound_mode] + ) + await self._manager.write(message) async def async_select_output(self, hdmi_output: str) -> None: """Set hdmi-out.""" - self._update_receiver("hdmi-output-selector", hdmi_output) + message = command.HDMIOutput(self._rev_hdmi_output_mapping[hdmi_output]) + await self._manager.write(message) async def async_play_media( self, media_type: MediaType | str, media_id: str, **kwargs: Any ) -> None: """Play radio station by preset number.""" - if self.source is not None: - source = self._rev_source_mapping[self.source] - if media_type.lower() == "radio" and source in PLAYABLE_SOURCES: - self._update_receiver("preset", media_id) - - @callback - def backfill_state(self) -> None: - """Get the receiver to send all the info we care about. - - Usually run only on connect, as we can otherwise rely on the - receiver to keep us informed of changes. - """ - self._query_receiver("power") - self._query_receiver("volume") - self._query_receiver("preset") - if self._zone == "main": - self._query_receiver("hdmi-output-selector") - self._query_receiver("audio-muting") - self._query_receiver("input-selector") - self._query_receiver("listening-mode") - self._query_receiver("audio-information") - self._query_receiver("video-information") - else: - self._query_receiver("muting") - self._query_receiver("selector") - - @callback - def process_update(self, update: tuple[str, str, Any]) -> None: - """Store relevant updates so they can be queried later.""" - zone, command, value = update - if zone != self._zone: + if self.source is None: return - if command in ["system-power", "power"]: - if value == "on": + source = self._rev_source_mapping.get(self.source) + if media_type.lower() != "radio" or source not in PLAYABLE_SOURCES: + return + + message = command.TunerPreset(self._zone, int(media_id)) + await self._manager.write(message) + + def process_update(self, message: status.Known) -> None: + """Process update.""" + match message: + case status.Power(status.Power.Param.ON): self._attr_state = MediaPlayerState.ON - else: + case status.Power(status.Power.Param.STANDBY): self._attr_state = MediaPlayerState.OFF - self._attr_extra_state_attributes.pop(ATTR_AUDIO_INFORMATION, None) - self._attr_extra_state_attributes.pop(ATTR_VIDEO_INFORMATION, None) - self._attr_extra_state_attributes.pop(ATTR_PRESET, None) - self._attr_extra_state_attributes.pop(ATTR_VIDEO_OUT, None) - elif command in ["volume", "master-volume"] and value != "N/A": - if not self._supports_volume: - self._attr_supported_features |= SUPPORTED_FEATURES_VOLUME - self._supports_volume = True - # AMP_VOL / (VOL_RESOLUTION * (MAX_VOL / 100)) - volume_level: float = value / ( - self._volume_resolution * self._max_volume / 100 - ) - self._attr_volume_level = min(1, volume_level) - elif command in ["muting", "audio-muting"]: - self._attr_is_volume_muted = bool(value == "on") - elif command in ["selector", "input-selector"] and value != "N/A": - self._parse_source(value) - self._query_av_info_delayed() - elif command == "hdmi-output-selector": - self._attr_extra_state_attributes[ATTR_VIDEO_OUT] = ",".join(value) - elif command == "preset": - if self.source is not None and self.source.lower() == "radio": - self._attr_extra_state_attributes[ATTR_PRESET] = value - elif ATTR_PRESET in self._attr_extra_state_attributes: - del self._attr_extra_state_attributes[ATTR_PRESET] - elif command == "listening-mode" and value != "N/A": - if not self._supports_sound_mode: - self._attr_supported_features |= ( - MediaPlayerEntityFeature.SELECT_SOUND_MODE + + case status.Volume(volume): + if not self._supports_volume: + self._attr_supported_features |= SUPPORTED_FEATURES_VOLUME + self._supports_volume = True + # AMP_VOL / (VOL_RESOLUTION * (MAX_VOL / 100)) + volume_level: float = volume / ( + self._volume_resolution * self._max_volume / 100 ) - self._supports_sound_mode = True - self._parse_sound_mode(value) - self._query_av_info_delayed() - elif command == "audio-information": - self._supports_audio_info = True - self._parse_audio_information(value) - elif command == "video-information": - self._supports_video_info = True - self._parse_video_information(value) - elif command == "fl-display-information": - self._query_av_info_delayed() + self._attr_volume_level = min(1, volume_level) + + case status.Muting(muting): + self._attr_is_volume_muted = bool(muting == status.Muting.Param.ON) + + case status.InputSource(source): + if source in self._source_mapping: + self._attr_source = self._source_mapping[source] + else: + source_meaning = get_meaning(source) + _LOGGER.warning( + 'Input source "%s" for entity: %s is not in the list. Check integration options', + source_meaning, + self.entity_id, + ) + self._attr_source = source_meaning + + self._query_av_info_delayed() + + case status.ListeningMode(sound_mode): + if not self._supports_sound_mode: + self._attr_supported_features |= ( + MediaPlayerEntityFeature.SELECT_SOUND_MODE + ) + self._supports_sound_mode = True + + if sound_mode in self._sound_mode_mapping: + self._attr_sound_mode = self._sound_mode_mapping[sound_mode] + else: + sound_mode_meaning = get_meaning(sound_mode) + _LOGGER.warning( + 'Listening mode "%s" for entity: %s is not in the list. Check integration options', + sound_mode_meaning, + self.entity_id, + ) + self._attr_sound_mode = sound_mode_meaning + + self._query_av_info_delayed() + + case status.HDMIOutput(hdmi_output): + self._attr_extra_state_attributes[ATTR_VIDEO_OUT] = ( + self._hdmi_output_mapping[hdmi_output] + ) + self._query_av_info_delayed() + + case status.TunerPreset(preset): + self._attr_extra_state_attributes[ATTR_PRESET] = preset + + case status.AudioInformation(): + self._supports_audio_info = True + audio_information = {} + for item in AUDIO_INFORMATION_MAPPING: + item_value = getattr(message, item) + if item_value is not None: + audio_information[item] = item_value + self._attr_extra_state_attributes[ATTR_AUDIO_INFORMATION] = ( + audio_information + ) + + case status.VideoInformation(): + self._supports_video_info = True + video_information = {} + for item in VIDEO_INFORMATION_MAPPING: + item_value = getattr(message, item) + if item_value is not None: + video_information[item] = item_value + self._attr_extra_state_attributes[ATTR_VIDEO_INFORMATION] = ( + video_information + ) + + case status.FLDisplay(): + self._query_av_info_delayed() + + case status.NotAvailable(Kind.AUDIO_INFORMATION): + # Not available right now, but still supported + self._supports_audio_info = True + + case status.NotAvailable(Kind.VIDEO_INFORMATION): + # Not available right now, but still supported + self._supports_video_info = True self.async_write_ha_state() - @callback - def _parse_source(self, source_lib: LibValue) -> None: - source = self._rev_source_lib_mapping[source_lib] - if source in self._source_mapping: - self._attr_source = self._source_mapping[source] - return - - source_meaning = source.value_meaning - - if source not in self._options_sources: - _LOGGER.warning( - 'Input source "%s" for entity: %s is not in the list. Check integration options', - source_meaning, - self.entity_id, - ) - else: - _LOGGER.error( - 'Input source "%s" is invalid for entity: %s', - source_meaning, - self.entity_id, - ) - - self._attr_source = source_meaning - - @callback - def _parse_sound_mode(self, mode_lib: LibValue) -> None: - sound_mode = self._rev_sound_mode_lib_mapping[mode_lib] - if sound_mode in self._sound_mode_mapping: - self._attr_sound_mode = self._sound_mode_mapping[sound_mode] - return - - sound_mode_meaning = sound_mode.value_meaning - - if sound_mode not in self._options_sound_modes: - _LOGGER.warning( - 'Listening mode "%s" for entity: %s is not in the list. Check integration options', - sound_mode_meaning, - self.entity_id, - ) - else: - _LOGGER.error( - 'Listening mode "%s" is invalid for entity: %s', - sound_mode_meaning, - self.entity_id, - ) - - self._attr_sound_mode = sound_mode_meaning - - @callback - def _parse_audio_information( - self, audio_information: tuple[str] | Literal["N/A"] - ) -> None: - # If audio information is not available, N/A is returned, - # so only update the audio information, when it is not N/A. - if audio_information == "N/A": - self._attr_extra_state_attributes.pop(ATTR_AUDIO_INFORMATION, None) - return - - self._attr_extra_state_attributes[ATTR_AUDIO_INFORMATION] = { - name: value - for name, value in zip( - AUDIO_INFORMATION_MAPPING, audio_information, strict=False - ) - if len(value) > 0 - } - - @callback - def _parse_video_information( - self, video_information: tuple[str] | Literal["N/A"] - ) -> None: - # If video information is not available, N/A is returned, - # so only update the video information, when it is not N/A. - if video_information == "N/A": - self._attr_extra_state_attributes.pop(ATTR_VIDEO_INFORMATION, None) - return - - self._attr_extra_state_attributes[ATTR_VIDEO_INFORMATION] = { - name: value - for name, value in zip( - VIDEO_INFORMATION_MAPPING, video_information, strict=False - ) - if len(value) > 0 - } - def _query_av_info_delayed(self) -> None: - if self._zone == "main" and not self._query_timer: + if self._zone == Zone.MAIN and not self._query_task: - @callback - def _query_av_info() -> None: + async def _query_av_info() -> None: + await asyncio.sleep(AUDIO_VIDEO_INFORMATION_UPDATE_WAIT_TIME) if self._supports_audio_info: - self._query_receiver("audio-information") + await self._manager.write(query.AudioInformation()) if self._supports_video_info: - self._query_receiver("video-information") - self._query_timer = None + await self._manager.write(query.VideoInformation()) + self._query_task = None - self._query_timer = self.hass.loop.call_later( - AUDIO_VIDEO_INFORMATION_UPDATE_WAIT_TIME, _query_av_info - ) + self._query_task = asyncio.create_task(_query_av_info()) diff --git a/homeassistant/components/onkyo/quality_scale.yaml b/homeassistant/components/onkyo/quality_scale.yaml index 4b9fbe7c019..caf0d33fafc 100644 --- a/homeassistant/components/onkyo/quality_scale.yaml +++ b/homeassistant/components/onkyo/quality_scale.yaml @@ -77,7 +77,4 @@ rules: status: exempt comment: | This integration is not making any HTTP requests. - strict-typing: - status: todo - comment: | - The library is not fully typed yet. + strict-typing: done diff --git a/homeassistant/components/onkyo/receiver.py b/homeassistant/components/onkyo/receiver.py index cc6cbbc95fb..e4fe8bc6630 100644 --- a/homeassistant/components/onkyo/receiver.py +++ b/homeassistant/components/onkyo/receiver.py @@ -3,149 +3,149 @@ from __future__ import annotations import asyncio -from collections.abc import Callable, Iterable +from collections.abc import Awaitable, Callable, Iterable import contextlib from dataclasses import dataclass, field import logging -from typing import Any +from typing import TYPE_CHECKING -import pyeiscp +import aioonkyo +from aioonkyo import Instruction, Receiver, ReceiverInfo, Status, connect, query + +from homeassistant.components import network +from homeassistant.core import HomeAssistant from .const import DEVICE_DISCOVERY_TIMEOUT, DEVICE_INTERVIEW_TIMEOUT, ZONES +if TYPE_CHECKING: + from . import OnkyoConfigEntry + _LOGGER = logging.getLogger(__name__) @dataclass class Callbacks: - """Onkyo Receiver Callbacks.""" + """Receiver callbacks.""" - connect: list[Callable[[Receiver], None]] = field(default_factory=list) - update: list[Callable[[Receiver, tuple[str, str, Any]], None]] = field( - default_factory=list - ) + connect: list[Callable[[bool], Awaitable[None]]] = field(default_factory=list) + update: list[Callable[[Status], Awaitable[None]]] = field(default_factory=list) + + def clear(self) -> None: + """Clear all callbacks.""" + self.connect.clear() + self.update.clear() -@dataclass -class Receiver: - """Onkyo receiver.""" +class ReceiverManager: + """Receiver manager.""" - conn: pyeiscp.Connection - model_name: str - identifier: str - host: str - first_connect: bool = True - callbacks: Callbacks = field(default_factory=Callbacks) + hass: HomeAssistant + entry: OnkyoConfigEntry + info: ReceiverInfo + receiver: Receiver | None = None + callbacks: Callbacks - @classmethod - async def async_create(cls, info: ReceiverInfo) -> Receiver: - """Set up Onkyo Receiver.""" + _started: asyncio.Event - receiver: Receiver | None = None + def __init__( + self, hass: HomeAssistant, entry: OnkyoConfigEntry, info: ReceiverInfo + ) -> None: + """Init receiver manager.""" + self.hass = hass + self.entry = entry + self.info = info + self.callbacks = Callbacks() + self._started = asyncio.Event() - def on_connect(_origin: str) -> None: - assert receiver is not None - receiver.on_connect() + async def start(self) -> Awaitable[None] | None: + """Start the receiver manager run. - def on_update(message: tuple[str, str, Any], _origin: str) -> None: - assert receiver is not None - receiver.on_update(message) - - _LOGGER.debug("Creating receiver: %s (%s)", info.model_name, info.host) - - connection = await pyeiscp.Connection.create( - host=info.host, - port=info.port, - connect_callback=on_connect, - update_callback=on_update, - auto_connect=False, + Returns `None`, if everything went fine. + Returns an awaitable with exception set, if something went wrong. + """ + manager_task = self.entry.async_create_background_task( + self.hass, self._run(), "run_connection" ) - - return ( - receiver := cls( - conn=connection, - model_name=info.model_name, - identifier=info.identifier, - host=info.host, - ) + wait_for_started_task = asyncio.create_task(self._started.wait()) + done, _ = await asyncio.wait( + (manager_task, wait_for_started_task), return_when=asyncio.FIRST_COMPLETED ) + if manager_task in done: + # Something went wrong, so let's return the manager task, + # so that it can be awaited to error out + return manager_task - def on_connect(self) -> None: + return None + + async def _run(self) -> None: + """Run the connection to the receiver.""" + reconnect = False + while True: + try: + async with connect(self.info, retry=reconnect) as self.receiver: + if not reconnect: + self._started.set() + else: + _LOGGER.info("Reconnected: %s", self.info) + + await self.on_connect(reconnect=reconnect) + + while message := await self.receiver.read(): + await self.on_update(message) + + reconnect = True + + finally: + _LOGGER.info("Disconnected: %s", self.info) + + async def on_connect(self, reconnect: bool) -> None: """Receiver (re)connected.""" - _LOGGER.debug("Receiver (re)connected: %s (%s)", self.model_name, self.host) # Discover what zones are available for the receiver by querying the power. # If we get a response for the specific zone, it means it is available. for zone in ZONES: - self.conn.query_property(zone, "power") + await self.write(query.Power(zone)) for callback in self.callbacks.connect: - callback(self) + await callback(reconnect) - self.first_connect = False - - def on_update(self, message: tuple[str, str, Any]) -> None: + async def on_update(self, message: Status) -> None: """Process new message from the receiver.""" - _LOGGER.debug("Received update callback from %s: %s", self.model_name, message) for callback in self.callbacks.update: - callback(self, message) + await callback(message) + async def write(self, message: Instruction) -> None: + """Write message to the receiver.""" + assert self.receiver is not None + await self.receiver.write(message) -@dataclass -class ReceiverInfo: - """Onkyo receiver information.""" - - host: str - port: int - model_name: str - identifier: str + def start_unloading(self) -> None: + """Start unloading.""" + self.callbacks.clear() async def async_interview(host: str) -> ReceiverInfo | None: - """Interview Onkyo Receiver.""" - _LOGGER.debug("Interviewing receiver: %s", host) - - receiver_info: ReceiverInfo | None = None - - event = asyncio.Event() - - async def _callback(conn: pyeiscp.Connection) -> None: - """Receiver interviewed, connection not yet active.""" - nonlocal receiver_info - if receiver_info is None: - info = ReceiverInfo(host, conn.port, conn.name, conn.identifier) - _LOGGER.debug("Receiver interviewed: %s (%s)", info.model_name, info.host) - receiver_info = info - event.set() - - timeout = DEVICE_INTERVIEW_TIMEOUT - - await pyeiscp.Connection.discover( - host=host, discovery_callback=_callback, timeout=timeout - ) - + """Interview the receiver.""" + info: ReceiverInfo | None = None with contextlib.suppress(asyncio.TimeoutError): - await asyncio.wait_for(event.wait(), timeout) - - return receiver_info + async with asyncio.timeout(DEVICE_INTERVIEW_TIMEOUT): + info = await aioonkyo.interview(host) + return info -async def async_discover() -> Iterable[ReceiverInfo]: - """Discover Onkyo Receivers.""" - _LOGGER.debug("Discovering receivers") +async def async_discover(hass: HomeAssistant) -> Iterable[ReceiverInfo]: + """Discover receivers.""" + all_infos: dict[str, ReceiverInfo] = {} - receiver_infos: list[ReceiverInfo] = [] + async def collect_infos(address: str) -> None: + with contextlib.suppress(asyncio.TimeoutError): + async with asyncio.timeout(DEVICE_DISCOVERY_TIMEOUT): + async for info in aioonkyo.discover(address): + all_infos.setdefault(info.identifier, info) - async def _callback(conn: pyeiscp.Connection) -> None: - """Receiver discovered, connection not yet active.""" - info = ReceiverInfo(conn.host, conn.port, conn.name, conn.identifier) - _LOGGER.debug("Receiver discovered: %s (%s)", info.model_name, info.host) - receiver_infos.append(info) + broadcast_addrs = await network.async_get_ipv4_broadcast_addresses(hass) + tasks = [collect_infos(str(address)) for address in broadcast_addrs] - timeout = DEVICE_DISCOVERY_TIMEOUT + await asyncio.gather(*tasks) - await pyeiscp.Connection.discover(discovery_callback=_callback, timeout=timeout) - - await asyncio.sleep(timeout) - - return receiver_infos + return all_infos.values() diff --git a/homeassistant/components/onkyo/services.py b/homeassistant/components/onkyo/services.py index 26a22523a0e..cfd246d9af7 100644 --- a/homeassistant/components/onkyo/services.py +++ b/homeassistant/components/onkyo/services.py @@ -4,6 +4,7 @@ from __future__ import annotations from typing import TYPE_CHECKING +from aioonkyo import Zone import voluptuous as vol from homeassistant.components.media_player import DOMAIN as MEDIA_PLAYER_DOMAIN @@ -12,29 +13,18 @@ from homeassistant.core import HomeAssistant, ServiceCall, callback from homeassistant.helpers import config_validation as cv from homeassistant.util.hass_dict import HassKey -from .const import DOMAIN +from .const import DOMAIN, LEGACY_REV_HDMI_OUTPUT_MAPPING if TYPE_CHECKING: from .media_player import OnkyoMediaPlayer -DATA_MP_ENTITIES: HassKey[dict[str, dict[str, OnkyoMediaPlayer]]] = HassKey(DOMAIN) +DATA_MP_ENTITIES: HassKey[dict[str, dict[Zone, OnkyoMediaPlayer]]] = HassKey(DOMAIN) ATTR_HDMI_OUTPUT = "hdmi_output" -ACCEPTED_VALUES = [ - "no", - "analog", - "yes", - "out", - "out-sub", - "sub", - "hdbaset", - "both", - "up", -] ONKYO_SELECT_OUTPUT_SCHEMA = vol.Schema( { vol.Required(ATTR_ENTITY_ID): cv.entity_ids, - vol.Required(ATTR_HDMI_OUTPUT): vol.In(ACCEPTED_VALUES), + vol.Required(ATTR_HDMI_OUTPUT): vol.In(LEGACY_REV_HDMI_OUTPUT_MAPPING), } ) SERVICE_SELECT_HDMI_OUTPUT = "onkyo_select_hdmi_output" diff --git a/homeassistant/components/onkyo/util.py b/homeassistant/components/onkyo/util.py new file mode 100644 index 00000000000..bd2cc8a4c7b --- /dev/null +++ b/homeassistant/components/onkyo/util.py @@ -0,0 +1,8 @@ +"""Utils for Onkyo.""" + +from .const import InputSource, ListeningMode + + +def get_meaning(param: InputSource | ListeningMode) -> str: + """Get param meaning.""" + return " ··· ".join(param.meanings) diff --git a/requirements_all.txt b/requirements_all.txt index 60e64a1ad27..513422df915 100644 --- a/requirements_all.txt +++ b/requirements_all.txt @@ -330,6 +330,9 @@ aiontfy==0.5.3 # homeassistant.components.nut aionut==4.3.4 +# homeassistant.components.onkyo +aioonkyo==0.2.0 + # homeassistant.components.openexchangerates aioopenexchangerates==0.6.8 @@ -1956,9 +1959,6 @@ pyefergy==22.5.0 # homeassistant.components.energenie_power_sockets pyegps==0.2.5 -# homeassistant.components.onkyo -pyeiscp==0.0.7 - # homeassistant.components.emoncms pyemoncms==0.1.1 diff --git a/requirements_test_all.txt b/requirements_test_all.txt index 20c826b73e9..4fac0aba573 100644 --- a/requirements_test_all.txt +++ b/requirements_test_all.txt @@ -312,6 +312,9 @@ aiontfy==0.5.3 # homeassistant.components.nut aionut==4.3.4 +# homeassistant.components.onkyo +aioonkyo==0.2.0 + # homeassistant.components.openexchangerates aioopenexchangerates==0.6.8 @@ -1631,9 +1634,6 @@ pyefergy==22.5.0 # homeassistant.components.energenie_power_sockets pyegps==0.2.5 -# homeassistant.components.onkyo -pyeiscp==0.0.7 - # homeassistant.components.emoncms pyemoncms==0.1.1 diff --git a/tests/components/onkyo/__init__.py b/tests/components/onkyo/__init__.py index 689711888d8..f8580c2b257 100644 --- a/tests/components/onkyo/__init__.py +++ b/tests/components/onkyo/__init__.py @@ -1,90 +1,71 @@ """Tests for the Onkyo integration.""" -from unittest.mock import AsyncMock, Mock, patch +from collections.abc import Generator, Iterable +from contextlib import contextmanager +from unittest.mock import MagicMock, patch + +from aioonkyo import ReceiverInfo -from homeassistant.components.onkyo.receiver import Receiver, ReceiverInfo -from homeassistant.const import CONF_HOST from homeassistant.core import HomeAssistant from tests.common import MockConfigEntry +RECEIVER_INFO = ReceiverInfo( + host="192.168.0.101", + ip="192.168.0.101", + model_name="TX-NR7100", + identifier="0009B0123456", +) -def create_receiver_info(id: int) -> ReceiverInfo: - """Create an empty receiver info object for testing.""" - return ReceiverInfo( - host=f"host {id}", - port=id, - model_name=f"type {id}", - identifier=f"id{id}", - ) +RECEIVER_INFO_2 = ReceiverInfo( + host="192.168.0.102", + ip="192.168.0.102", + model_name="TX-RZ50", + identifier="0009B0ABCDEF", +) -def create_connection(id: int) -> Mock: - """Create an mock connection object for testing.""" - connection = Mock() - connection.host = f"host {id}" - connection.port = 0 - connection.name = f"type {id}" - connection.identifier = f"id{id}" - return connection +@contextmanager +def mock_discovery(receiver_infos: Iterable[ReceiverInfo] | None) -> Generator[None]: + """Mock discovery functions.""" + async def get_info(host: str) -> ReceiverInfo | None: + """Get receiver info by host.""" + for info in receiver_infos: + if info.host == host: + return info + return None -def create_config_entry_from_info(info: ReceiverInfo) -> MockConfigEntry: - """Create a config entry from receiver info.""" - data = {CONF_HOST: info.host} - options = { - "volume_resolution": 80, - "max_volume": 100, - "input_sources": {"12": "tv"}, - "listening_modes": {"00": "stereo"}, - } + def get_infos(host: str) -> MagicMock: + """Get receiver infos from broadcast.""" + discover_mock = MagicMock() + discover_mock.__aiter__.return_value = receiver_infos + return discover_mock - return MockConfigEntry( - data=data, - options=options, - title=info.model_name, - domain="onkyo", - unique_id=info.identifier, - ) - - -def create_empty_config_entry() -> MockConfigEntry: - """Create an empty config entry for use in unit tests.""" - data = {CONF_HOST: ""} - options = { - "volume_resolution": 80, - "max_volume": 100, - "input_sources": {"12": "tv"}, - "listening_modes": {"00": "stereo"}, - } - - return MockConfigEntry( - data=data, - options=options, - title="Unit test Onkyo", - domain="onkyo", - unique_id="onkyo_unique_id", - ) - - -async def setup_integration( - hass: HomeAssistant, config_entry: MockConfigEntry, receiver_info: ReceiverInfo -) -> None: - """Fixture for setting up the component.""" - - config_entry.add_to_hass(hass) - - mock_receiver = AsyncMock() - mock_receiver.conn.close = Mock() - mock_receiver.callbacks.connect = Mock() - mock_receiver.callbacks.update = Mock() + discover_kwargs = {} + interview_kwargs = {} + if receiver_infos is None: + discover_kwargs["side_effect"] = OSError + interview_kwargs["side_effect"] = OSError + else: + discover_kwargs["new"] = get_infos + interview_kwargs["new"] = get_info with ( patch( - "homeassistant.components.onkyo.async_interview", - return_value=receiver_info, + "homeassistant.components.onkyo.receiver.aioonkyo.discover", + **discover_kwargs, + ), + patch( + "homeassistant.components.onkyo.receiver.aioonkyo.interview", + **interview_kwargs, ), - patch.object(Receiver, "async_create", return_value=mock_receiver), ): - await hass.config_entries.async_setup(config_entry.entry_id) - await hass.async_block_till_done() + yield + + +async def setup_integration(hass: HomeAssistant, config_entry: MockConfigEntry) -> None: + """Set up the component.""" + config_entry.add_to_hass(hass) + await hass.config_entries.async_setup(config_entry.entry_id) + await hass.async_block_till_done() diff --git a/tests/components/onkyo/conftest.py b/tests/components/onkyo/conftest.py index abbe39dd966..c6459a2b1f2 100644 --- a/tests/components/onkyo/conftest.py +++ b/tests/components/onkyo/conftest.py @@ -1,74 +1,181 @@ -"""Configure tests for the Onkyo integration.""" +"""Common fixtures for the Onkyo tests.""" -from unittest.mock import patch +import asyncio +from collections.abc import Generator +from unittest.mock import AsyncMock, patch +from aioonkyo import Code, Instruction, Kind, Receiver, Status, Zone, status import pytest from homeassistant.components.onkyo.const import DOMAIN +from homeassistant.const import CONF_HOST -from . import create_connection +from . import RECEIVER_INFO, mock_discovery from tests.common import MockConfigEntry -@pytest.fixture(name="config_entry") +@pytest.fixture(autouse=True) +def mock_default_discovery() -> Generator[None]: + """Mock the discovery functions with default info.""" + with ( + patch.multiple( + "homeassistant.components.onkyo.receiver", + DEVICE_INTERVIEW_TIMEOUT=1, + DEVICE_DISCOVERY_TIMEOUT=1, + ), + mock_discovery([RECEIVER_INFO]), + ): + yield + + +@pytest.fixture +def mock_setup_entry() -> Generator[AsyncMock]: + """Mock integration setup.""" + with patch( + "homeassistant.components.onkyo.async_setup_entry", + return_value=True, + ) as mock_setup_entry: + yield mock_setup_entry + + +@pytest.fixture +def mock_connect() -> Generator[AsyncMock]: + """Mock an Onkyo connect.""" + with patch( + "homeassistant.components.onkyo.receiver.connect", + ) as connect: + yield connect.return_value.__aenter__ + + +INITIAL_MESSAGES = [ + status.Power( + Code.from_kind_zone(Kind.POWER, Zone.MAIN), None, status.Power.Param.ON + ), + status.Power( + Code.from_kind_zone(Kind.POWER, Zone.ZONE2), None, status.Power.Param.ON + ), + status.Power( + Code.from_kind_zone(Kind.POWER, Zone.ZONE3), None, status.Power.Param.STANDBY + ), + status.Power( + Code.from_kind_zone(Kind.POWER, Zone.MAIN), None, status.Power.Param.ON + ), + status.Power( + Code.from_kind_zone(Kind.POWER, Zone.ZONE2), None, status.Power.Param.ON + ), + status.Power( + Code.from_kind_zone(Kind.POWER, Zone.ZONE3), None, status.Power.Param.STANDBY + ), + status.Volume(Code.from_kind_zone(Kind.VOLUME, Zone.ZONE2), None, 50), + status.Muting( + Code.from_kind_zone(Kind.MUTING, Zone.MAIN), None, status.Muting.Param.OFF + ), + status.InputSource( + Code.from_kind_zone(Kind.INPUT_SOURCE, Zone.MAIN), + None, + status.InputSource.Param("24"), + ), + status.InputSource( + Code.from_kind_zone(Kind.INPUT_SOURCE, Zone.ZONE2), + None, + status.InputSource.Param("00"), + ), + status.ListeningMode( + Code.from_kind_zone(Kind.LISTENING_MODE, Zone.MAIN), + None, + status.ListeningMode.Param("01"), + ), + status.ListeningMode( + Code.from_kind_zone(Kind.LISTENING_MODE, Zone.ZONE2), + None, + status.ListeningMode.Param("00"), + ), + status.HDMIOutput( + Code.from_kind_zone(Kind.HDMI_OUTPUT, Zone.MAIN), + None, + status.HDMIOutput.Param.MAIN, + ), + status.TunerPreset(Code.from_kind_zone(Kind.TUNER_PRESET, Zone.MAIN), None, 1), + status.AudioInformation( + Code.from_kind_zone(Kind.AUDIO_INFORMATION, Zone.MAIN), + None, + auto_phase_control_phase="Normal", + ), + status.VideoInformation( + Code.from_kind_zone(Kind.VIDEO_INFORMATION, Zone.MAIN), + None, + input_color_depth="24bit", + ), + status.FLDisplay(Code.from_kind_zone(Kind.FL_DISPLAY, Zone.MAIN), None, "LALALA"), + status.NotAvailable( + Code.from_kind_zone(Kind.AUDIO_INFORMATION, Zone.MAIN), + None, + Kind.AUDIO_INFORMATION, + ), + status.NotAvailable( + Code.from_kind_zone(Kind.VIDEO_INFORMATION, Zone.MAIN), + None, + Kind.VIDEO_INFORMATION, + ), + status.Raw(None, None), +] + + +@pytest.fixture +def read_queue() -> asyncio.Queue[Status | None]: + """Read messages queue.""" + return asyncio.Queue() + + +@pytest.fixture +def writes() -> list[Instruction]: + """Written messages.""" + return [] + + +@pytest.fixture +def mock_receiver( + mock_connect: AsyncMock, + read_queue: asyncio.Queue[Status | None], + writes: list[Instruction], +) -> AsyncMock: + """Mock an Onkyo receiver.""" + receiver_class = AsyncMock(Receiver, auto_spec=True) + receiver = receiver_class.return_value + + for message in INITIAL_MESSAGES: + read_queue.put_nowait(message) + + async def read() -> Status: + return await read_queue.get() + + async def write(message: Instruction) -> None: + writes.append(message) + + receiver.read = read + receiver.write = write + + mock_connect.return_value = receiver + + return receiver + + +@pytest.fixture def mock_config_entry() -> MockConfigEntry: - """Create Onkyo entry in Home Assistant.""" + """Mock a config entry.""" + data = {CONF_HOST: RECEIVER_INFO.host} + options = { + "volume_resolution": 80, + "max_volume": 100, + "input_sources": {"12": "TV", "24": "FM Radio"}, + "listening_modes": {"00": "Stereo", "04": "THX"}, + } + return MockConfigEntry( domain=DOMAIN, - title="Onkyo", - data={}, + title=RECEIVER_INFO.model_name, + unique_id=RECEIVER_INFO.identifier, + data=data, + options=options, ) - - -@pytest.fixture(autouse=True) -def patch_timeouts(): - """Patch timeouts to avoid tests waiting.""" - with patch.multiple( - "homeassistant.components.onkyo.receiver", - DEVICE_INTERVIEW_TIMEOUT=0, - DEVICE_DISCOVERY_TIMEOUT=0, - ): - yield - - -@pytest.fixture -async def default_mock_discovery(): - """Mock discovery with a single device.""" - - async def mock_discover(host=None, discovery_callback=None, timeout=0): - await discovery_callback(create_connection(1)) - - with patch( - "homeassistant.components.onkyo.receiver.pyeiscp.Connection.discover", - new=mock_discover, - ): - yield - - -@pytest.fixture -async def stub_mock_discovery(): - """Mock discovery with no devices.""" - - async def mock_discover(host=None, discovery_callback=None, timeout=0): - pass - - with patch( - "homeassistant.components.onkyo.receiver.pyeiscp.Connection.discover", - new=mock_discover, - ): - yield - - -@pytest.fixture -async def empty_mock_discovery(): - """Mock discovery with an empty connection.""" - - async def mock_discover(host=None, discovery_callback=None, timeout=0): - await discovery_callback(None) - - with patch( - "homeassistant.components.onkyo.receiver.pyeiscp.Connection.discover", - new=mock_discover, - ): - yield diff --git a/tests/components/onkyo/snapshots/test_media_player.ambr b/tests/components/onkyo/snapshots/test_media_player.ambr new file mode 100644 index 00000000000..1504952a86d --- /dev/null +++ b/tests/components/onkyo/snapshots/test_media_player.ambr @@ -0,0 +1,203 @@ +# serializer version: 1 +# name: test_entities[media_player.tx_nr7100-entry] + EntityRegistryEntrySnapshot({ + 'aliases': set({ + }), + 'area_id': None, + 'capabilities': dict({ + 'sound_mode_list': list([ + 'Stereo', + 'THX', + ]), + 'source_list': list([ + 'TV', + 'FM Radio', + ]), + }), + 'config_entry_id': , + 'config_subentry_id': , + 'device_class': None, + 'device_id': , + 'disabled_by': None, + 'domain': 'media_player', + 'entity_category': None, + 'entity_id': 'media_player.tx_nr7100', + 'has_entity_name': False, + 'hidden_by': None, + 'icon': None, + 'id': , + 'labels': set({ + }), + 'name': None, + 'options': dict({ + }), + 'original_device_class': None, + 'original_icon': None, + 'original_name': 'TX-NR7100', + 'platform': 'onkyo', + 'previous_unique_id': None, + 'suggested_object_id': None, + 'supported_features': , + 'translation_key': None, + 'unique_id': '0009B0123456_main', + 'unit_of_measurement': None, + }) +# --- +# name: test_entities[media_player.tx_nr7100-state] + StateSnapshot({ + 'attributes': ReadOnlyDict({ + 'audio_information': dict({ + 'auto_phase_control_phase': 'Normal', + }), + 'friendly_name': 'TX-NR7100', + 'is_volume_muted': False, + 'preset': 1, + 'sound_mode': 'DIRECT', + 'sound_mode_list': list([ + 'Stereo', + 'THX', + ]), + 'source': 'FM Radio', + 'source_list': list([ + 'TV', + 'FM Radio', + ]), + 'supported_features': , + 'video_information': dict({ + 'input_color_depth': '24bit', + }), + 'video_out': 'yes,out', + }), + 'context': , + 'entity_id': 'media_player.tx_nr7100', + 'last_changed': , + 'last_reported': , + 'last_updated': , + 'state': 'on', + }) +# --- +# name: test_entities[media_player.tx_nr7100_zone_2-entry] + EntityRegistryEntrySnapshot({ + 'aliases': set({ + }), + 'area_id': None, + 'capabilities': dict({ + 'sound_mode_list': list([ + 'Stereo', + ]), + 'source_list': list([ + 'TV', + 'FM Radio', + ]), + }), + 'config_entry_id': , + 'config_subentry_id': , + 'device_class': None, + 'device_id': , + 'disabled_by': None, + 'domain': 'media_player', + 'entity_category': None, + 'entity_id': 'media_player.tx_nr7100_zone_2', + 'has_entity_name': False, + 'hidden_by': None, + 'icon': None, + 'id': , + 'labels': set({ + }), + 'name': None, + 'options': dict({ + }), + 'original_device_class': None, + 'original_icon': None, + 'original_name': 'TX-NR7100 Zone 2', + 'platform': 'onkyo', + 'previous_unique_id': None, + 'suggested_object_id': None, + 'supported_features': , + 'translation_key': None, + 'unique_id': '0009B0123456_zone2', + 'unit_of_measurement': None, + }) +# --- +# name: test_entities[media_player.tx_nr7100_zone_2-state] + StateSnapshot({ + 'attributes': ReadOnlyDict({ + 'friendly_name': 'TX-NR7100 Zone 2', + 'sound_mode': 'Stereo', + 'sound_mode_list': list([ + 'Stereo', + ]), + 'source': 'VIDEO1 ··· VCR/DVR ··· STB/DVR', + 'source_list': list([ + 'TV', + 'FM Radio', + ]), + 'supported_features': , + 'volume_level': 0.625, + }), + 'context': , + 'entity_id': 'media_player.tx_nr7100_zone_2', + 'last_changed': , + 'last_reported': , + 'last_updated': , + 'state': 'on', + }) +# --- +# name: test_entities[media_player.tx_nr7100_zone_3-entry] + EntityRegistryEntrySnapshot({ + 'aliases': set({ + }), + 'area_id': None, + 'capabilities': dict({ + 'source_list': list([ + 'TV', + 'FM Radio', + ]), + }), + 'config_entry_id': , + 'config_subentry_id': , + 'device_class': None, + 'device_id': , + 'disabled_by': None, + 'domain': 'media_player', + 'entity_category': None, + 'entity_id': 'media_player.tx_nr7100_zone_3', + 'has_entity_name': False, + 'hidden_by': None, + 'icon': None, + 'id': , + 'labels': set({ + }), + 'name': None, + 'options': dict({ + }), + 'original_device_class': None, + 'original_icon': None, + 'original_name': 'TX-NR7100 Zone 3', + 'platform': 'onkyo', + 'previous_unique_id': None, + 'suggested_object_id': None, + 'supported_features': , + 'translation_key': None, + 'unique_id': '0009B0123456_zone3', + 'unit_of_measurement': None, + }) +# --- +# name: test_entities[media_player.tx_nr7100_zone_3-state] + StateSnapshot({ + 'attributes': ReadOnlyDict({ + 'friendly_name': 'TX-NR7100 Zone 3', + 'source_list': list([ + 'TV', + 'FM Radio', + ]), + 'supported_features': , + }), + 'context': , + 'entity_id': 'media_player.tx_nr7100_zone_3', + 'last_changed': , + 'last_reported': , + 'last_updated': , + 'state': 'off', + }) +# --- diff --git a/tests/components/onkyo/test_config_flow.py b/tests/components/onkyo/test_config_flow.py index 92a4a34e8fb..df10e266982 100644 --- a/tests/components/onkyo/test_config_flow.py +++ b/tests/components/onkyo/test_config_flow.py @@ -1,11 +1,9 @@ """Test Onkyo config flow.""" -from unittest.mock import patch - +from aioonkyo import ReceiverInfo import pytest from homeassistant import config_entries -from homeassistant.components.onkyo.config_flow import OnkyoConfigFlow from homeassistant.components.onkyo.const import ( DOMAIN, OPTION_INPUT_SOURCES, @@ -23,17 +21,15 @@ from homeassistant.helpers.service_info.ssdp import ( SsdpServiceInfo, ) -from . import ( - create_config_entry_from_info, - create_connection, - create_empty_config_entry, - create_receiver_info, - setup_integration, -) +from . import RECEIVER_INFO, RECEIVER_INFO_2, mock_discovery, setup_integration from tests.common import MockConfigEntry +def _entry_title(receiver_info: ReceiverInfo) -> str: + return f"{receiver_info.model_name} ({receiver_info.host})" + + async def test_user_initial_menu(hass: HomeAssistant) -> None: """Test initial menu.""" init_result = await hass.config_entries.flow.async_init( @@ -46,7 +42,7 @@ async def test_user_initial_menu(hass: HomeAssistant) -> None: assert not set(init_result["menu_options"]) ^ {"manual", "eiscp_discovery"} -async def test_manual_valid_host(hass: HomeAssistant, default_mock_discovery) -> None: +async def test_manual_valid_host(hass: HomeAssistant) -> None: """Test valid host entered.""" init_result = await hass.config_entries.flow.async_init( DOMAIN, @@ -60,14 +56,16 @@ async def test_manual_valid_host(hass: HomeAssistant, default_mock_discovery) -> select_result = await hass.config_entries.flow.async_configure( form_result["flow_id"], - user_input={CONF_HOST: "host 1"}, + user_input={CONF_HOST: RECEIVER_INFO.host}, ) assert select_result["step_id"] == "configure_receiver" - assert select_result["description_placeholders"]["name"] == "type 1 (host 1)" + assert select_result["description_placeholders"]["name"] == _entry_title( + RECEIVER_INFO + ) -async def test_manual_invalid_host(hass: HomeAssistant, stub_mock_discovery) -> None: +async def test_manual_invalid_host(hass: HomeAssistant) -> None: """Test invalid host entered.""" init_result = await hass.config_entries.flow.async_init( DOMAIN, @@ -79,18 +77,17 @@ async def test_manual_invalid_host(hass: HomeAssistant, stub_mock_discovery) -> {"next_step_id": "manual"}, ) - host_result = await hass.config_entries.flow.async_configure( - form_result["flow_id"], - user_input={CONF_HOST: "sample-host-name"}, - ) + with mock_discovery([]): + host_result = await hass.config_entries.flow.async_configure( + form_result["flow_id"], + user_input={CONF_HOST: "sample-host-name"}, + ) assert host_result["step_id"] == "manual" assert host_result["errors"]["base"] == "cannot_connect" -async def test_manual_valid_host_unexpected_error( - hass: HomeAssistant, empty_mock_discovery -) -> None: +async def test_manual_valid_host_unexpected_error(hass: HomeAssistant) -> None: """Test valid host entered.""" init_result = await hass.config_entries.flow.async_init( @@ -103,112 +100,102 @@ async def test_manual_valid_host_unexpected_error( {"next_step_id": "manual"}, ) - host_result = await hass.config_entries.flow.async_configure( - form_result["flow_id"], - user_input={CONF_HOST: "sample-host-name"}, - ) + with mock_discovery(None): + host_result = await hass.config_entries.flow.async_configure( + form_result["flow_id"], + user_input={CONF_HOST: "sample-host-name"}, + ) assert host_result["step_id"] == "manual" assert host_result["errors"]["base"] == "unknown" -async def test_discovery_and_no_devices_discovered( - hass: HomeAssistant, stub_mock_discovery -) -> None: - """Test initial menu.""" - init_result = await hass.config_entries.flow.async_init( +async def test_eiscp_discovery_no_devices_found(hass: HomeAssistant) -> None: + """Test eiscp discovery with no devices found.""" + result = await hass.config_entries.flow.async_init( DOMAIN, context={"source": SOURCE_USER}, ) - form_result = await hass.config_entries.flow.async_configure( - init_result["flow_id"], - {"next_step_id": "eiscp_discovery"}, - ) - - assert form_result["type"] is FlowResultType.ABORT - assert form_result["reason"] == "no_devices_found" - - -async def test_discovery_with_exception( - hass: HomeAssistant, empty_mock_discovery -) -> None: - """Test discovery which throws an unexpected exception.""" - init_result = await hass.config_entries.flow.async_init( - DOMAIN, - context={"source": SOURCE_USER}, - ) - - form_result = await hass.config_entries.flow.async_configure( - init_result["flow_id"], - {"next_step_id": "eiscp_discovery"}, - ) - - assert form_result["type"] is FlowResultType.ABORT - assert form_result["reason"] == "unknown" - - -async def test_discovery_with_new_and_existing_found(hass: HomeAssistant) -> None: - """Test discovery with a new and an existing entry.""" - init_result = await hass.config_entries.flow.async_init( - DOMAIN, - context={"source": SOURCE_USER}, - ) - - async def mock_discover(discovery_callback, timeout): - await discovery_callback(create_connection(1)) - await discovery_callback(create_connection(2)) - - with ( - patch("pyeiscp.Connection.discover", new=mock_discover), - # Fake it like the first entry was already added - patch.object(OnkyoConfigFlow, "_async_current_ids", return_value=["id1"]), - ): - form_result = await hass.config_entries.flow.async_configure( - init_result["flow_id"], + with mock_discovery([]): + result = await hass.config_entries.flow.async_configure( + result["flow_id"], {"next_step_id": "eiscp_discovery"}, ) - assert form_result["type"] is FlowResultType.FORM + assert result["type"] is FlowResultType.ABORT + assert result["reason"] == "no_devices_found" - assert form_result["data_schema"] is not None - schema = form_result["data_schema"].schema + +async def test_eiscp_discovery_unexpected_exception(hass: HomeAssistant) -> None: + """Test eiscp discovery with an unexpected exception.""" + result = await hass.config_entries.flow.async_init( + DOMAIN, + context={"source": SOURCE_USER}, + ) + + with mock_discovery(None): + result = await hass.config_entries.flow.async_configure( + result["flow_id"], + {"next_step_id": "eiscp_discovery"}, + ) + + assert result["type"] is FlowResultType.ABORT + assert result["reason"] == "unknown" + + +@pytest.mark.usefixtures("mock_setup_entry") +async def test_eiscp_discovery( + hass: HomeAssistant, + mock_config_entry: MockConfigEntry, +) -> None: + """Test eiscp discovery.""" + mock_config_entry.add_to_hass(hass) + + result = await hass.config_entries.flow.async_init( + DOMAIN, + context={"source": SOURCE_USER}, + ) + + with mock_discovery([RECEIVER_INFO, RECEIVER_INFO_2]): + result = await hass.config_entries.flow.async_configure( + result["flow_id"], + {"next_step_id": "eiscp_discovery"}, + ) + + assert result["type"] is FlowResultType.FORM + + assert result["data_schema"] is not None + schema = result["data_schema"].schema container = schema["device"].container - assert container == {"id2": "type 2 (host 2)"} + assert container == {RECEIVER_INFO_2.identifier: _entry_title(RECEIVER_INFO_2)} - -async def test_discovery_with_one_selected(hass: HomeAssistant) -> None: - """Test discovery after a selection.""" - init_result = await hass.config_entries.flow.async_init( - DOMAIN, - context={"source": SOURCE_USER}, + result = await hass.config_entries.flow.async_configure( + result["flow_id"], + user_input={"device": RECEIVER_INFO_2.identifier}, ) - async def mock_discover(discovery_callback, timeout): - await discovery_callback(create_connection(42)) - await discovery_callback(create_connection(0)) + assert result["step_id"] == "configure_receiver" - with patch("pyeiscp.Connection.discover", new=mock_discover): - form_result = await hass.config_entries.flow.async_configure( - init_result["flow_id"], - {"next_step_id": "eiscp_discovery"}, - ) + result = await hass.config_entries.flow.async_configure( + result["flow_id"], + user_input={ + "volume_resolution": 200, + "input_sources": ["TV"], + "listening_modes": ["THX"], + }, + ) - select_result = await hass.config_entries.flow.async_configure( - form_result["flow_id"], - user_input={"device": "id42"}, - ) - - assert select_result["step_id"] == "configure_receiver" - assert select_result["description_placeholders"]["name"] == "type 42 (host 42)" + assert result["type"] is FlowResultType.CREATE_ENTRY + assert result["data"]["host"] == RECEIVER_INFO_2.host + assert result["result"].unique_id == RECEIVER_INFO_2.identifier -async def test_ssdp_discovery_success( - hass: HomeAssistant, default_mock_discovery -) -> None: +@pytest.mark.usefixtures("mock_setup_entry") +async def test_ssdp_discovery_success(hass: HomeAssistant) -> None: """Test SSDP discovery with valid host.""" discovery_info = SsdpServiceInfo( - ssdp_location="http://192.168.1.100:8080", + ssdp_location="http://192.168.0.101:8080", upnp={ATTR_UPNP_FRIENDLY_NAME: "Onkyo Receiver"}, ssdp_usn="uuid:mock_usn", ssdp_udn="uuid:00000000-0000-0000-0000-000000000000", @@ -224,7 +211,7 @@ async def test_ssdp_discovery_success( assert result["type"] is FlowResultType.FORM assert result["step_id"] == "configure_receiver" - select_result = await hass.config_entries.flow.async_configure( + result = await hass.config_entries.flow.async_configure( result["flow_id"], user_input={ "volume_resolution": 200, @@ -233,24 +220,19 @@ async def test_ssdp_discovery_success( }, ) - assert select_result["type"] is FlowResultType.CREATE_ENTRY - assert select_result["data"]["host"] == "192.168.1.100" - assert select_result["result"].unique_id == "id1" + assert result["type"] is FlowResultType.CREATE_ENTRY + assert result["data"]["host"] == RECEIVER_INFO.host + assert result["result"].unique_id == RECEIVER_INFO.identifier async def test_ssdp_discovery_already_configured( - hass: HomeAssistant, default_mock_discovery + hass: HomeAssistant, mock_config_entry: MockConfigEntry ) -> None: """Test SSDP discovery with already configured device.""" - config_entry = MockConfigEntry( - domain=DOMAIN, - data={CONF_HOST: "192.168.1.100"}, - unique_id="id1", - ) - config_entry.add_to_hass(hass) + mock_config_entry.add_to_hass(hass) discovery_info = SsdpServiceInfo( - ssdp_location="http://192.168.1.100:8080", + ssdp_location="http://192.168.0.101:8080", upnp={ATTR_UPNP_FRIENDLY_NAME: "Onkyo Receiver"}, ssdp_usn="uuid:mock_usn", ssdp_udn="uuid:00000000-0000-0000-0000-000000000000", @@ -276,10 +258,7 @@ async def test_ssdp_discovery_host_info_error(hass: HomeAssistant) -> None: ssdp_st="mock_st", ) - with patch( - "homeassistant.components.onkyo.receiver.pyeiscp.Connection.discover", - side_effect=OSError, - ): + with mock_discovery(None): result = await hass.config_entries.flow.async_init( DOMAIN, context={"source": config_entries.SOURCE_SSDP}, @@ -290,9 +269,7 @@ async def test_ssdp_discovery_host_info_error(hass: HomeAssistant) -> None: assert result["reason"] == "unknown" -async def test_ssdp_discovery_host_none_info( - hass: HomeAssistant, stub_mock_discovery -) -> None: +async def test_ssdp_discovery_host_none_info(hass: HomeAssistant) -> None: """Test SSDP discovery with host info error.""" discovery_info = SsdpServiceInfo( ssdp_location="http://192.168.1.100:8080", @@ -301,19 +278,18 @@ async def test_ssdp_discovery_host_none_info( ssdp_st="mock_st", ) - result = await hass.config_entries.flow.async_init( - DOMAIN, - context={"source": config_entries.SOURCE_SSDP}, - data=discovery_info, - ) + with mock_discovery([]): + result = await hass.config_entries.flow.async_init( + DOMAIN, + context={"source": config_entries.SOURCE_SSDP}, + data=discovery_info, + ) assert result["type"] is FlowResultType.ABORT assert result["reason"] == "cannot_connect" -async def test_ssdp_discovery_no_location( - hass: HomeAssistant, default_mock_discovery -) -> None: +async def test_ssdp_discovery_no_location(hass: HomeAssistant) -> None: """Test SSDP discovery with no location.""" discovery_info = SsdpServiceInfo( ssdp_location=None, @@ -332,9 +308,7 @@ async def test_ssdp_discovery_no_location( assert result["reason"] == "unknown" -async def test_ssdp_discovery_no_host( - hass: HomeAssistant, default_mock_discovery -) -> None: +async def test_ssdp_discovery_no_host(hass: HomeAssistant) -> None: """Test SSDP discovery with no host.""" discovery_info = SsdpServiceInfo( ssdp_location="http://", @@ -353,9 +327,7 @@ async def test_ssdp_discovery_no_host( assert result["reason"] == "unknown" -async def test_configure_no_resolution( - hass: HomeAssistant, default_mock_discovery -) -> None: +async def test_configure_no_resolution(hass: HomeAssistant) -> None: """Test receiver configure with no resolution set.""" init_result = await hass.config_entries.flow.async_init( @@ -380,9 +352,9 @@ async def test_configure_no_resolution( ) -async def test_configure(hass: HomeAssistant, default_mock_discovery) -> None: +@pytest.mark.usefixtures("mock_setup_entry") +async def test_configure(hass: HomeAssistant) -> None: """Test receiver configure.""" - result = await hass.config_entries.flow.async_init( DOMAIN, context={"source": SOURCE_USER}, @@ -395,7 +367,7 @@ async def test_configure(hass: HomeAssistant, default_mock_discovery) -> None: result = await hass.config_entries.flow.async_configure( result["flow_id"], - user_input={CONF_HOST: "sample-host-name"}, + user_input={CONF_HOST: RECEIVER_INFO.host}, ) result = await hass.config_entries.flow.async_configure( @@ -437,9 +409,7 @@ async def test_configure(hass: HomeAssistant, default_mock_discovery) -> None: } -async def test_configure_invalid_resolution_set( - hass: HomeAssistant, default_mock_discovery -) -> None: +async def test_configure_invalid_resolution_set(hass: HomeAssistant) -> None: """Test receiver configure with invalid resolution.""" init_result = await hass.config_entries.flow.async_init( @@ -464,22 +434,23 @@ async def test_configure_invalid_resolution_set( ) -async def test_reconfigure(hass: HomeAssistant, default_mock_discovery) -> None: +@pytest.mark.usefixtures("mock_setup_entry") +async def test_reconfigure( + hass: HomeAssistant, mock_config_entry: MockConfigEntry +) -> None: """Test the reconfigure config flow.""" - receiver_info = create_receiver_info(1) - config_entry = create_config_entry_from_info(receiver_info) - await setup_integration(hass, config_entry, receiver_info) + await setup_integration(hass, mock_config_entry) - old_host = config_entry.data[CONF_HOST] - old_options = config_entry.options + old_host = mock_config_entry.data[CONF_HOST] + old_options = mock_config_entry.options - result = await config_entry.start_reconfigure_flow(hass) + result = await mock_config_entry.start_reconfigure_flow(hass) assert result["type"] is FlowResultType.FORM assert result["step_id"] == "manual" result2 = await hass.config_entries.flow.async_configure( - result["flow_id"], user_input={"host": receiver_info.host} + result["flow_id"], user_input={"host": mock_config_entry.data[CONF_HOST]} ) await hass.async_block_till_done() @@ -494,36 +465,28 @@ async def test_reconfigure(hass: HomeAssistant, default_mock_discovery) -> None: assert result3["type"] is FlowResultType.ABORT assert result3["reason"] == "reconfigure_successful" - assert config_entry.data[CONF_HOST] == old_host - assert config_entry.options[OPTION_VOLUME_RESOLUTION] == 200 + assert mock_config_entry.data[CONF_HOST] == old_host + assert mock_config_entry.options[OPTION_VOLUME_RESOLUTION] == 200 for option, option_value in old_options.items(): if option == OPTION_VOLUME_RESOLUTION: continue - assert config_entry.options[option] == option_value + assert mock_config_entry.options[option] == option_value -async def test_reconfigure_new_device(hass: HomeAssistant) -> None: +@pytest.mark.usefixtures("mock_setup_entry") +async def test_reconfigure_new_device( + hass: HomeAssistant, mock_config_entry: MockConfigEntry +) -> None: """Test the reconfigure config flow with new device.""" - receiver_info = create_receiver_info(1) - config_entry = create_config_entry_from_info(receiver_info) - await setup_integration(hass, config_entry, receiver_info) + await setup_integration(hass, mock_config_entry) - old_unique_id = receiver_info.identifier + old_unique_id = mock_config_entry.unique_id - result = await config_entry.start_reconfigure_flow(hass) + result = await mock_config_entry.start_reconfigure_flow(hass) - mock_connection = create_connection(2) - - # Create mock discover that calls callback immediately - async def mock_discover(host, discovery_callback, timeout): - await discovery_callback(mock_connection) - - with patch( - "homeassistant.components.onkyo.receiver.pyeiscp.Connection.discover", - new=mock_discover, - ): + with mock_discovery([RECEIVER_INFO_2]): result2 = await hass.config_entries.flow.async_configure( - result["flow_id"], user_input={"host": mock_connection.host} + result["flow_id"], user_input={CONF_HOST: RECEIVER_INFO_2.host} ) await hass.async_block_till_done() @@ -531,9 +494,10 @@ async def test_reconfigure_new_device(hass: HomeAssistant) -> None: assert result2["reason"] == "unique_id_mismatch" # unique id should remain unchanged - assert config_entry.unique_id == old_unique_id + assert mock_config_entry.unique_id == old_unique_id +@pytest.mark.usefixtures("mock_setup_entry") @pytest.mark.parametrize( "ignore_missing_translations", [ @@ -545,16 +509,15 @@ async def test_reconfigure_new_device(hass: HomeAssistant) -> None: ] ], ) -async def test_options_flow(hass: HomeAssistant, config_entry: MockConfigEntry) -> None: +async def test_options_flow( + hass: HomeAssistant, mock_config_entry: MockConfigEntry +) -> None: """Test options flow.""" + await setup_integration(hass, mock_config_entry) - receiver_info = create_receiver_info(1) - config_entry = create_empty_config_entry() - await setup_integration(hass, config_entry, receiver_info) + old_volume_resolution = mock_config_entry.options[OPTION_VOLUME_RESOLUTION] - old_volume_resolution = config_entry.options[OPTION_VOLUME_RESOLUTION] - - result = await hass.config_entries.options.async_init(config_entry.entry_id) + result = await hass.config_entries.options.async_init(mock_config_entry.entry_id) result = await hass.config_entries.options.async_configure( result["flow_id"], diff --git a/tests/components/onkyo/test_init.py b/tests/components/onkyo/test_init.py index 4c6ddcca214..144947dcbe1 100644 --- a/tests/components/onkyo/test_init.py +++ b/tests/components/onkyo/test_init.py @@ -2,51 +2,85 @@ from __future__ import annotations -from unittest.mock import patch +import asyncio +from unittest.mock import AsyncMock +from aioonkyo import Status import pytest -from homeassistant.components.onkyo import async_setup_entry from homeassistant.config_entries import ConfigEntryState from homeassistant.core import HomeAssistant -from homeassistant.exceptions import ConfigEntryNotReady -from . import create_empty_config_entry, create_receiver_info, setup_integration +from . import mock_discovery, setup_integration from tests.common import MockConfigEntry +@pytest.mark.usefixtures("mock_receiver") async def test_load_unload_entry( hass: HomeAssistant, - config_entry: MockConfigEntry, + mock_config_entry: MockConfigEntry, ) -> None: """Test load and unload entry.""" + await setup_integration(hass, mock_config_entry) - config_entry = create_empty_config_entry() - receiver_info = create_receiver_info(1) - await setup_integration(hass, config_entry, receiver_info) + assert mock_config_entry.state is ConfigEntryState.LOADED - assert config_entry.state is ConfigEntryState.LOADED - - await hass.config_entries.async_unload(config_entry.entry_id) + await hass.config_entries.async_unload(mock_config_entry.entry_id) await hass.async_block_till_done() - assert config_entry.state is ConfigEntryState.NOT_LOADED + + assert mock_config_entry.state is ConfigEntryState.NOT_LOADED -async def test_no_connection( +@pytest.mark.parametrize( + "receiver_infos", + [ + None, + [], + ], +) +async def test_initialization_failure( hass: HomeAssistant, - config_entry: MockConfigEntry, + mock_config_entry: MockConfigEntry, + receiver_infos, ) -> None: - """Test update options.""" + """Test initialization failure.""" + with mock_discovery(receiver_infos): + await setup_integration(hass, mock_config_entry) - config_entry = create_empty_config_entry() - config_entry.add_to_hass(hass) + assert mock_config_entry.state is ConfigEntryState.SETUP_RETRY - with ( - patch( - "homeassistant.components.onkyo.async_interview", - return_value=None, - ), - pytest.raises(ConfigEntryNotReady), - ): - await async_setup_entry(hass, config_entry) + +async def test_connection_failure( + hass: HomeAssistant, + mock_config_entry: MockConfigEntry, + mock_connect: AsyncMock, +) -> None: + """Test connection failure.""" + mock_connect.side_effect = OSError + + await setup_integration(hass, mock_config_entry) + + assert mock_config_entry.state is ConfigEntryState.SETUP_RETRY + + +@pytest.mark.usefixtures("mock_receiver") +async def test_reconnect( + hass: HomeAssistant, + mock_config_entry: MockConfigEntry, + mock_connect: AsyncMock, + read_queue: asyncio.Queue[Status | None], +) -> None: + """Test reconnect.""" + await setup_integration(hass, mock_config_entry) + + assert mock_config_entry.state is ConfigEntryState.LOADED + + mock_connect.reset_mock() + + assert mock_connect.call_count == 0 + + read_queue.put_nowait(None) # Simulate a disconnect + await asyncio.sleep(0) + + assert mock_connect.call_count == 1 diff --git a/tests/components/onkyo/test_media_player.py b/tests/components/onkyo/test_media_player.py new file mode 100644 index 00000000000..3d22e3b1af8 --- /dev/null +++ b/tests/components/onkyo/test_media_player.py @@ -0,0 +1,230 @@ +"""Test Onkyo media player platform.""" + +from collections.abc import AsyncGenerator +from unittest.mock import AsyncMock, patch + +from aioonkyo import Instruction, Zone, command +import pytest +from syrupy.assertion import SnapshotAssertion + +from homeassistant.components.media_player import ( + ATTR_INPUT_SOURCE, + ATTR_MEDIA_CONTENT_ID, + ATTR_MEDIA_CONTENT_TYPE, + ATTR_MEDIA_VOLUME_LEVEL, + ATTR_MEDIA_VOLUME_MUTED, + ATTR_SOUND_MODE, + DOMAIN as MEDIA_PLAYER_DOMAIN, + SERVICE_PLAY_MEDIA, + SERVICE_SELECT_SOUND_MODE, + SERVICE_SELECT_SOURCE, +) +from homeassistant.components.onkyo.services import ( + ATTR_HDMI_OUTPUT, + SERVICE_SELECT_HDMI_OUTPUT, +) +from homeassistant.const import ( + ATTR_ENTITY_ID, + SERVICE_TURN_OFF, + SERVICE_TURN_ON, + SERVICE_VOLUME_DOWN, + SERVICE_VOLUME_MUTE, + SERVICE_VOLUME_SET, + SERVICE_VOLUME_UP, + Platform, +) +from homeassistant.core import HomeAssistant +from homeassistant.exceptions import ServiceValidationError +from homeassistant.helpers import entity_registry as er + +from . import setup_integration + +from tests.common import MockConfigEntry, snapshot_platform + +ENTITY_ID = "media_player.tx_nr7100" +ENTITY_ID_ZONE_2 = "media_player.tx_nr7100_zone_2" +ENTITY_ID_ZONE_3 = "media_player.tx_nr7100_zone_3" + + +@pytest.fixture(autouse=True) +async def auto_setup_integration( + hass: HomeAssistant, + mock_config_entry: MockConfigEntry, + mock_receiver: AsyncMock, + writes: list[Instruction], +) -> AsyncGenerator[None]: + """Auto setup integration.""" + with ( + patch( + "homeassistant.components.onkyo.media_player.AUDIO_VIDEO_INFORMATION_UPDATE_WAIT_TIME", + 0, + ), + patch("homeassistant.components.onkyo.PLATFORMS", [Platform.MEDIA_PLAYER]), + ): + await setup_integration(hass, mock_config_entry) + writes.clear() + yield + + +async def test_entities( + hass: HomeAssistant, + snapshot: SnapshotAssertion, + mock_config_entry: MockConfigEntry, + entity_registry: er.EntityRegistry, +) -> None: + """Test entities.""" + await snapshot_platform(hass, entity_registry, snapshot, mock_config_entry.entry_id) + + +@pytest.mark.parametrize( + ("action", "action_data", "message"), + [ + (SERVICE_TURN_ON, {}, command.Power(Zone.MAIN, command.Power.Param.ON)), + (SERVICE_TURN_OFF, {}, command.Power(Zone.MAIN, command.Power.Param.STANDBY)), + ( + SERVICE_VOLUME_SET, + {ATTR_MEDIA_VOLUME_LEVEL: 0.5}, + command.Volume(Zone.MAIN, 40), + ), + (SERVICE_VOLUME_UP, {}, command.Volume(Zone.MAIN, command.Volume.Param.UP)), + (SERVICE_VOLUME_DOWN, {}, command.Volume(Zone.MAIN, command.Volume.Param.DOWN)), + ( + SERVICE_VOLUME_MUTE, + {ATTR_MEDIA_VOLUME_MUTED: True}, + command.Muting(Zone.MAIN, command.Muting.Param.ON), + ), + ( + SERVICE_VOLUME_MUTE, + {ATTR_MEDIA_VOLUME_MUTED: False}, + command.Muting(Zone.MAIN, command.Muting.Param.OFF), + ), + ], +) +async def test_actions( + hass: HomeAssistant, + writes: list[Instruction], + action: str, + action_data: dict, + message: Instruction, +) -> None: + """Test actions.""" + await hass.services.async_call( + MEDIA_PLAYER_DOMAIN, + action, + {ATTR_ENTITY_ID: ENTITY_ID, **action_data}, + blocking=True, + ) + assert writes[0] == message + + +async def test_select_source(hass: HomeAssistant, writes: list[Instruction]) -> None: + """Test select source.""" + await hass.services.async_call( + MEDIA_PLAYER_DOMAIN, + SERVICE_SELECT_SOURCE, + {ATTR_ENTITY_ID: ENTITY_ID, ATTR_INPUT_SOURCE: "TV"}, + blocking=True, + ) + assert writes[0] == command.InputSource(Zone.MAIN, command.InputSource.Param("12")) + + writes.clear() + with pytest.raises(ServiceValidationError): + await hass.services.async_call( + MEDIA_PLAYER_DOMAIN, + SERVICE_SELECT_SOURCE, + {ATTR_ENTITY_ID: ENTITY_ID, ATTR_INPUT_SOURCE: "InvalidSource"}, + blocking=True, + ) + assert not writes + + +async def test_select_sound_mode( + hass: HomeAssistant, writes: list[Instruction] +) -> None: + """Test select sound mode.""" + await hass.services.async_call( + MEDIA_PLAYER_DOMAIN, + SERVICE_SELECT_SOUND_MODE, + {ATTR_ENTITY_ID: ENTITY_ID, ATTR_SOUND_MODE: "THX"}, + blocking=True, + ) + assert writes[0] == command.ListeningMode( + Zone.MAIN, command.ListeningMode.Param("04") + ) + + writes.clear() + with pytest.raises(ServiceValidationError): + await hass.services.async_call( + MEDIA_PLAYER_DOMAIN, + SERVICE_SELECT_SOUND_MODE, + {ATTR_ENTITY_ID: ENTITY_ID, ATTR_SOUND_MODE: "InvalidMode"}, + blocking=True, + ) + assert not writes + + +async def test_play_media(hass: HomeAssistant, writes: list[Instruction]) -> None: + """Test play media (radio preset).""" + await hass.services.async_call( + MEDIA_PLAYER_DOMAIN, + SERVICE_PLAY_MEDIA, + { + ATTR_ENTITY_ID: ENTITY_ID, + ATTR_MEDIA_CONTENT_TYPE: "radio", + ATTR_MEDIA_CONTENT_ID: "5", + }, + blocking=True, + ) + assert writes[0] == command.TunerPreset(Zone.MAIN, 5) + + writes.clear() + await hass.services.async_call( + MEDIA_PLAYER_DOMAIN, + SERVICE_PLAY_MEDIA, + { + ATTR_ENTITY_ID: ENTITY_ID, + ATTR_MEDIA_CONTENT_TYPE: "music", + ATTR_MEDIA_CONTENT_ID: "5", + }, + blocking=True, + ) + assert not writes + + writes.clear() + await hass.services.async_call( + MEDIA_PLAYER_DOMAIN, + SERVICE_PLAY_MEDIA, + { + ATTR_ENTITY_ID: ENTITY_ID_ZONE_2, + ATTR_MEDIA_CONTENT_TYPE: "radio", + ATTR_MEDIA_CONTENT_ID: "5", + }, + blocking=True, + ) + assert not writes + + writes.clear() + await hass.services.async_call( + MEDIA_PLAYER_DOMAIN, + SERVICE_PLAY_MEDIA, + { + ATTR_ENTITY_ID: ENTITY_ID_ZONE_3, + ATTR_MEDIA_CONTENT_TYPE: "radio", + ATTR_MEDIA_CONTENT_ID: "5", + }, + blocking=True, + ) + assert not writes + + +async def test_select_hdmi_output( + hass: HomeAssistant, writes: list[Instruction] +) -> None: + """Test select hdmi output.""" + await hass.services.async_call( + MEDIA_PLAYER_DOMAIN, + SERVICE_SELECT_HDMI_OUTPUT, + {ATTR_ENTITY_ID: ENTITY_ID, ATTR_HDMI_OUTPUT: "sub"}, + blocking=True, + ) + assert writes[0] == command.HDMIOutput(command.HDMIOutput.Param.BOTH)