Add allowed UUIDs and ignore CEC to Google Cast options flow (#47269)

This commit is contained in:
Erik Montnemery 2021-03-25 14:06:01 +01:00 committed by GitHub
parent 6b2a2740f1
commit 3188f796f9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 253 additions and 186 deletions

View File

@ -1,20 +1,42 @@
"""Component to embed Google Cast.""" """Component to embed Google Cast."""
import logging
import voluptuous as vol
from homeassistant import config_entries from homeassistant import config_entries
from homeassistant.helpers import config_validation as cv
from . import home_assistant_cast from . import home_assistant_cast
from .const import DOMAIN from .const import DOMAIN
from .media_player import ENTITY_SCHEMA
# Deprecated from 2021.4, remove in 2021.6
CONFIG_SCHEMA = cv.deprecated(DOMAIN)
_LOGGER = logging.getLogger(__name__)
async def async_setup(hass, config): async def async_setup(hass, config):
"""Set up the Cast component.""" """Set up the Cast component."""
conf = config.get(DOMAIN) conf = config.get(DOMAIN)
hass.data[DOMAIN] = conf or {}
if conf is not None: if conf is not None:
media_player_config_validated = []
media_player_config = conf.get("media_player", {})
if not isinstance(media_player_config, list):
media_player_config = [media_player_config]
for cfg in media_player_config:
try:
cfg = ENTITY_SCHEMA(cfg)
media_player_config_validated.append(cfg)
except vol.Error as ex:
_LOGGER.warning("Invalid config '%s': %s", cfg, ex)
hass.async_create_task( hass.async_create_task(
hass.config_entries.flow.async_init( hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_IMPORT} DOMAIN,
context={"source": config_entries.SOURCE_IMPORT},
data=media_player_config_validated,
) )
) )

View File

