Raise on non-string unique id for config entry (#125950)

* Raise on non-string unique id for config entry

* Add test update entry

* Fix breaking

* Add check get_entry_by_domain_and_unique_id

* Naming

* Add test

* Fix logic

* No unique id

* Fix tests

* Fixes

* Fix gardena

* Not related to this PR

* Update docstring and comment

---------

Co-authored-by: Martin Hjelmare <marhje52@gmail.com>
This commit is contained in:
G Johansson 2024-10-30 18:09:50 +01:00 committed by GitHub
parent 3db6d82904
commit a4f210379d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 159 additions and 39 deletions

View File

@ -1608,6 +1608,7 @@ class ConfigEntryItems(UserDict[str, ConfigEntry]):
def __setitem__(self, entry_id: str, entry: ConfigEntry) -> None: def __setitem__(self, entry_id: str, entry: ConfigEntry) -> None:
"""Add an item.""" """Add an item."""
data = self.data data = self.data
self.check_unique_id(entry)
if entry_id in data: if entry_id in data:
# This is likely a bug in a test that is adding the same entry twice. # This is likely a bug in a test that is adding the same entry twice.
# In the future, once we have fixed the tests, this will raise HomeAssistantError. # In the future, once we have fixed the tests, this will raise HomeAssistantError.
@ -1616,16 +1617,19 @@ class ConfigEntryItems(UserDict[str, ConfigEntry]):
data[entry_id] = entry data[entry_id] = entry
self._index_entry(entry) self._index_entry(entry)
def _index_entry(self, entry: ConfigEntry) -> None: def check_unique_id(self, entry: ConfigEntry) -> None:
"""Index an entry.""" """Check config entry unique id.
self._domain_index.setdefault(entry.domain, []).append(entry)
if entry.unique_id is not None: For a string unique id (this is the correct case): return
unique_id_hash = entry.unique_id For a hashable non string unique id: log warning
if not isinstance(entry.unique_id, str): For a non-hashable unique id: raise error
# Guard against integrations using unhashable unique_id """
# In HA Core 2024.9, we should remove the guard and instead fail if (unique_id := entry.unique_id) is None:
if not isinstance(entry.unique_id, Hashable): # type: ignore[unreachable] return
unique_id_hash = str(entry.unique_id) if isinstance(unique_id, str):
# Unique id should be a string
return
if isinstance(unique_id, Hashable): # type: ignore[unreachable]
# Checks for other non-string was added in HA Core 2024.10 # Checks for other non-string was added in HA Core 2024.10
# In HA Core 2025.10, we should remove the error and instead fail # In HA Core 2025.10, we should remove the error and instead fail
report_issue = async_suggest_report_issue( report_issue = async_suggest_report_issue(
@ -1641,9 +1645,20 @@ class ConfigEntryItems(UserDict[str, ConfigEntry]):
entry.unique_id, entry.unique_id,
report_issue, report_issue,
) )
else:
# Guard against integrations using unhashable unique_id
# In HA Core 2024.11, the guard was changed from warning to failing
raise HomeAssistantError(
f"The entry unique id {unique_id} is not a string."
)
def _index_entry(self, entry: ConfigEntry) -> None:
"""Index an entry."""
self.check_unique_id(entry)
self._domain_index.setdefault(entry.domain, []).append(entry)
if entry.unique_id is not None:
self._domain_unique_id_index.setdefault(entry.domain, {}).setdefault( self._domain_unique_id_index.setdefault(entry.domain, {}).setdefault(
unique_id_hash, [] entry.unique_id, []
).append(entry) ).append(entry)
def _unindex_entry(self, entry_id: str) -> None: def _unindex_entry(self, entry_id: str) -> None:
@ -1654,9 +1669,6 @@ class ConfigEntryItems(UserDict[str, ConfigEntry]):
if not self._domain_index[domain]: if not self._domain_index[domain]:
del self._domain_index[domain] del self._domain_index[domain]
if (unique_id := entry.unique_id) is not None: if (unique_id := entry.unique_id) is not None:
# Check type first to avoid expensive isinstance call
if type(unique_id) is not str and not isinstance(unique_id, Hashable): # noqa: E721
unique_id = str(entry.unique_id) # type: ignore[unreachable]
self._domain_unique_id_index[domain][unique_id].remove(entry) self._domain_unique_id_index[domain][unique_id].remove(entry)
if not self._domain_unique_id_index[domain][unique_id]: if not self._domain_unique_id_index[domain][unique_id]:
del self._domain_unique_id_index[domain][unique_id] del self._domain_unique_id_index[domain][unique_id]
@ -1675,6 +1687,7 @@ class ConfigEntryItems(UserDict[str, ConfigEntry]):
""" """
entry_id = entry.entry_id entry_id = entry.entry_id
self._unindex_entry(entry_id) self._unindex_entry(entry_id)
self.check_unique_id(entry)
object.__setattr__(entry, "unique_id", new_unique_id) object.__setattr__(entry, "unique_id", new_unique_id)
self._index_entry(entry) self._index_entry(entry)
entry.clear_state_cache() entry.clear_state_cache()
@ -1688,9 +1701,12 @@ class ConfigEntryItems(UserDict[str, ConfigEntry]):
self, domain: str, unique_id: str self, domain: str, unique_id: str
) -> ConfigEntry | None: ) -> ConfigEntry | None:
"""Get entry by domain and unique id.""" """Get entry by domain and unique id."""
# Check type first to avoid expensive isinstance call if unique_id is None:
if type(unique_id) is not str and not isinstance(unique_id, Hashable): # noqa: E721 return None # type: ignore[unreachable]
unique_id = str(unique_id) # type: ignore[unreachable] if not isinstance(unique_id, Hashable):
raise HomeAssistantError(
f"The entry unique id {unique_id} is not a string."
)
entries = self._domain_unique_id_index.get(domain, {}).get(unique_id) entries = self._domain_unique_id_index.get(domain, {}).get(unique_id)
if not entries: if not entries:
return None return None

