Type system storage

This commit is contained in:
Paul Bottein
2025-11-12 10:56:11 +01:00
parent 779dc20f07
commit c1afba419c
3 changed files with 150 additions and 121 deletions

View File

@@ -0,0 +1,4 @@
"""Constants for the frontend component."""
# System storage keys
SYSTEM_DATA_DEFAULT_PANEL = "default_panel"

View File

@@ -4,7 +4,7 @@ from __future__ import annotations
from collections.abc import Callable, Coroutine from collections.abc import Callable, Coroutine
from functools import wraps from functools import wraps
from typing import Any from typing import Any, TypedDict, cast
import voluptuous as vol import voluptuous as vol
@@ -14,6 +14,15 @@ from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.storage import Store from homeassistant.helpers.storage import Store
from homeassistant.util.hass_dict import HassKey from homeassistant.util.hass_dict import HassKey
from .const import SYSTEM_DATA_DEFAULT_PANEL
class SystemStorageData(TypedDict, total=False):
"""System storage data structure."""
default_panel: str | None
DATA_STORAGE: HassKey[dict[str, UserStore]] = HassKey("frontend_storage") DATA_STORAGE: HassKey[dict[str, UserStore]] = HassKey("frontend_storage")
DATA_SYSTEM_STORAGE: HassKey[SystemStore] = HassKey("frontend_system_storage") DATA_SYSTEM_STORAGE: HassKey[SystemStore] = HassKey("frontend_system_storage")
STORAGE_VERSION_USER_DATA = 1 STORAGE_VERSION_USER_DATA = 1
@@ -102,7 +111,7 @@ class SystemStore:
def __init__(self, hass: HomeAssistant) -> None: def __init__(self, hass: HomeAssistant) -> None:
"""Initialize the system store.""" """Initialize the system store."""
self._store = _SystemStore(hass) self._store = _SystemStore(hass)
self.data: dict[str, Any] = {} self.data: SystemStorageData = {}
self.subscriptions: dict[str | None, list[Callable[[], None]]] = {} self.subscriptions: dict[str | None, list[Callable[[], None]]] = {}
async def async_load(self) -> None: async def async_load(self) -> None:
@@ -111,7 +120,7 @@ class SystemStore:
async def async_set_item(self, key: str, value: Any) -> None: async def async_set_item(self, key: str, value: Any) -> None:
"""Set an item and save the store.""" """Set an item and save the store."""
self.data[key] = value cast(dict[str, Any], self.data)[key] = value
await self._store.async_save(self.data) await self._store.async_save(self.data)
for cb in self.subscriptions.get(None, []): for cb in self.subscriptions.get(None, []):
cb() cb()
@@ -132,7 +141,7 @@ class SystemStore:
return unsubscribe return unsubscribe
class _SystemStore(Store[dict[str, Any]]): class _SystemStore(Store[SystemStorageData]):
"""System store for frontend data.""" """System store for frontend data."""
def __init__(self, hass: HomeAssistant) -> None: def __init__(self, hass: HomeAssistant) -> None:
@@ -257,8 +266,7 @@ async def websocket_subscribe_user_data(
@websocket_api.websocket_command( @websocket_api.websocket_command(
{ {
vol.Required("type"): "frontend/set_system_data", vol.Required("type"): "frontend/set_system_data",
vol.Required("key"): str, vol.Required(SYSTEM_DATA_DEFAULT_PANEL): vol.Any(str, None),
vol.Required("value"): vol.Any(bool, str, int, float, dict, list, None),
} }
) )
@websocket_api.async_response @websocket_api.async_response
@@ -274,7 +282,7 @@ async def websocket_set_system_data(
connection.send_error(msg["id"], "unauthorized", "Admin access required") connection.send_error(msg["id"], "unauthorized", "Admin access required")
return return
await store.async_set_item(msg["key"], msg["value"]) await store.async_set_item("default_panel", msg[SYSTEM_DATA_DEFAULT_PANEL])
connection.send_result(msg["id"]) connection.send_result(msg["id"])

View File

@@ -328,30 +328,20 @@ async def test_get_system_data(
hass_storage[storage_key] = { hass_storage[storage_key] = {
"key": storage_key, "key": storage_key,
"version": 1, "version": 1,
"data": {"test-key": "test-value", "test-complex": [{"foo": "bar"}]}, "data": {"default_panel": "lovelace"},
} }
client = await hass_ws_client(hass) client = await hass_ws_client(hass)
# Get a simple string key # Get default_panel key
await client.send_json( await client.send_json(
{"id": 6, "type": "frontend/get_system_data", "key": "test-key"} {"id": 6, "type": "frontend/get_system_data", "key": "default_panel"}
) )
res = await client.receive_json() res = await client.receive_json()
assert res["success"], res assert res["success"], res
assert res["result"]["value"] == "test-value" assert res["result"]["value"] == "lovelace"
# Get a more complex key
await client.send_json(
{"id": 7, "type": "frontend/get_system_data", "key": "test-complex"}
)
res = await client.receive_json()
assert res["success"], res
assert res["result"]["value"][0]["foo"] == "bar"
# Get all data (no key) # Get all data (no key)
@@ -359,16 +349,15 @@ async def test_get_system_data(
res = await client.receive_json() res = await client.receive_json()
assert res["success"], res assert res["success"], res
assert res["result"]["value"]["test-key"] == "test-value" assert res["result"]["value"]["default_panel"] == "lovelace"
assert res["result"]["value"]["test-complex"][0]["foo"] == "bar"
@pytest.mark.parametrize( @pytest.mark.parametrize(
("subscriptions", "events"), ("subscriptions", "events"),
[ [
([], []), ([], []),
([(1, {}, {})], [(1, {"test-key": "test-value"})]), ([(1, {}, {})], [(1, {"default_panel": "lovelace"})]),
([(1, {"key": "test-key"}, None)], [(1, "test-value")]), ([(1, {"key": "default_panel"}, None)], [(1, "lovelace")]),
([(1, {"key": "other-key"}, None)], []), ([(1, {"key": "other-key"}, None)], []),
], ],
) )
@@ -406,7 +395,7 @@ async def test_set_system_data_empty(
# test creating # test creating
await client.send_json( await client.send_json(
{"id": 6, "type": "frontend/get_system_data", "key": "test-key"} {"id": 6, "type": "frontend/get_system_data", "key": "default_panel"}
) )
res = await client.receive_json() res = await client.receive_json()
@@ -417,8 +406,7 @@ async def test_set_system_data_empty(
{ {
"id": 7, "id": 7,
"type": "frontend/set_system_data", "type": "frontend/set_system_data",
"key": "test-key", "default_panel": "lovelace",
"value": "test-value",
} }
) )
@@ -430,62 +418,21 @@ async def test_set_system_data_empty(
assert res["success"], res assert res["success"], res
await client.send_json( await client.send_json(
{"id": 8, "type": "frontend/get_system_data", "key": "test-key"} {"id": 8, "type": "frontend/get_system_data", "key": "default_panel"}
) )
res = await client.receive_json() res = await client.receive_json()
assert res["success"], res assert res["success"], res
assert res["result"]["value"] == "test-value" assert res["result"]["value"] == "lovelace"
@pytest.mark.parametrize( @pytest.mark.parametrize(
("subscriptions", "events"), ("subscriptions", "events"),
[ [
( ([], []),
[], ([(1, {}, {"default_panel": "energy"})], [(1, {"default_panel": "lovelace"})]),
[[], []], ([(1, {"key": "default_panel"}, "energy")], [(1, "lovelace")]),
), ([(1, {"key": "other-key"}, None)], []),
(
[(1, {}, {"test-key": "test-value", "test-complex": "string"})],
[
[
(
1,
{
"test-complex": "string",
"test-key": "test-value",
"test-non-existent-key": "test-value-new",
},
)
],
[
(
1,
{
"test-complex": [{"foo": "bar"}],
"test-key": "test-value",
"test-non-existent-key": "test-value-new",
},
)
],
],
),
(
[(1, {"key": "test-key"}, "test-value")],
[[], []],
),
(
[(1, {"key": "test-non-existent-key"}, None)],
[[(1, "test-value-new")], []],
),
(
[(1, {"key": "test-complex"}, "string")],
[[], [(1, [{"foo": "bar"}])]],
),
(
[(1, {"key": "other-key"}, None)],
[[], []],
),
], ],
) )
async def test_set_system_data( async def test_set_system_data(
@@ -493,13 +440,13 @@ async def test_set_system_data(
hass_ws_client: WebSocketGenerator, hass_ws_client: WebSocketGenerator,
hass_storage: dict[str, Any], hass_storage: dict[str, Any],
subscriptions: list[tuple[int, dict[str, str], Any]], subscriptions: list[tuple[int, dict[str, str], Any]],
events: list[list[tuple[int, Any]]], events: list[tuple[int, Any]],
) -> None: ) -> None:
"""Test set_system_data command with initial data.""" """Test set_system_data command with initial data."""
storage_key = f"{DOMAIN}.system_data" storage_key = f"{DOMAIN}.system_data"
hass_storage[storage_key] = { hass_storage[storage_key] = {
"version": 1, "version": 1,
"data": {"test-key": "test-value", "test-complex": "string"}, "data": {"default_panel": "energy"},
} }
client = await hass_ws_client(hass) client = await hass_ws_client(hass)
@@ -523,18 +470,17 @@ async def test_set_system_data(
res = await client.receive_json() res = await client.receive_json()
assert res["success"], res assert res["success"], res
# test creating # test updating default_panel
await client.send_json( await client.send_json(
{ {
"id": 5, "id": 5,
"type": "frontend/set_system_data", "type": "frontend/set_system_data",
"key": "test-non-existent-key", "default_panel": "lovelace",
"value": "test-value-new",
} }
) )
for msg_id, event_data in events[0]: for msg_id, event_data in events:
event = await client.receive_json() event = await client.receive_json()
assert event == {"id": msg_id, "type": "event", "event": {"value": event_data}} assert event == {"id": msg_id, "type": "event", "event": {"value": event_data}}
@@ -542,48 +488,12 @@ async def test_set_system_data(
assert res["success"], res assert res["success"], res
await client.send_json( await client.send_json(
{"id": 6, "type": "frontend/get_system_data", "key": "test-non-existent-key"} {"id": 6, "type": "frontend/get_system_data", "key": "default_panel"}
) )
res = await client.receive_json() res = await client.receive_json()
assert res["success"], res assert res["success"], res
assert res["result"]["value"] == "test-value-new" assert res["result"]["value"] == "lovelace"
# test updating with complex data
await client.send_json(
{
"id": 7,
"type": "frontend/set_system_data",
"key": "test-complex",
"value": [{"foo": "bar"}],
}
)
for msg_id, event_data in events[1]:
event = await client.receive_json()
assert event == {"id": msg_id, "type": "event", "event": {"value": event_data}}
res = await client.receive_json()
assert res["success"], res
await client.send_json(
{"id": 8, "type": "frontend/get_system_data", "key": "test-complex"}
)
res = await client.receive_json()
assert res["success"], res
assert res["result"]["value"][0]["foo"] == "bar"
# ensure other existing key was not modified
await client.send_json(
{"id": 9, "type": "frontend/get_system_data", "key": "test-key"}
)
res = await client.receive_json()
assert res["success"], res
assert res["result"]["value"] == "test-value"
async def test_set_system_data_requires_admin( async def test_set_system_data_requires_admin(
@@ -598,8 +508,7 @@ async def test_set_system_data_requires_admin(
{ {
"id": 5, "id": 5,
"type": "frontend/set_system_data", "type": "frontend/set_system_data",
"key": "test-key", "default_panel": "lovelace",
"value": "test-value",
} }
) )
@@ -607,3 +516,111 @@ async def test_set_system_data_requires_admin(
assert not res["success"], res assert not res["success"], res
assert res["error"]["code"] == "unauthorized" assert res["error"]["code"] == "unauthorized"
assert res["error"]["message"] == "Admin access required" assert res["error"]["message"] == "Admin access required"
async def test_set_system_data_default_panel_string(
hass: HomeAssistant, hass_ws_client: WebSocketGenerator
) -> None:
"""Test setting default_panel with valid string value."""
client = await hass_ws_client(hass)
# Test setting default_panel with a string value
await client.send_json(
{
"id": 1,
"type": "frontend/set_system_data",
"default_panel": "lovelace",
}
)
res = await client.receive_json()
assert res["success"], res
# Verify the value was set
await client.send_json(
{"id": 2, "type": "frontend/get_system_data", "key": "default_panel"}
)
res = await client.receive_json()
assert res["success"], res
assert res["result"]["value"] == "lovelace"
async def test_set_system_data_default_panel_none(
hass: HomeAssistant, hass_ws_client: WebSocketGenerator
) -> None:
"""Test setting default_panel with None value."""
client = await hass_ws_client(hass)
# Test setting default_panel with None
await client.send_json(
{
"id": 1,
"type": "frontend/set_system_data",
"default_panel": None,
}
)
res = await client.receive_json()
assert res["success"], res
# Verify the value was set
await client.send_json(
{"id": 2, "type": "frontend/get_system_data", "key": "default_panel"}
)
res = await client.receive_json()
assert res["success"], res
assert res["result"]["value"] is None
@pytest.mark.parametrize(
("invalid_value", "value_type"),
[
(True, "bool"),
(123, "int"),
(45.6, "float"),
({"panel": "lovelace"}, "dict"),
(["lovelace", "energy"], "list"),
],
)
async def test_set_system_data_default_panel_invalid_types(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
invalid_value: Any,
value_type: str,
) -> None:
"""Test setting default_panel with invalid types fails validation."""
client = await hass_ws_client(hass)
await client.send_json(
{
"id": 1,
"type": "frontend/set_system_data",
"default_panel": invalid_value,
}
)
res = await client.receive_json()
assert not res["success"], res
assert res["error"]["code"] == "invalid_format"
async def test_set_system_data_invalid_key(
hass: HomeAssistant, hass_ws_client: WebSocketGenerator
) -> None:
"""Test that keys other than default_panel are rejected by schema."""
client = await hass_ws_client(hass)
# Test that other keys are rejected by voluptuous schema
await client.send_json(
{
"id": 1,
"type": "frontend/set_system_data",
"other_key": "test-value",
}
)
res = await client.receive_json()
assert not res["success"], "Invalid key should have been rejected by schema"
assert res["error"]["code"] == "invalid_format"