@ -4,9 +4,11 @@ import voluptuous as vol
from homeassistant import config_entries from homeassistant import config_entries
from homeassistant.helpers import config_validation as cv from homeassistant.helpers import config_validation as cv
from .const import CONF_KNOWN_HOSTS, DOMAIN from .const import CONF_IGNORE_CEC, CONF_KNOWN_HOSTS, CONF_UUID, DOMAIN
IGNORE_CEC_SCHEMA = vol.Schema(vol.All(cv.ensure_list, [cv.string]))
KNOWN_HOSTS_SCHEMA = vol.Schema(vol.All(cv.ensure_list, [cv.string])) KNOWN_HOSTS_SCHEMA = vol.Schema(vol.All(cv.ensure_list, [cv.string]))
WANTED_UUID_SCHEMA = vol.Schema(vol.All(cv.ensure_list, [cv.string]))
class FlowHandler(config_entries.ConfigFlow, domain=DOMAIN): class FlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
@ -17,7 +19,9 @@ class FlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
def __init__(self): def __init__(self):
"""Initialize flow.""" """Initialize flow."""
self._known_hosts = None self._ignore_cec = set()
self._known_hosts = set()
self._wanted_uuid = set()
@staticmethod @staticmethod
def async_get_options_flow(config_entry): def async_get_options_flow(config_entry):
@ -28,7 +32,15 @@ class FlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
"""Import data.""" """Import data."""
if self._async_current_entries(): if self._async_current_entries():
return self.async_abort(reason="single_instance_allowed") return self.async_abort(reason="single_instance_allowed")
data = {CONF_KNOWN_HOSTS: self._known_hosts}
media_player_config = import_data or []
for cfg in media_player_config:
if CONF_IGNORE_CEC in cfg:
self._ignore_cec.update(set(cfg[CONF_IGNORE_CEC]))
if CONF_UUID in cfg:
self._wanted_uuid.add(cfg[CONF_UUID])
data = self._get_data()
return self.async_create_entry(title="Google Cast", data=data) return self.async_create_entry(title="Google Cast", data=data)
async def async_step_user(self, user_input=None): async def async_step_user(self, user_input=None):
@ -62,7 +74,8 @@ class FlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
errors["base"] = "invalid_known_hosts" errors["base"] = "invalid_known_hosts"
bad_hosts = True bad_hosts = True
else: else:
data[CONF_KNOWN_HOSTS] = known_hosts self._known_hosts = known_hosts
data = self._get_data()
if not bad_hosts: if not bad_hosts:
return self.async_create_entry(title="Google Cast", data=data) return self.async_create_entry(title="Google Cast", data=data)
@ -76,13 +89,20 @@ class FlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
async def async_step_confirm(self, user_input=None): async def async_step_confirm(self, user_input=None):
"""Confirm the setup.""" """Confirm the setup."""
data = {CONF_KNOWN_HOSTS: self._known_hosts} data = self._get_data()
if user_input is not None: if user_input is not None:
return self.async_create_entry(title="Google Cast", data=data) return self.async_create_entry(title="Google Cast", data=data)
return self.async_show_form(step_id="confirm") return self.async_show_form(step_id="confirm")
def _get_data(self):
return {
CONF_IGNORE_CEC: list(self._ignore_cec),
CONF_KNOWN_HOSTS: list(self._known_hosts),
CONF_UUID: list(self._wanted_uuid),
}
class CastOptionsFlowHandler(config_entries.OptionsFlow): class CastOptionsFlowHandler(config_entries.OptionsFlow):
"""Handle Google Cast options.""" """Handle Google Cast options."""
@ -102,35 +122,59 @@ class CastOptionsFlowHandler(config_entries.OptionsFlow):
errors = {} errors = {}
current_config = self.config_entry.data current_config = self.config_entry.data
if user_input is not None: if user_input is not None:
bad_hosts = False bad_cec, ignore_cec = _string_to_list(
user_input.get(CONF_IGNORE_CEC, ""), IGNORE_CEC_SCHEMA
)
bad_hosts, known_hosts = _string_to_list(
user_input.get(CONF_KNOWN_HOSTS, ""), KNOWN_HOSTS_SCHEMA
)
bad_uuid, wanted_uuid = _string_to_list(
user_input.get(CONF_UUID, ""), WANTED_UUID_SCHEMA
)
known_hosts = user_input.get(CONF_KNOWN_HOSTS, "") if not bad_cec and not bad_hosts and not bad_uuid:
known_hosts = [x.strip() for x in known_hosts.split(",") if x.strip()]
try:
known_hosts = KNOWN_HOSTS_SCHEMA(known_hosts)
except vol.Invalid:
errors["base"] = "invalid_known_hosts"
bad_hosts = True
if not bad_hosts:
updated_config = {} updated_config = {}
updated_config[CONF_IGNORE_CEC] = ignore_cec
updated_config[CONF_KNOWN_HOSTS] = known_hosts updated_config[CONF_KNOWN_HOSTS] = known_hosts
updated_config[CONF_UUID] = wanted_uuid
self.hass.config_entries.async_update_entry( self.hass.config_entries.async_update_entry(
self.config_entry, data=updated_config self.config_entry, data=updated_config
) )
return self.async_create_entry(title="", data=None) return self.async_create_entry(title="", data=None)
fields = {} fields = {}
known_hosts_string = "" suggested_value = _list_to_string(current_config.get(CONF_KNOWN_HOSTS))
if current_config.get(CONF_KNOWN_HOSTS): _add_with_suggestion(fields, CONF_KNOWN_HOSTS, suggested_value)
known_hosts_string = ",".join(current_config.get(CONF_KNOWN_HOSTS)) if self.show_advanced_options:
fields[ suggested_value = _list_to_string(current_config.get(CONF_UUID))
vol.Optional( _add_with_suggestion(fields, CONF_UUID, suggested_value)
"known_hosts", description={"suggested_value": known_hosts_string} suggested_value = _list_to_string(current_config.get(CONF_IGNORE_CEC))
) _add_with_suggestion(fields, CONF_IGNORE_CEC, suggested_value)
] = str
return self.async_show_form( return self.async_show_form(
step_id="options", step_id="options",
data_schema=vol.Schema(fields), data_schema=vol.Schema(fields),
errors=errors, errors=errors,
) )
def _list_to_string(items):
comma_separated_string = ""
if items:
comma_separated_string = ",".join(items)
return comma_separated_string
def _string_to_list(string, schema):
invalid = False
items = [x.strip() for x in string.split(",") if x.strip()]
try:
items = schema(items)
except vol.Invalid:
invalid = True
return invalid, items
def _add_with_suggestion(fields, key, suggested_value):
fields[vol.Optional(key, description={"suggested_value": suggested_value})] = str

View File