View File

@ -6,6 +6,7 @@ import asyncio
from collections.abc import Generator from collections.abc import Generator
from datetime import timedelta from datetime import timedelta
import logging import logging
import re
from typing import Any, Self from typing import Any, Self
from unittest.mock import ANY, AsyncMock, Mock, patch from unittest.mock import ANY, AsyncMock, Mock, patch
@ -5348,10 +5349,10 @@ async def test_update_entry_and_reload(
@pytest.mark.parametrize("unique_id", [["blah", "bleh"], {"key": "value"}]) @pytest.mark.parametrize("unique_id", [["blah", "bleh"], {"key": "value"}])
async def test_unhashable_unique_id( async def test_unhashable_unique_id_fails(
hass: HomeAssistant, caplog: pytest.LogCaptureFixture, unique_id: Any hass: HomeAssistant, caplog: pytest.LogCaptureFixture, unique_id: Any
) -> None: ) -> None:
"""Test the ConfigEntryItems user dict handles unhashable unique_id.""" """Test the ConfigEntryItems user dict fails unhashable unique_id."""
entries = config_entries.ConfigEntryItems(hass) entries = config_entries.ConfigEntryItems(hass)
entry = config_entries.ConfigEntry( entry = config_entries.ConfigEntry(
data={}, data={},
@ -5366,23 +5367,96 @@ async def test_unhashable_unique_id(
version=1, version=1,
) )
unique_id_string = re.escape(str(unique_id))
with pytest.raises(
HomeAssistantError,
match=f"The entry unique id {unique_id_string} is not a string.",
):
entries[entry.entry_id] = entry entries[entry.entry_id] = entry
assert entry.entry_id not in entries
with pytest.raises(
HomeAssistantError,
match=f"The entry unique id {unique_id_string} is not a string.",
):
entries.get_entry_by_domain_and_unique_id("test", unique_id)
@pytest.mark.parametrize("unique_id", [["blah", "bleh"], {"key": "value"}])
async def test_unhashable_unique_id_fails_on_update(
hass: HomeAssistant, caplog: pytest.LogCaptureFixture, unique_id: Any
) -> None:
"""Test the ConfigEntryItems user dict fails non-hashable unique_id on update."""
entries = config_entries.ConfigEntryItems(hass)
entry = config_entries.ConfigEntry(
data={},
discovery_keys={},
domain="test",
entry_id="mock_id",
minor_version=1,
options={},
source="test",
title="title",
unique_id="123",
version=1,
)
entries[entry.entry_id] = entry
assert entry.entry_id in entries
unique_id_string = re.escape(str(unique_id))
with pytest.raises(
HomeAssistantError,
match=f"The entry unique id {unique_id_string} is not a string.",
):
entries.update_unique_id(entry, unique_id)
async def test_string_unique_id_no_warning(
hass: HomeAssistant,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test the ConfigEntryItems user dict string unique id doesn't log warning."""
entries = config_entries.ConfigEntryItems(hass)
entry = config_entries.ConfigEntry(
data={},
discovery_keys={},
domain="test",
entry_id="mock_id",
minor_version=1,
options={},
source="test",
title="title",
unique_id="123",
version=1,
)
entries[entry.entry_id] = entry
assert ( assert (
"Config entry 'title' from integration test has an invalid unique_id " "Config entry 'title' from integration test has an invalid unique_id"
f"'{unique_id!s}'" ) not in caplog.text
) in caplog.text
assert entry.entry_id in entries assert entry.entry_id in entries
assert entries[entry.entry_id] is entry assert entries[entry.entry_id] is entry
assert entries.get_entry_by_domain_and_unique_id("test", unique_id) == entry assert entries.get_entry_by_domain_and_unique_id("test", "123") == entry
del entries[entry.entry_id] del entries[entry.entry_id]
assert not entries assert not entries
assert entries.get_entry_by_domain_and_unique_id("test", unique_id) is None assert entries.get_entry_by_domain_and_unique_id("test", "123") is None
@pytest.mark.parametrize("unique_id", [123]) @pytest.mark.parametrize(
async def test_hashable_non_string_unique_id( "unique_id",
hass: HomeAssistant, caplog: pytest.LogCaptureFixture, unique_id: Any [
(123),
(2.3),
],
)
async def test_hashable_unique_id(
hass: HomeAssistant,
caplog: pytest.LogCaptureFixture,
unique_id: Any,
) -> None: ) -> None:
"""Test the ConfigEntryItems user dict handles hashable non string unique_id.""" """Test the ConfigEntryItems user dict handles hashable non string unique_id."""
entries = config_entries.ConfigEntryItems(hass) entries = config_entries.ConfigEntryItems(hass)
@ -5400,6 +5474,7 @@ async def test_hashable_non_string_unique_id(
) )
entries[entry.entry_id] = entry entries[entry.entry_id] = entry
assert ( assert (
"Config entry 'title' from integration test has an invalid unique_id" "Config entry 'title' from integration test has an invalid unique_id"
) in caplog.text ) in caplog.text
@ -5412,6 +5487,35 @@ async def test_hashable_non_string_unique_id(
assert entries.get_entry_by_domain_and_unique_id("test", unique_id) is None assert entries.get_entry_by_domain_and_unique_id("test", unique_id) is None
async def test_no_unique_id_no_warning(
hass: HomeAssistant,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test the ConfigEntryItems user dict don't log warning with no unique id."""
entries = config_entries.ConfigEntryItems(hass)
entry = config_entries.ConfigEntry(
data={},
discovery_keys={},
domain="test",
entry_id="mock_id",
minor_version=1,
options={},
source="test",
title="title",
unique_id=None,
version=1,
)
entries[entry.entry_id] = entry
assert (
"Config entry 'title' from integration test has an invalid unique_id"
) not in caplog.text
assert entry.entry_id in entries
assert entries[entry.entry_id] is entry
@pytest.mark.parametrize( @pytest.mark.parametrize(
("context", "user_input", "expected_result"), ("context", "user_input", "expected_result"),
[ [