mirror of
https://github.com/home-assistant/core.git
synced 2025-04-23 08:47:57 +00:00
Add WS command thread/delete_dataset (#88077)
* Add WS command thread/delete_dataset * Return not_allowed when trying to delete the preferred dataset
This commit is contained in:
parent
4221433ca6
commit
80e2f96097
@ -9,6 +9,7 @@ from typing import Any, cast
|
||||
from python_otbr_api import tlv_parser
|
||||
|
||||
from homeassistant.core import HomeAssistant, callback
|
||||
from homeassistant.exceptions import HomeAssistantError
|
||||
from homeassistant.helpers.singleton import singleton
|
||||
from homeassistant.helpers.storage import Store
|
||||
from homeassistant.util import dt as dt_util, ulid as ulid_util
|
||||
@ -20,6 +21,10 @@ STORAGE_VERSION_MINOR = 1
|
||||
SAVE_DELAY = 10
|
||||
|
||||
|
||||
class DatasetPreferredError(HomeAssistantError):
|
||||
"""Raised when attempting to delete the preferred dataset."""
|
||||
|
||||
|
||||
@dataclasses.dataclass(frozen=True)
|
||||
class DatasetEntry:
|
||||
"""Dataset store entry."""
|
||||
@ -92,6 +97,14 @@ class DatasetStore:
|
||||
self.preferred_dataset = entry.id
|
||||
self.async_schedule_save()
|
||||
|
||||
@callback
|
||||
def async_delete(self, dataset_id: str) -> None:
|
||||
"""Delete dataset."""
|
||||
if self.preferred_dataset == dataset_id:
|
||||
raise DatasetPreferredError("attempt to remove preferred dataset")
|
||||
del self.datasets[dataset_id]
|
||||
self.async_schedule_save()
|
||||
|
||||
@callback
|
||||
def async_get(self, dataset_id: str) -> DatasetEntry | None:
|
||||
"""Get dataset by id."""
|
||||
|
@ -16,6 +16,7 @@ from . import dataset_store, discovery
|
||||
def async_setup(hass: HomeAssistant) -> None:
|
||||
"""Set up the sensor websocket API."""
|
||||
websocket_api.async_register_command(hass, ws_add_dataset)
|
||||
websocket_api.async_register_command(hass, ws_delete_dataset)
|
||||
websocket_api.async_register_command(hass, ws_discover_routers)
|
||||
websocket_api.async_register_command(hass, ws_get_dataset)
|
||||
websocket_api.async_register_command(hass, ws_list_datasets)
|
||||
@ -48,6 +49,33 @@ async def ws_add_dataset(
|
||||
connection.send_result(msg["id"])
|
||||
|
||||
|
||||
@websocket_api.require_admin
|
||||
@websocket_api.websocket_command(
|
||||
{
|
||||
vol.Required("type"): "thread/delete_dataset",
|
||||
vol.Required("dataset_id"): str,
|
||||
}
|
||||
)
|
||||
@websocket_api.async_response
|
||||
async def ws_delete_dataset(
|
||||
hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict[str, Any]
|
||||
) -> None:
|
||||
"""Delete a thread dataset."""
|
||||
dataset_id = msg["dataset_id"]
|
||||
|
||||
store = await dataset_store.async_get_store(hass)
|
||||
try:
|
||||
store.async_delete(dataset_id)
|
||||
except KeyError as exc:
|
||||
connection.send_error(msg["id"], websocket_api.const.ERR_NOT_FOUND, str(exc))
|
||||
return
|
||||
except dataset_store.DatasetPreferredError as exc:
|
||||
connection.send_error(msg["id"], websocket_api.const.ERR_NOT_ALLOWED, str(exc))
|
||||
return
|
||||
|
||||
connection.send_result(msg["id"])
|
||||
|
||||
|
||||
@websocket_api.require_admin
|
||||
@websocket_api.websocket_command(
|
||||
{
|
||||
|
@ -30,6 +30,7 @@ MAX_PENDING_MSG: Final = 4096
|
||||
|
||||
ERR_ID_REUSE: Final = "id_reuse"
|
||||
ERR_INVALID_FORMAT: Final = "invalid_format"
|
||||
ERR_NOT_ALLOWED: Final = "not_allowed"
|
||||
ERR_NOT_FOUND: Final = "not_found"
|
||||
ERR_NOT_SUPPORTED: Final = "not_supported"
|
||||
ERR_HOME_ASSISTANT_ERROR: Final = "home_assistant_error"
|
||||
|
@ -4,6 +4,7 @@ from python_otbr_api.tlv_parser import TLVError
|
||||
|
||||
from homeassistant.components.thread import dataset_store
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.exceptions import HomeAssistantError
|
||||
|
||||
from . import DATASET_1, DATASET_2, DATASET_3
|
||||
|
||||
@ -52,6 +53,34 @@ async def test_add_dataset_reordered(hass: HomeAssistant) -> None:
|
||||
assert list(store.datasets.values())[0].created == created
|
||||
|
||||
|
||||
async def test_delete_dataset_twice(hass: HomeAssistant) -> None:
|
||||
"""Test deleting dataset twice raises."""
|
||||
await dataset_store.async_add_dataset(hass, "source", DATASET_1)
|
||||
await dataset_store.async_add_dataset(hass, "source", DATASET_2)
|
||||
|
||||
store = await dataset_store.async_get_store(hass)
|
||||
dataset_id = list(store.datasets.values())[1].id
|
||||
|
||||
store.async_delete(dataset_id)
|
||||
assert len(store.datasets) == 1
|
||||
|
||||
with pytest.raises(KeyError, match=f"'{dataset_id}'"):
|
||||
store.async_delete(dataset_id)
|
||||
assert len(store.datasets) == 1
|
||||
|
||||
|
||||
async def test_delete_preferred_dataset(hass: HomeAssistant) -> None:
|
||||
"""Test deleting preferred dataset raises."""
|
||||
await dataset_store.async_add_dataset(hass, "source", DATASET_1)
|
||||
|
||||
store = await dataset_store.async_get_store(hass)
|
||||
dataset_id = list(store.datasets.values())[0].id
|
||||
|
||||
with pytest.raises(HomeAssistantError, match="attempt to remove preferred dataset"):
|
||||
store.async_delete(dataset_id)
|
||||
assert len(store.datasets) == 1
|
||||
|
||||
|
||||
async def test_get_preferred_dataset(hass: HomeAssistant) -> None:
|
||||
"""Test get the preferred dataset."""
|
||||
assert await dataset_store.async_get_preferred_dataset(hass) is None
|
||||
@ -133,24 +162,27 @@ async def test_load_datasets(hass: HomeAssistant) -> None:
|
||||
|
||||
assert store1.preferred_dataset == dataset_1_store_1.id
|
||||
|
||||
with pytest.raises(HomeAssistantError):
|
||||
store1.async_delete(dataset_1_store_1.id)
|
||||
store1.async_delete(dataset_2_store_1.id)
|
||||
|
||||
assert len(store1.datasets) == 2
|
||||
|
||||
store2 = dataset_store.DatasetStore(hass)
|
||||
await flush_store(store1._store)
|
||||
await store2.async_load()
|
||||
|
||||
assert len(store2.datasets) == 3
|
||||
assert len(store2.datasets) == 2
|
||||
|
||||
for dataset in store2.datasets.values():
|
||||
if dataset.source == "Google":
|
||||
dataset_1_store_2 = dataset
|
||||
if dataset.source == "Multipan":
|
||||
dataset_2_store_2 = dataset
|
||||
if dataset.source == "🎅":
|
||||
dataset_3_store_2 = dataset
|
||||
|
||||
assert list(store1.datasets) == list(store2.datasets)
|
||||
|
||||
assert dataset_1_store_1 == dataset_1_store_2
|
||||
assert dataset_2_store_1 == dataset_2_store_2
|
||||
assert dataset_3_store_1 == dataset_3_store_2
|
||||
|
||||
|
||||
|
@ -17,6 +17,8 @@ from . import (
|
||||
ROUTER_DISCOVERY_HASS,
|
||||
)
|
||||
|
||||
from tests.typing import WebSocketGenerator
|
||||
|
||||
|
||||
async def test_add_dataset(hass: HomeAssistant, hass_ws_client) -> None:
|
||||
"""Test we can add a dataset."""
|
||||
@ -54,6 +56,62 @@ async def test_add_invalid_dataset(hass: HomeAssistant, hass_ws_client) -> None:
|
||||
assert msg["error"] == {"code": "invalid_format", "message": "unknown type 222"}
|
||||
|
||||
|
||||
async def test_delete_dataset(
|
||||
hass: HomeAssistant, hass_ws_client: WebSocketGenerator
|
||||
) -> None:
|
||||
"""Test we can delete a dataset."""
|
||||
assert await async_setup_component(hass, DOMAIN, {})
|
||||
await hass.async_block_till_done()
|
||||
|
||||
client = await hass_ws_client(hass)
|
||||
|
||||
await client.send_json_auto_id(
|
||||
{"type": "thread/add_dataset_tlv", "source": "test", "tlv": DATASET_1}
|
||||
)
|
||||
msg = await client.receive_json()
|
||||
assert msg["success"]
|
||||
|
||||
await client.send_json_auto_id(
|
||||
{"type": "thread/add_dataset_tlv", "source": "test", "tlv": DATASET_2}
|
||||
)
|
||||
msg = await client.receive_json()
|
||||
assert msg["success"]
|
||||
|
||||
await client.send_json_auto_id({"type": "thread/list_datasets"})
|
||||
msg = await client.receive_json()
|
||||
assert msg["success"]
|
||||
datasets = msg["result"]["datasets"]
|
||||
|
||||
# Try deleting the preferred dataset
|
||||
await client.send_json_auto_id(
|
||||
{"type": "thread/delete_dataset", "dataset_id": datasets[0]["dataset_id"]}
|
||||
)
|
||||
msg = await client.receive_json()
|
||||
assert not msg["success"]
|
||||
assert msg["error"] == {
|
||||
"code": "not_allowed",
|
||||
"message": "attempt to remove preferred dataset",
|
||||
}
|
||||
|
||||
# Try deleting a non preferred dataset
|
||||
await client.send_json_auto_id(
|
||||
{"type": "thread/delete_dataset", "dataset_id": datasets[1]["dataset_id"]}
|
||||
)
|
||||
msg = await client.receive_json()
|
||||
assert msg["success"]
|
||||
|
||||
# Try deleting the same dataset again
|
||||
await client.send_json_auto_id(
|
||||
{"type": "thread/delete_dataset", "dataset_id": datasets[1]["dataset_id"]}
|
||||
)
|
||||
msg = await client.receive_json()
|
||||
assert not msg["success"]
|
||||
assert msg["error"] == {
|
||||
"code": "not_found",
|
||||
"message": f"'{datasets[1]['dataset_id']}'",
|
||||
}
|
||||
|
||||
|
||||
async def test_list_get_dataset(hass: HomeAssistant, hass_ws_client) -> None:
|
||||
"""Test list and get datasets."""
|
||||
assert await async_setup_component(hass, DOMAIN, {})
|
||||
|
Loading…
x
Reference in New Issue
Block a user