@ -5,9 +5,6 @@ DEFAULT_PORT = 8009
# Stores a threading.Lock that is held by the internal pychromecast discovery. # Stores a threading.Lock that is held by the internal pychromecast discovery.
INTERNAL_DISCOVERY_RUNNING_KEY = "cast_discovery_running" INTERNAL_DISCOVERY_RUNNING_KEY = "cast_discovery_running"
# Stores all ChromecastInfo we encountered through discovery or config as a set
# If we find a chromecast with a new host, the old one will be removed again.
KNOWN_CHROMECAST_INFO_KEY = "cast_known_chromecasts"
# Stores UUIDs of cast devices that were added as entities. Doesn't store # Stores UUIDs of cast devices that were added as entities. Doesn't store
# None UUIDs. # None UUIDs.
ADDED_CAST_DEVICES_KEY = "cast_added_cast_devices" ADDED_CAST_DEVICES_KEY = "cast_added_cast_devices"
@ -27,4 +24,6 @@ SIGNAL_CAST_REMOVED = "cast_removed"
# Dispatcher signal fired when a Chromecast should show a Home Assistant Cast view. # Dispatcher signal fired when a Chromecast should show a Home Assistant Cast view.
SIGNAL_HASS_CAST_SHOW_VIEW = "cast_show_view" SIGNAL_HASS_CAST_SHOW_VIEW = "cast_show_view"
CONF_IGNORE_CEC = "ignore_cec"
CONF_KNOWN_HOSTS = "known_hosts" CONF_KNOWN_HOSTS = "known_hosts"
CONF_UUID = "uuid"

View File

@ -13,7 +13,6 @@ from .const import (
CONF_KNOWN_HOSTS, CONF_KNOWN_HOSTS,
DEFAULT_PORT, DEFAULT_PORT,
INTERNAL_DISCOVERY_RUNNING_KEY, INTERNAL_DISCOVERY_RUNNING_KEY,
KNOWN_CHROMECAST_INFO_KEY,
SIGNAL_CAST_DISCOVERED, SIGNAL_CAST_DISCOVERED,
SIGNAL_CAST_REMOVED, SIGNAL_CAST_REMOVED,
) )
@ -38,12 +37,8 @@ def discover_chromecast(hass: HomeAssistant, device_info):
return return
info = info.fill_out_missing_chromecast_info() info = info.fill_out_missing_chromecast_info()
if info.uuid in hass.data[KNOWN_CHROMECAST_INFO_KEY]: _LOGGER.debug("Discovered new or updated chromecast %s", info)
_LOGGER.debug("Discovered update for known chromecast %s", info)
else:
_LOGGER.debug("Discovered chromecast %s", info)
hass.data[KNOWN_CHROMECAST_INFO_KEY][info.uuid] = info
dispatcher_send(hass, SIGNAL_CAST_DISCOVERED, info) dispatcher_send(hass, SIGNAL_CAST_DISCOVERED, info)

View File

