From 28afe9ff9edfba028441ebb3b6e63695f5ef5e14 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Wed, 14 Feb 2024 16:25:08 -0600 Subject: [PATCH] Correct misaligned formatting of mac addresses in samsungtv (#110599) * Correct misaligned formatting of mac addresses in samsungtv dhcp returns addresses in lowercase without : and there were places were it was not passed through format_mac which resulted in the wrong format being saved in the config entry * safer --- .../components/samsungtv/__init__.py | 9 ++- .../components/samsungtv/config_flow.py | 18 ++++- tests/components/samsungtv/const.py | 4 +- .../components/samsungtv/test_config_flow.py | 78 ++++++++++++------- tests/components/samsungtv/test_init.py | 35 ++++++++- .../components/samsungtv/test_media_player.py | 4 +- 6 files changed, 110 insertions(+), 38 deletions(-) diff --git a/homeassistant/components/samsungtv/__init__.py b/homeassistant/components/samsungtv/__init__.py index 98ec349154e..a70a336ebfd 100644 --- a/homeassistant/components/samsungtv/__init__.py +++ b/homeassistant/components/samsungtv/__init__.py @@ -198,10 +198,13 @@ async def _async_create_bridge_with_updated_data( mac: str | None = entry.data.get(CONF_MAC) model: str | None = entry.data.get(CONF_MODEL) - if (not mac or not model) and not load_info_attempted: + mac_is_incorrectly_formatted = mac and dr.format_mac(mac) != mac + if ( + not mac or not model or mac_is_incorrectly_formatted + ) and not load_info_attempted: info = await bridge.async_device_info() - if not mac: + if not mac or mac_is_incorrectly_formatted: LOGGER.debug("Attempting to get mac for %s", host) if info: mac = mac_from_device_info(info) @@ -215,7 +218,7 @@ async def _async_create_bridge_with_updated_data( # Samsung sometimes returns a value of "none" for the mac address # this should be ignored LOGGER.info("Updated mac to %s for %s", mac, host) - updated_data[CONF_MAC] = mac + updated_data[CONF_MAC] = dr.format_mac(mac) else: LOGGER.info("Failed to get mac for %s", host) diff --git a/homeassistant/components/samsungtv/config_flow.py b/homeassistant/components/samsungtv/config_flow.py index 2e6f64f08e1..e7f71210dfe 100644 --- a/homeassistant/components/samsungtv/config_flow.py +++ b/homeassistant/components/samsungtv/config_flow.py @@ -80,6 +80,17 @@ def _entry_is_complete( ) +def _mac_is_same_with_incorrect_formatting( + current_unformatted_mac: str, formatted_mac: str +) -> bool: + """Check if two macs are the same but formatted incorrectly.""" + current_formatted_mac = format_mac(current_unformatted_mac) + return ( + current_formatted_mac == formatted_mac + and current_unformatted_mac != current_formatted_mac + ) + + class SamsungTVConfigFlow(config_entries.ConfigFlow, domain=DOMAIN): """Handle a Samsung TV config flow.""" @@ -359,7 +370,10 @@ class SamsungTVConfigFlow(config_entries.ConfigFlow, domain=DOMAIN): and data.get(CONF_SSDP_MAIN_TV_AGENT_LOCATION) != self._ssdp_main_tv_agent_location ) - update_mac = self._mac and not data.get(CONF_MAC) + update_mac = self._mac and ( + not (data_mac := data.get(CONF_MAC)) + or _mac_is_same_with_incorrect_formatting(data_mac, self._mac) + ) update_model = self._model and not data.get(CONF_MODEL) if ( update_ssdp_rendering_control_location @@ -464,7 +478,7 @@ class SamsungTVConfigFlow(config_entries.ConfigFlow, domain=DOMAIN): async def async_step_dhcp(self, discovery_info: dhcp.DhcpServiceInfo) -> FlowResult: """Handle a flow initialized by dhcp discovery.""" LOGGER.debug("Samsung device found via DHCP: %s", discovery_info) - self._mac = discovery_info.macaddress + self._mac = format_mac(discovery_info.macaddress) self._host = discovery_info.ip self._async_start_discovery_with_mac_address() await self._async_set_device_unique_id() diff --git a/tests/components/samsungtv/const.py b/tests/components/samsungtv/const.py index a560a7dd92d..347886419f3 100644 --- a/tests/components/samsungtv/const.py +++ b/tests/components/samsungtv/const.py @@ -68,7 +68,7 @@ SAMPLE_DEVICE_INFO_WIFI = { "id": "uuid:be9554b9-c9fb-41f4-8920-22da015376a4", "device": { "modelName": "82GXARRS", - "wifiMac": "aa:bb:ww:ii:ff:ii", + "wifiMac": "aa:bb:aa:aa:aa:aa", "name": "[TV] Living Room", "type": "Samsung SmartTV", "networkType": "wireless", @@ -136,7 +136,7 @@ SAMPLE_DEVICE_INFO_UE48JU6400 = { "countryCode": "AT", "msfVersion": "2.0.25", "smartHubAgreement": "true", - "wifiMac": "aa:bb:ww:ii:ff:ii", + "wifiMac": "aa:bb:aa:aa:aa:aa", "developerMode": "0", "developerIP": "", }, diff --git a/tests/components/samsungtv/test_config_flow.py b/tests/components/samsungtv/test_config_flow.py index 0eacd63b42d..60b89766155 100644 --- a/tests/components/samsungtv/test_config_flow.py +++ b/tests/components/samsungtv/test_config_flow.py @@ -128,7 +128,7 @@ MOCK_SSDP_DATA_WRONGMODEL = ssdp.SsdpServiceInfo( }, ) MOCK_DHCP_DATA = dhcp.DhcpServiceInfo( - ip="fake_host", macaddress="aa:bb:dd:hh:cc:pp", hostname="fake_hostname" + ip="fake_host", macaddress="aabbccddeeff", hostname="fake_hostname" ) EXISTING_IP = "192.168.40.221" MOCK_ZEROCONF_DATA = zeroconf.ZeroconfServiceInfo( @@ -337,7 +337,7 @@ async def test_user_encrypted_websocket( assert result4["title"] == "TV-UE48JU6470 (UE48JU6400)" assert result4["data"][CONF_HOST] == "fake_host" assert result4["data"][CONF_NAME] == "TV-UE48JU6470" - assert result4["data"][CONF_MAC] == "aa:bb:ww:ii:ff:ii" + assert result4["data"][CONF_MAC] == "aa:bb:aa:aa:aa:aa" assert result4["data"][CONF_MANUFACTURER] == "Samsung" assert result4["data"][CONF_MODEL] == "UE48JU6400" assert result4["data"][CONF_SSDP_RENDERING_CONTROL_LOCATION] is None @@ -641,7 +641,7 @@ async def test_ssdp_websocket_success_populates_mac_address_and_ssdp_location( assert result["title"] == "Living Room (82GXARRS)" assert result["data"][CONF_HOST] == "fake_host" assert result["data"][CONF_NAME] == "Living Room" - assert result["data"][CONF_MAC] == "aa:bb:ww:ii:ff:ii" + assert result["data"][CONF_MAC] == "aa:bb:aa:aa:aa:aa" assert result["data"][CONF_MANUFACTURER] == "Samsung fake_manufacturer" assert result["data"][CONF_MODEL] == "82GXARRS" assert ( @@ -671,7 +671,7 @@ async def test_ssdp_websocket_success_populates_mac_address_and_main_tv_ssdp_loc assert result["title"] == "Living Room (82GXARRS)" assert result["data"][CONF_HOST] == "fake_host" assert result["data"][CONF_NAME] == "Living Room" - assert result["data"][CONF_MAC] == "aa:bb:ww:ii:ff:ii" + assert result["data"][CONF_MAC] == "aa:bb:aa:aa:aa:aa" assert result["data"][CONF_MANUFACTURER] == "Samsung fake_manufacturer" assert result["data"][CONF_MODEL] == "82GXARRS" assert ( @@ -723,7 +723,7 @@ async def test_ssdp_encrypted_websocket_success_populates_mac_address_and_ssdp_l assert result4["title"] == "TV-UE48JU6470 (UE48JU6400)" assert result4["data"][CONF_HOST] == "fake_host" assert result4["data"][CONF_NAME] == "TV-UE48JU6470" - assert result4["data"][CONF_MAC] == "aa:bb:ww:ii:ff:ii" + assert result4["data"][CONF_MAC] == "aa:bb:aa:aa:aa:aa" assert result4["data"][CONF_MANUFACTURER] == "Samsung fake_manufacturer" assert result4["data"][CONF_MODEL] == "UE48JU6400" assert ( @@ -916,7 +916,7 @@ async def test_dhcp_wireless(hass: HomeAssistant) -> None: assert result["title"] == "TV-UE48JU6470 (UE48JU6400)" assert result["data"][CONF_HOST] == "fake_host" assert result["data"][CONF_NAME] == "TV-UE48JU6470" - assert result["data"][CONF_MAC] == "aa:bb:ww:ii:ff:ii" + assert result["data"][CONF_MAC] == "aa:bb:aa:aa:aa:aa" assert result["data"][CONF_MANUFACTURER] == "Samsung" assert result["data"][CONF_MODEL] == "UE48JU6400" assert result["result"].unique_id == "223da676-497a-4e06-9507-5e27ec4f0fb3" @@ -971,7 +971,7 @@ async def test_zeroconf(hass: HomeAssistant) -> None: assert result["title"] == "Living Room (82GXARRS)" assert result["data"][CONF_HOST] == "127.0.0.1" assert result["data"][CONF_NAME] == "Living Room" - assert result["data"][CONF_MAC] == "aa:bb:ww:ii:ff:ii" + assert result["data"][CONF_MAC] == "aa:bb:aa:aa:aa:aa" assert result["data"][CONF_MANUFACTURER] == "Samsung" assert result["data"][CONF_MODEL] == "82GXARRS" assert result["result"].unique_id == "be9554b9-c9fb-41f4-8920-22da015376a4" @@ -1258,7 +1258,31 @@ async def test_update_missing_mac_unique_id_added_from_dhcp( assert result["type"] == "abort" assert result["reason"] == "already_configured" - assert entry.data[CONF_MAC] == "aa:bb:dd:hh:cc:pp" + assert entry.data[CONF_MAC] == "aa:bb:cc:dd:ee:ff" + assert entry.unique_id == "be9554b9-c9fb-41f4-8920-22da015376a4" + + +@pytest.mark.usefixtures("remotews", "rest_api", "remoteencws_failing") +async def test_update_incorrectly_formatted_mac_unique_id_added_from_dhcp( + hass: HomeAssistant, mock_setup_entry: AsyncMock +) -> None: + """Test incorrectly formatted mac is updated and unique id added.""" + entry_data = MOCK_OLD_ENTRY.copy() + entry_data[CONF_MAC] = "aabbccddeeff" + entry = MockConfigEntry(domain=DOMAIN, data=entry_data, unique_id=None) + entry.add_to_hass(hass) + + result = await hass.config_entries.flow.async_init( + DOMAIN, + context={"source": config_entries.SOURCE_DHCP}, + data=MOCK_DHCP_DATA, + ) + await hass.async_block_till_done() + assert len(mock_setup_entry.mock_calls) == 1 + + assert result["type"] == "abort" + assert result["reason"] == "already_configured" + assert entry.data[CONF_MAC] == "aa:bb:cc:dd:ee:ff" assert entry.unique_id == "be9554b9-c9fb-41f4-8920-22da015376a4" @@ -1329,7 +1353,7 @@ async def test_update_missing_mac_unique_id_ssdp_location_added_from_ssdp( assert result["type"] == "abort" assert result["reason"] == "already_configured" - assert entry.data[CONF_MAC] == "aa:bb:ww:ii:ff:ii" + assert entry.data[CONF_MAC] == "aa:bb:aa:aa:aa:aa" # Wrong st assert CONF_SSDP_RENDERING_CONTROL_LOCATION not in entry.data assert entry.unique_id == "be9554b9-c9fb-41f4-8920-22da015376a4" @@ -1385,7 +1409,7 @@ async def test_update_missing_mac_unique_id_added_ssdp_location_updated_from_ssd assert result["type"] == "abort" assert result["reason"] == "already_configured" - assert entry.data[CONF_MAC] == "aa:bb:ww:ii:ff:ii" + assert entry.data[CONF_MAC] == "aa:bb:aa:aa:aa:aa" # Wrong ST, ssdp location should not change assert ( entry.data[CONF_SSDP_RENDERING_CONTROL_LOCATION] == "https://1.2.3.4:555/test" @@ -1418,7 +1442,7 @@ async def test_update_missing_mac_unique_id_added_ssdp_location_rendering_st_upd assert result["type"] == "abort" assert result["reason"] == "already_configured" - assert entry.data[CONF_MAC] == "aa:bb:ww:ii:ff:ii" + assert entry.data[CONF_MAC] == "aa:bb:aa:aa:aa:aa" # Correct ST, ssdp location should change assert ( entry.data[CONF_SSDP_RENDERING_CONTROL_LOCATION] @@ -1453,7 +1477,7 @@ async def test_update_missing_mac_unique_id_added_ssdp_location_main_tv_agent_st assert result["type"] == "abort" assert result["reason"] == "already_configured" - assert entry.data[CONF_MAC] == "aa:bb:ww:ii:ff:ii" + assert entry.data[CONF_MAC] == "aa:bb:aa:aa:aa:aa" # Main TV Agent ST, ssdp location should change assert ( entry.data[CONF_SSDP_MAIN_TV_AGENT_LOCATION] @@ -1473,7 +1497,7 @@ async def test_update_ssdp_location_rendering_st_updated_from_ssdp( """Test with outdated ssdp_location with the correct st added via ssdp.""" entry = MockConfigEntry( domain=DOMAIN, - data={**MOCK_OLD_ENTRY, CONF_MAC: "aa:bb:ww:ii:ff:ii"}, + data={**MOCK_OLD_ENTRY, CONF_MAC: "aa:bb:aa:aa:aa:aa"}, unique_id="be9554b9-c9fb-41f4-8920-22da015376a4", ) entry.add_to_hass(hass) @@ -1488,7 +1512,7 @@ async def test_update_ssdp_location_rendering_st_updated_from_ssdp( assert result["type"] == "abort" assert result["reason"] == "already_configured" - assert entry.data[CONF_MAC] == "aa:bb:ww:ii:ff:ii" + assert entry.data[CONF_MAC] == "aa:bb:aa:aa:aa:aa" # Correct ST, ssdp location should be added assert ( entry.data[CONF_SSDP_RENDERING_CONTROL_LOCATION] @@ -1504,7 +1528,7 @@ async def test_update_main_tv_ssdp_location_rendering_st_updated_from_ssdp( """Test with outdated ssdp_location with the correct st added via ssdp.""" entry = MockConfigEntry( domain=DOMAIN, - data={**MOCK_OLD_ENTRY, CONF_MAC: "aa:bb:ww:ii:ff:ii"}, + data={**MOCK_OLD_ENTRY, CONF_MAC: "aa:bb:aa:aa:aa:aa"}, unique_id="be9554b9-c9fb-41f4-8920-22da015376a4", ) entry.add_to_hass(hass) @@ -1519,7 +1543,7 @@ async def test_update_main_tv_ssdp_location_rendering_st_updated_from_ssdp( assert result["type"] == "abort" assert result["reason"] == "already_configured" - assert entry.data[CONF_MAC] == "aa:bb:ww:ii:ff:ii" + assert entry.data[CONF_MAC] == "aa:bb:aa:aa:aa:aa" # Correct ST for MainTV, ssdp location should be added assert ( entry.data[CONF_SSDP_MAIN_TV_AGENT_LOCATION] @@ -1570,7 +1594,7 @@ async def test_update_legacy_missing_mac_from_dhcp( DOMAIN, context={"source": config_entries.SOURCE_DHCP}, data=dhcp.DhcpServiceInfo( - ip=EXISTING_IP, macaddress="aa:bb:cc:dd:ee:ff", hostname="fake_hostname" + ip=EXISTING_IP, macaddress="aabbccddeeff", hostname="fake_hostname" ), ) await hass.async_block_till_done() @@ -1604,7 +1628,7 @@ async def test_update_legacy_missing_mac_from_dhcp_no_unique_id( DOMAIN, context={"source": config_entries.SOURCE_DHCP}, data=dhcp.DhcpServiceInfo( - ip=EXISTING_IP, macaddress="aa:bb:cc:dd:ee:ff", hostname="fake_hostname" + ip=EXISTING_IP, macaddress="aabbccddeeff", hostname="fake_hostname" ), ) await hass.async_block_till_done() @@ -1623,7 +1647,7 @@ async def test_update_ssdp_location_unique_id_added_from_ssdp( """Test missing ssdp_location, and unique id added via ssdp.""" entry = MockConfigEntry( domain=DOMAIN, - data={**MOCK_OLD_ENTRY, CONF_MAC: "aa:bb:ww:ii:ff:ii"}, + data={**MOCK_OLD_ENTRY, CONF_MAC: "aa:bb:aa:aa:aa:aa"}, unique_id=None, ) entry.add_to_hass(hass) @@ -1638,7 +1662,7 @@ async def test_update_ssdp_location_unique_id_added_from_ssdp( assert result["type"] == "abort" assert result["reason"] == "already_configured" - assert entry.data[CONF_MAC] == "aa:bb:ww:ii:ff:ii" + assert entry.data[CONF_MAC] == "aa:bb:aa:aa:aa:aa" # Wrong st assert CONF_SSDP_RENDERING_CONTROL_LOCATION not in entry.data assert entry.unique_id == "be9554b9-c9fb-41f4-8920-22da015376a4" @@ -1651,7 +1675,7 @@ async def test_update_ssdp_location_unique_id_added_from_ssdp_with_rendering_con """Test missing ssdp_location, and unique id added via ssdp with rendering control st.""" entry = MockConfigEntry( domain=DOMAIN, - data={**MOCK_OLD_ENTRY, CONF_MAC: "aa:bb:ww:ii:ff:ii"}, + data={**MOCK_OLD_ENTRY, CONF_MAC: "aa:bb:aa:aa:aa:aa"}, unique_id=None, ) entry.add_to_hass(hass) @@ -1666,7 +1690,7 @@ async def test_update_ssdp_location_unique_id_added_from_ssdp_with_rendering_con assert result["type"] == "abort" assert result["reason"] == "already_configured" - assert entry.data[CONF_MAC] == "aa:bb:ww:ii:ff:ii" + assert entry.data[CONF_MAC] == "aa:bb:aa:aa:aa:aa" # Correct st assert ( entry.data[CONF_SSDP_RENDERING_CONTROL_LOCATION] @@ -1879,7 +1903,7 @@ async def test_update_incorrect_udn_matching_upnp_udn_unique_id_added_from_ssdp( assert result["type"] == "abort" assert result["reason"] == "already_configured" - assert entry.data[CONF_MAC] == "aa:bb:ww:ii:ff:ii" + assert entry.data[CONF_MAC] == "aa:bb:aa:aa:aa:aa" assert entry.unique_id == "be9554b9-c9fb-41f4-8920-22da015376a4" @@ -1890,7 +1914,7 @@ async def test_update_incorrect_udn_matching_mac_unique_id_added_from_ssdp( """Test updating the wrong udn from ssdp via mac match.""" entry = MockConfigEntry( domain=DOMAIN, - data={**MOCK_OLD_ENTRY, CONF_MAC: "aa:bb:ww:ii:ff:ii"}, + data={**MOCK_OLD_ENTRY, CONF_MAC: "aa:bb:aa:aa:aa:aa"}, unique_id=None, ) entry.add_to_hass(hass) @@ -1905,7 +1929,7 @@ async def test_update_incorrect_udn_matching_mac_unique_id_added_from_ssdp( assert result["type"] == "abort" assert result["reason"] == "already_configured" - assert entry.data[CONF_MAC] == "aa:bb:ww:ii:ff:ii" + assert entry.data[CONF_MAC] == "aa:bb:aa:aa:aa:aa" assert entry.unique_id == "be9554b9-c9fb-41f4-8920-22da015376a4" @@ -1916,7 +1940,7 @@ async def test_update_incorrect_udn_matching_mac_from_dhcp( """Test that DHCP updates the wrong udn from ssdp via mac match.""" entry = MockConfigEntry( domain=DOMAIN, - data={**MOCK_ENTRYDATA_WS, CONF_MAC: "aa:bb:ww:ii:ff:ii"}, + data={**MOCK_ENTRYDATA_WS, CONF_MAC: "aa:bb:aa:aa:aa:aa"}, source=config_entries.SOURCE_SSDP, unique_id="0d1cef00-00dc-1000-9c80-4844f7b172de", ) @@ -1932,7 +1956,7 @@ async def test_update_incorrect_udn_matching_mac_from_dhcp( assert result["type"] == "abort" assert result["reason"] == "already_configured" - assert entry.data[CONF_MAC] == "aa:bb:ww:ii:ff:ii" + assert entry.data[CONF_MAC] == "aa:bb:aa:aa:aa:aa" assert entry.unique_id == "be9554b9-c9fb-41f4-8920-22da015376a4" diff --git a/tests/components/samsungtv/test_init.py b/tests/components/samsungtv/test_init.py index 526f7a12fed..cde5cebcefe 100644 --- a/tests/components/samsungtv/test_init.py +++ b/tests/components/samsungtv/test_init.py @@ -1,7 +1,8 @@ """Tests for the Samsung TV Integration.""" -from unittest.mock import Mock, patch +from unittest.mock import AsyncMock, Mock, patch import pytest +from samsungtvws.async_remote import SamsungTVWSAsyncRemote from syrupy.assertion import SnapshotAssertion from homeassistant.components.media_player import DOMAIN, MediaPlayerEntityFeature @@ -100,7 +101,7 @@ async def test_setup_without_port_device_online(hass: HomeAssistant) -> None: config_entries_domain = hass.config_entries.async_entries(SAMSUNGTV_DOMAIN) assert len(config_entries_domain) == 1 - assert config_entries_domain[0].data[CONF_MAC] == "aa:bb:ww:ii:ff:ii" + assert config_entries_domain[0].data[CONF_MAC] == "aa:bb:aa:aa:aa:aa" @pytest.mark.usefixtures("remotews", "remoteencws_failing") @@ -181,3 +182,33 @@ async def test_update_imported_legacy_without_method(hass: HomeAssistant) -> Non assert len(entries) == 1 assert entries[0].data[CONF_METHOD] == METHOD_LEGACY assert entries[0].data[CONF_PORT] == LEGACY_PORT + + +@pytest.mark.usefixtures("remotews", "rest_api") +async def test_incorrectly_formatted_mac_fixed(hass: HomeAssistant) -> None: + """Test incorrectly formatted mac is corrected.""" + with patch( + "homeassistant.components.samsungtv.bridge.SamsungTVWSAsyncRemote" + ) as remote_class: + remote = Mock(SamsungTVWSAsyncRemote) + remote.__aenter__ = AsyncMock(return_value=remote) + remote.__aexit__ = AsyncMock() + remote.token = "123456789" + remote_class.return_value = remote + + await setup_samsungtv_entry( + hass, + { + CONF_HOST: "fake_host", + CONF_NAME: "fake", + CONF_PORT: 8001, + CONF_TOKEN: "123456789", + CONF_METHOD: METHOD_WEBSOCKET, + CONF_MAC: "aabbaaaaaaaa", + }, + ) + await hass.async_block_till_done() + + config_entries = hass.config_entries.async_entries(SAMSUNGTV_DOMAIN) + assert len(config_entries) == 1 + assert config_entries[0].data[CONF_MAC] == "aa:bb:aa:aa:aa:aa" diff --git a/tests/components/samsungtv/test_media_player.py b/tests/components/samsungtv/test_media_player.py index 27a06ef3a13..c4c6a08b88b 100644 --- a/tests/components/samsungtv/test_media_player.py +++ b/tests/components/samsungtv/test_media_player.py @@ -162,7 +162,7 @@ async def test_setup_websocket(hass: HomeAssistant) -> None: config_entries = hass.config_entries.async_entries(SAMSUNGTV_DOMAIN) assert len(config_entries) == 1 - assert config_entries[0].data[CONF_MAC] == "aa:bb:ww:ii:ff:ii" + assert config_entries[0].data[CONF_MAC] == "aa:bb:aa:aa:aa:aa" @pytest.mark.usefixtures("rest_api") @@ -194,7 +194,7 @@ async def test_setup_websocket_2( assert await hass.config_entries.async_setup(entry.entry_id) await hass.async_block_till_done() - assert config_entries[0].data[CONF_MAC] == "aa:bb:ww:ii:ff:ii" + assert config_entries[0].data[CONF_MAC] == "aa:bb:aa:aa:aa:aa" next_update = mock_now + timedelta(minutes=5) freezer.move_to(next_update)