@ -1,7 +1,6 @@
"""Provide functionality to interact with Cast devices on the network.""" """Provide functionality to interact with Cast devices on the network."""
from __future__ import annotations from __future__ import annotations
import asyncio
from contextlib import suppress from contextlib import suppress
from datetime import timedelta from datetime import timedelta
import functools as ft import functools as ft
@ -52,19 +51,19 @@ from homeassistant.const import (
STATE_PLAYING, STATE_PLAYING,
) )
from homeassistant.core import callback from homeassistant.core import callback
from homeassistant.exceptions import PlatformNotReady
import homeassistant.helpers.config_validation as cv import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.dispatcher import async_dispatcher_connect from homeassistant.helpers.dispatcher import async_dispatcher_connect
from homeassistant.helpers.network import NoURLAvailableError, get_url from homeassistant.helpers.network import NoURLAvailableError, get_url
from homeassistant.helpers.typing import ConfigType, HomeAssistantType from homeassistant.helpers.typing import HomeAssistantType
import homeassistant.util.dt as dt_util import homeassistant.util.dt as dt_util
from homeassistant.util.logging import async_create_catching_coro from homeassistant.util.logging import async_create_catching_coro
from .const import ( from .const import (
ADDED_CAST_DEVICES_KEY, ADDED_CAST_DEVICES_KEY,
CAST_MULTIZONE_MANAGER_KEY, CAST_MULTIZONE_MANAGER_KEY,
CONF_IGNORE_CEC,
CONF_UUID,
DOMAIN as CAST_DOMAIN, DOMAIN as CAST_DOMAIN,
KNOWN_CHROMECAST_INFO_KEY,
SIGNAL_CAST_DISCOVERED, SIGNAL_CAST_DISCOVERED,
SIGNAL_CAST_REMOVED, SIGNAL_CAST_REMOVED,
SIGNAL_HASS_CAST_SHOW_VIEW, SIGNAL_HASS_CAST_SHOW_VIEW,
@ -74,8 +73,6 @@ from .helpers import CastStatusListener, ChromecastInfo, ChromeCastZeroconf
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
CONF_IGNORE_CEC = "ignore_cec"
CONF_UUID = "uuid"
CAST_SPLASH = "https://www.home-assistant.io/images/cast/splash.png" CAST_SPLASH = "https://www.home-assistant.io/images/cast/splash.png"
SUPPORT_CAST = ( SUPPORT_CAST = (
@ -129,45 +126,20 @@ def _async_create_cast_device(hass: HomeAssistantType, info: ChromecastInfo):
async def async_setup_entry(hass, config_entry, async_add_entities): async def async_setup_entry(hass, config_entry, async_add_entities):
"""Set up Cast from a config entry.""" """Set up Cast from a config entry."""
config = hass.data[CAST_DOMAIN].get("media_player") or {}
if not isinstance(config, list):
config = [config]
# no pending task
done, _ = await asyncio.wait(
[
_async_setup_platform(
hass, ENTITY_SCHEMA(cfg), async_add_entities, config_entry
)
for cfg in config
]
)
if any(task.exception() for task in done):
exceptions = [task.exception() for task in done]
for exception in exceptions:
_LOGGER.debug("Failed to setup chromecast", exc_info=exception)
raise PlatformNotReady
async def _async_setup_platform(
hass: HomeAssistantType, config: ConfigType, async_add_entities, config_entry
):
"""Set up the cast platform."""
# Import CEC IGNORE attributes
pychromecast.IGNORE_CEC += config.get(CONF_IGNORE_CEC, [])
hass.data.setdefault(ADDED_CAST_DEVICES_KEY, set()) hass.data.setdefault(ADDED_CAST_DEVICES_KEY, set())
hass.data.setdefault(KNOWN_CHROMECAST_INFO_KEY, {})
wanted_uuid = None # Import CEC IGNORE attributes
if CONF_UUID in config: pychromecast.IGNORE_CEC += config_entry.data.get(CONF_IGNORE_CEC) or []
wanted_uuid = config[CONF_UUID]
wanted_uuids = config_entry.data.get(CONF_UUID) or None
@callback @callback
def async_cast_discovered(discover: ChromecastInfo) -> None: def async_cast_discovered(discover: ChromecastInfo) -> None:
"""Handle discovery of a new chromecast.""" """Handle discovery of a new chromecast."""
# If wanted_uuid is set, we're handling a specific cast device identified by UUID # If wanted_uuids is set, we're only accepting specific cast devices identified
if wanted_uuid is not None and wanted_uuid != discover.uuid: # by UUID
# UUID not matching, this is not it. if wanted_uuids is not None and discover.uuid not in wanted_uuids:
# UUID not matching, ignore.
return return
cast_device = _async_create_cast_device(hass, discover) cast_device = _async_create_cast_device(hass, discover)
@ -175,11 +147,6 @@ async def _async_setup_platform(
async_add_entities([cast_device]) async_add_entities([cast_device])
async_dispatcher_connect(hass, SIGNAL_CAST_DISCOVERED, async_cast_discovered) async_dispatcher_connect(hass, SIGNAL_CAST_DISCOVERED, async_cast_discovered)
# Re-play the callback for all past chromecasts, store the objects in
# a list to avoid concurrent modification resulting in exception.
for chromecast in hass.data[KNOWN_CHROMECAST_INFO_KEY].values():
async_cast_discovered(chromecast)
ChromeCastZeroconf.set_zeroconf(await zeroconf.async_get_instance(hass)) ChromeCastZeroconf.set_zeroconf(await zeroconf.async_get_instance(hass))
hass.async_add_executor_job(setup_internal_discovery, hass, config_entry) hass.async_add_executor_job(setup_internal_discovery, hass, config_entry)

View File

@ -24,7 +24,9 @@
"options": { "options": {
"description": "Please enter the Google Cast configuration.", "description": "Please enter the Google Cast configuration.",
"data": { "data": {
"known_hosts": "Optional list of known hosts if mDNS discovery is not working." "ignore_cec": "Optional list which will be passed to pychromecast.IGNORE_CEC.",
"known_hosts": "Optional list of known hosts if mDNS discovery is not working.",
"uuid": "Optional list of UUIDs. Casts not listed will not be added."
} }
} }
}, },

View File

@ -27,7 +27,9 @@
"step": { "step": {
"options": { "options": {
"data": { "data": {
"known_hosts": "Optional list of known hosts if mDNS discovery is not working." "ignore_cec": "Optional list which will be passed to pychromecast.IGNORE_CEC.",
"known_hosts": "Optional list of known hosts if mDNS discovery is not working.",
"uuid": "Optional list of UUIDs. Casts not listed will not be added."
}, },
"description": "Please enter the Google Cast configuration." "description": "Please enter the Google Cast configuration."
} }

View File

@ -40,6 +40,7 @@ def mz_mock():
def pycast_mock(castbrowser_mock, castbrowser_constructor_mock): def pycast_mock(castbrowser_mock, castbrowser_constructor_mock):
"""Mock pychromecast.""" """Mock pychromecast."""
pycast_mock = MagicMock() pycast_mock = MagicMock()
pycast_mock.IGNORE_CEC = []
pycast_mock.discovery.CastBrowser = castbrowser_constructor_mock pycast_mock.discovery.CastBrowser = castbrowser_constructor_mock
pycast_mock.discovery.CastBrowser.return_value = castbrowser_mock pycast_mock.discovery.CastBrowser.return_value = castbrowser_mock
pycast_mock.discovery.AbstractCastListener = ( pycast_mock.discovery.AbstractCastListener = (

View File

@ -102,12 +102,8 @@ async def test_remove_entry(hass, mock_zeroconf):
entry.add_to_hass(hass) entry.add_to_hass(hass)
with patch( with patch(
"homeassistant.components.cast.media_player._async_setup_platform"
), patch(
"pychromecast.discovery.discover_chromecasts", return_value=(True, None) "pychromecast.discovery.discover_chromecasts", return_value=(True, None)
), patch( ), patch("pychromecast.discovery.stop_discovery"):
"pychromecast.discovery.stop_discovery"
):
assert await hass.config_entries.async_setup(entry.entry_id) assert await hass.config_entries.async_setup(entry.entry_id)
await hass.async_block_till_done() await hass.async_block_till_done()
assert "cast" in hass.config.components assert "cast" in hass.config.components

View File

@ -35,18 +35,36 @@ async def test_creating_entry_sets_up_media_player(hass):
assert len(mock_setup.mock_calls) == 1 assert len(mock_setup.mock_calls) == 1
async def test_configuring_cast_creates_entry(hass): async def test_import(hass, caplog):
"""Test that specifying config will create an entry.""" """Test that specifying config will create an entry."""
with patch( with patch(
"homeassistant.components.cast.async_setup_entry", return_value=True "homeassistant.components.cast.async_setup_entry", return_value=True
) as mock_setup: ) as mock_setup:
await async_setup_component( await async_setup_component(
hass, cast.DOMAIN, {"cast": {"some_config": "to_trigger_import"}} hass,
cast.DOMAIN,
{
"cast": {
"media_player": [
{"uuid": "abcd"},
{"uuid": "abcd", "ignore_cec": "milk"},
{"uuid": "efgh", "ignore_cec": "beer"},
{"incorrect": "config"},
]
}
},
) )
await hass.async_block_till_done() await hass.async_block_till_done()
assert len(mock_setup.mock_calls) == 1 assert len(mock_setup.mock_calls) == 1
assert len(hass.config_entries.async_entries("cast")) == 1
entry = hass.config_entries.async_entries("cast")[0]
assert set(entry.data["ignore_cec"]) == {"milk", "beer"}
assert set(entry.data["uuid"]) == {"abcd", "efgh"}
assert "Invalid config '{'incorrect': 'config'}'" in caplog.text
async def test_not_configuring_cast_not_creates_entry(hass): async def test_not_configuring_cast_not_creates_entry(hass):
"""Test that no config will not create an entry.""" """Test that no config will not create an entry."""
@ -72,7 +90,7 @@ async def test_single_instance(hass, source):
assert result["reason"] == "single_instance_allowed" assert result["reason"] == "single_instance_allowed"
async def test_user_setup(hass, mqtt_mock): async def test_user_setup(hass):
"""Test we can finish a config flow.""" """Test we can finish a config flow."""
result = await hass.config_entries.flow.async_init( result = await hass.config_entries.flow.async_init(
"cast", context={"source": "user"} "cast", context={"source": "user"}
@ -85,12 +103,14 @@ async def test_user_setup(hass, mqtt_mock):
assert len(users) == 1 assert len(users) == 1
assert result["type"] == "create_entry" assert result["type"] == "create_entry"
assert result["result"].data == { assert result["result"].data == {
"ignore_cec": [],
"known_hosts": [], "known_hosts": [],
"uuid": [],
"user_id": users[0].id, # Home Assistant cast user "user_id": users[0].id, # Home Assistant cast user
} }
async def test_user_setup_options(hass, mqtt_mock): async def test_user_setup_options(hass):
"""Test we can finish a config flow.""" """Test we can finish a config flow."""
result = await hass.config_entries.flow.async_init( result = await hass.config_entries.flow.async_init(
"cast", context={"source": "user"} "cast", context={"source": "user"}
@ -105,7 +125,9 @@ async def test_user_setup_options(hass, mqtt_mock):
assert len(users) == 1 assert len(users) == 1
assert result["type"] == "create_entry" assert result["type"] == "create_entry"
assert result["result"].data == { assert result["result"].data == {
"ignore_cec": [],
"known_hosts": ["192.168.0.1", "192.168.0.2"], "known_hosts": ["192.168.0.1", "192.168.0.2"],
"uuid": [],
"user_id": users[0].id, # Home Assistant cast user "user_id": users[0].id, # Home Assistant cast user
} }
@ -123,7 +145,9 @@ async def test_zeroconf_setup(hass):
assert len(users) == 1 assert len(users) == 1
assert result["type"] == "create_entry" assert result["type"] == "create_entry"
assert result["result"].data == { assert result["result"].data == {
"known_hosts": None, "ignore_cec": [],
"known_hosts": [],
"uuid": [],
"user_id": users[0].id, # Home Assistant cast user "user_id": users[0].id, # Home Assistant cast user
} }
@ -137,27 +161,90 @@ def get_suggested(schema, key):
return k.description["suggested_value"] return k.description["suggested_value"]
async def test_option_flow(hass): @pytest.mark.parametrize(
"""Test config flow options.""" "parameter_data",
config_entry = MockConfigEntry( [
domain="cast", data={"known_hosts": ["192.168.0.10", "192.168.0.11"]} (
"known_hosts",
["192.168.0.10", "192.168.0.11"],
"192.168.0.10,192.168.0.11",
"192.168.0.1, , 192.168.0.2 ",
["192.168.0.1", "192.168.0.2"],
),
(
"uuid",
["bla", "blu"],
"bla,blu",
"foo, , bar ",
["foo", "bar"],
),
(
"ignore_cec",
["cast1", "cast2"],
"cast1,cast2",
"other_cast, , some_cast ",
["other_cast", "some_cast"],
),
],
) )
async def test_option_flow(hass, parameter_data):
"""Test config flow options."""
all_parameters = ["ignore_cec", "known_hosts", "uuid"]
parameter, initial, suggested, user_input, updated = parameter_data
data = {
"ignore_cec": [],
"known_hosts": [],
"uuid": [],
}
data[parameter] = initial
config_entry = MockConfigEntry(domain="cast", data=data)
config_entry.add_to_hass(hass) config_entry.add_to_hass(hass)
assert await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done() await hass.async_block_till_done()
# Test ignore_cec and uuid options are hidden if advanced options are disabled
result = await hass.config_entries.options.async_init(config_entry.entry_id) result = await hass.config_entries.options.async_init(config_entry.entry_id)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["step_id"] == "options" assert result["step_id"] == "options"
data_schema = result["data_schema"].schema data_schema = result["data_schema"].schema
assert get_suggested(data_schema, "known_hosts") == "192.168.0.10,192.168.0.11" assert set(data_schema) == {"known_hosts"}
# Reconfigure ignore_cec, known_hosts, uuid
context = {"source": "user", "show_advanced_options": True}
result = await hass.config_entries.options.async_init(
config_entry.entry_id, context=context
)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["step_id"] == "options"
data_schema = result["data_schema"].schema
for other_param in all_parameters:
if other_param == parameter:
continue
assert get_suggested(data_schema, other_param) == ""
assert get_suggested(data_schema, parameter) == suggested
result = await hass.config_entries.options.async_configure( result = await hass.config_entries.options.async_configure(
result["flow_id"], result["flow_id"],
user_input={"known_hosts": "192.168.0.1, , 192.168.0.2 "}, user_input={parameter: user_input},
) )
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
assert result["data"] is None assert result["data"] is None
assert config_entry.data == {"known_hosts": ["192.168.0.1", "192.168.0.2"]} for other_param in all_parameters:
if other_param == parameter:
continue
assert config_entry.data[other_param] == []
assert config_entry.data[parameter] == updated
# Clear known_hosts
result = await hass.config_entries.options.async_init(config_entry.entry_id)
result = await hass.config_entries.options.async_configure(
result["flow_id"],
user_input={"known_hosts": ""},
)
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
assert result["data"] is None
assert config_entry.data == {"ignore_cec": [], "known_hosts": [], "uuid": []}
async def test_known_hosts(hass, castbrowser_mock, castbrowser_constructor_mock): async def test_known_hosts(hass, castbrowser_mock, castbrowser_constructor_mock):

View File

@ -3,7 +3,7 @@
from __future__ import annotations from __future__ import annotations
import json import json
from unittest.mock import ANY, MagicMock, Mock, patch from unittest.mock import ANY, MagicMock, patch
from uuid import UUID from uuid import UUID
import attr import attr
@ -28,7 +28,6 @@ from homeassistant.components.media_player.const import (
) )
from homeassistant.config import async_process_ha_core_config from homeassistant.config import async_process_ha_core_config
from homeassistant.const import EVENT_HOMEASSISTANT_STOP from homeassistant.const import EVENT_HOMEASSISTANT_STOP
from homeassistant.exceptions import PlatformNotReady
from homeassistant.helpers import entity_registry as er from homeassistant.helpers import entity_registry as er
from homeassistant.helpers.dispatcher import async_dispatcher_connect from homeassistant.helpers.dispatcher import async_dispatcher_connect
from homeassistant.helpers.typing import HomeAssistantType from homeassistant.helpers.typing import HomeAssistantType
@ -100,11 +99,13 @@ async def async_setup_cast(hass, config=None):
"""Set up the cast platform.""" """Set up the cast platform."""
if config is None: if config is None:
config = {} config = {}
data = {**{"ignore_cec": [], "known_hosts": [], "uuid": []}, **config}
with patch( with patch(
"homeassistant.helpers.entity_platform.EntityPlatform._async_schedule_add_entities" "homeassistant.helpers.entity_platform.EntityPlatform._async_schedule_add_entities"
) as add_entities: ) as add_entities:
MockConfigEntry(domain="cast").add_to_hass(hass) entry = MockConfigEntry(data=data, domain="cast")
await async_setup_component(hass, "cast", {"cast": {"media_player": config}}) entry.add_to_hass(hass)
assert await hass.config_entries.async_setup(entry.entry_id)
await hass.async_block_till_done() await hass.async_block_till_done()
return add_entities return add_entities
@ -388,44 +389,6 @@ async def test_create_cast_device_with_uuid(hass):
assert cast_device is None assert cast_device is None
async def test_replay_past_chromecasts(hass):
"""Test cast platform re-playing past chromecasts when adding new one."""
cast_group1 = get_fake_chromecast_info(host="host1", port=8009, uuid=FakeUUID)
cast_group2 = get_fake_chromecast_info(
host="host2", port=8009, uuid=UUID("9462202c-e747-4af5-a66b-7dce0e1ebc09")
)
zconf_1 = get_fake_zconf(host="host1", port=8009)
zconf_2 = get_fake_zconf(host="host2", port=8009)
discover_cast, _, add_dev1 = await async_setup_cast_internal_discovery(
hass, config={"uuid": FakeUUID}
)
with patch(
"homeassistant.components.cast.discovery.ChromeCastZeroconf.get_zeroconf",
return_value=zconf_2,
):
discover_cast("service2", cast_group2)
await hass.async_block_till_done()
await hass.async_block_till_done() # having tasks that add jobs
assert add_dev1.call_count == 0
with patch(
"homeassistant.components.cast.discovery.ChromeCastZeroconf.get_zeroconf",
return_value=zconf_1,
):
discover_cast("service1", cast_group1)
await hass.async_block_till_done()
await hass.async_block_till_done() # having tasks that add jobs
assert add_dev1.call_count == 1
add_dev2 = Mock()
entry = hass.config_entries.async_entries("cast")[0]
await cast._async_setup_platform(hass, {"host": "host2"}, add_dev2, entry)
await hass.async_block_till_done()
assert add_dev2.call_count == 1
async def test_manual_cast_chromecasts_uuid(hass): async def test_manual_cast_chromecasts_uuid(hass):
"""Test only wanted casts are added for manual configuration.""" """Test only wanted casts are added for manual configuration."""
cast_1 = get_fake_chromecast_info(host="host_1", uuid=FakeUUID) cast_1 = get_fake_chromecast_info(host="host_1", uuid=FakeUUID)
@ -435,7 +398,7 @@ async def test_manual_cast_chromecasts_uuid(hass):
# Manual configuration of media player with host "configured_host" # Manual configuration of media player with host "configured_host"
discover_cast, _, add_dev1 = await async_setup_cast_internal_discovery( discover_cast, _, add_dev1 = await async_setup_cast_internal_discovery(
hass, config={"uuid": FakeUUID} hass, config={"uuid": str(FakeUUID)}
) )
with patch( with patch(
"homeassistant.components.cast.discovery.ChromeCastZeroconf.get_zeroconf", "homeassistant.components.cast.discovery.ChromeCastZeroconf.get_zeroconf",
@ -1291,65 +1254,54 @@ async def test_disconnect_on_stop(hass: HomeAssistantType):
async def test_entry_setup_no_config(hass: HomeAssistantType): async def test_entry_setup_no_config(hass: HomeAssistantType):
"""Test setting up entry with no config..""" """Test deprecated empty yaml config.."""
await async_setup_component(hass, "cast", {}) await async_setup_component(hass, "cast", {})
await hass.async_block_till_done() await hass.async_block_till_done()
with patch( assert not hass.config_entries.async_entries("cast")
"homeassistant.components.cast.media_player._async_setup_platform",
) as mock_setup:
await cast.async_setup_entry(hass, MockConfigEntry(), None)
assert len(mock_setup.mock_calls) == 1
assert mock_setup.mock_calls[0][1][1] == {}
async def test_entry_setup_single_config(hass: HomeAssistantType): async def test_entry_setup_empty_config(hass: HomeAssistantType):
"""Test setting up entry and having a single config option.""" """Test deprecated empty yaml config.."""
await async_setup_component(hass, "cast", {"cast": {}})
await hass.async_block_till_done()
config_entry = hass.config_entries.async_entries("cast")[0]
assert config_entry.data["uuid"] == []
assert config_entry.data["ignore_cec"] == []
async def test_entry_setup_single_config(hass: HomeAssistantType, pycast_mock):
"""Test deprecated yaml config with a single config media_player."""
await async_setup_component( await async_setup_component(
hass, "cast", {"cast": {"media_player": {"uuid": "bla"}}} hass, "cast", {"cast": {"media_player": {"uuid": "bla", "ignore_cec": "cast1"}}}
) )
await hass.async_block_till_done() await hass.async_block_till_done()
with patch( config_entry = hass.config_entries.async_entries("cast")[0]
"homeassistant.components.cast.media_player._async_setup_platform", assert config_entry.data["uuid"] == ["bla"]
) as mock_setup: assert config_entry.data["ignore_cec"] == ["cast1"]
await cast.async_setup_entry(hass, MockConfigEntry(), None)
assert len(mock_setup.mock_calls) == 1 assert pycast_mock.IGNORE_CEC == ["cast1"]
assert mock_setup.mock_calls[0][1][1] == {"uuid": "bla"}
async def test_entry_setup_list_config(hass: HomeAssistantType): async def test_entry_setup_list_config(hass: HomeAssistantType, pycast_mock):
"""Test setting up entry and having multiple config options.""" """Test deprecated yaml config with multiple media_players."""
await async_setup_component( await async_setup_component(
hass, "cast", {"cast": {"media_player": [{"uuid": "bla"}, {"uuid": "blu"}]}} hass,
"cast",
{
"cast": {
"media_player": [
{"uuid": "bla", "ignore_cec": "cast1"},
{"uuid": "blu", "ignore_cec": ["cast2", "cast3"]},
]
}
},
) )
await hass.async_block_till_done() await hass.async_block_till_done()
with patch( config_entry = hass.config_entries.async_entries("cast")[0]
"homeassistant.components.cast.media_player._async_setup_platform", assert set(config_entry.data["uuid"]) == {"bla", "blu"}
) as mock_setup: assert set(config_entry.data["ignore_cec"]) == {"cast1", "cast2", "cast3"}
await cast.async_setup_entry(hass, MockConfigEntry(), None) assert set(pycast_mock.IGNORE_CEC) == {"cast1", "cast2", "cast3"}
assert len(mock_setup.mock_calls) == 2
assert mock_setup.mock_calls[0][1][1] == {"uuid": "bla"}
assert mock_setup.mock_calls[1][1][1] == {"uuid": "blu"}
async def test_entry_setup_platform_not_ready(hass: HomeAssistantType):
"""Test failed setting up entry will raise PlatformNotReady."""
await async_setup_component(
hass, "cast", {"cast": {"media_player": {"uuid": "bla"}}}
)
await hass.async_block_till_done()
with patch(
"homeassistant.components.cast.media_player._async_setup_platform",
side_effect=Exception,
) as mock_setup:
with pytest.raises(PlatformNotReady):
await cast.async_setup_entry(hass, MockConfigEntry(), None)
assert len(mock_setup.mock_calls) == 1
assert mock_setup.mock_calls[0][1][1] == {"uuid": "bla"}