mirror of
https://github.com/home-assistant/core.git
synced 2025-07-21 12:17:07 +00:00
Allow deleting files from media source (#66975)
This commit is contained in:
parent
abaf284ef2
commit
8080aab98e
@ -10,7 +10,7 @@ from aiohttp import web
|
||||
from aiohttp.web_request import FileField
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant.components.http import HomeAssistantView
|
||||
from homeassistant.components import http, websocket_api
|
||||
from homeassistant.components.media_player.const import MEDIA_CLASS_DIRECTORY
|
||||
from homeassistant.components.media_player.errors import BrowseError
|
||||
from homeassistant.core import HomeAssistant, callback
|
||||
@ -32,6 +32,7 @@ def async_setup(hass: HomeAssistant) -> None:
|
||||
hass.data[DOMAIN][DOMAIN] = source
|
||||
hass.http.register_view(LocalMediaView(hass, source))
|
||||
hass.http.register_view(UploadMediaView(hass, source))
|
||||
websocket_api.async_register_command(hass, websocket_remove_media)
|
||||
|
||||
|
||||
class LocalSource(MediaSource):
|
||||
@ -190,7 +191,7 @@ class LocalSource(MediaSource):
|
||||
return media
|
||||
|
||||
|
||||
class LocalMediaView(HomeAssistantView):
|
||||
class LocalMediaView(http.HomeAssistantView):
|
||||
"""
|
||||
Local Media Finder View.
|
||||
|
||||
@ -231,7 +232,7 @@ class LocalMediaView(HomeAssistantView):
|
||||
return web.FileResponse(media_path)
|
||||
|
||||
|
||||
class UploadMediaView(HomeAssistantView):
|
||||
class UploadMediaView(http.HomeAssistantView):
|
||||
"""View to upload images."""
|
||||
|
||||
url = "/api/media_source/local_source/upload"
|
||||
@ -314,3 +315,52 @@ class UploadMediaView(HomeAssistantView):
|
||||
|
||||
with target_path.open("wb") as target_fp:
|
||||
shutil.copyfileobj(uploaded_file.file, target_fp)
|
||||
|
||||
|
||||
@websocket_api.websocket_command(
|
||||
{
|
||||
vol.Required("type"): "media_source/local_source/remove",
|
||||
vol.Required("media_content_id"): str,
|
||||
}
|
||||
)
|
||||
@websocket_api.require_admin
|
||||
@websocket_api.async_response
|
||||
async def websocket_remove_media(
|
||||
hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict
|
||||
) -> None:
|
||||
"""Remove media."""
|
||||
try:
|
||||
item = MediaSourceItem.from_uri(hass, msg["media_content_id"])
|
||||
except ValueError as err:
|
||||
connection.send_error(msg["id"], websocket_api.ERR_INVALID_FORMAT, str(err))
|
||||
return
|
||||
|
||||
source: LocalSource = hass.data[DOMAIN][DOMAIN]
|
||||
|
||||
try:
|
||||
source_dir_id, location = source.async_parse_identifier(item)
|
||||
except Unresolvable as err:
|
||||
connection.send_error(msg["id"], websocket_api.ERR_INVALID_FORMAT, str(err))
|
||||
return
|
||||
|
||||
item_path = source.async_full_path(source_dir_id, location)
|
||||
|
||||
def _do_delete() -> tuple[str, str] | None:
|
||||
if not item_path.exists():
|
||||
return websocket_api.ERR_NOT_FOUND, "Path does not exist"
|
||||
|
||||
if not item_path.is_file():
|
||||
return websocket_api.ERR_NOT_SUPPORTED, "Path is not a file"
|
||||
|
||||
item_path.unlink()
|
||||
return None
|
||||
|
||||
try:
|
||||
error = await hass.async_add_executor_job(_do_delete)
|
||||
except OSError as err:
|
||||
error = (websocket_api.ERR_UNKNOWN_ERROR, str(err))
|
||||
|
||||
if error:
|
||||
connection.send_error(msg["id"], *error)
|
||||
else:
|
||||
connection.send_result(msg["id"])
|
||||
|
@ -7,7 +7,7 @@ from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
from homeassistant.components import media_source
|
||||
from homeassistant.components import media_source, websocket_api
|
||||
from homeassistant.components.media_source import const
|
||||
from homeassistant.config import async_process_ha_core_config
|
||||
from homeassistant.setup import async_setup_component
|
||||
@ -224,3 +224,110 @@ async def test_upload_view(hass, hass_client, temp_dir, hass_admin_user):
|
||||
|
||||
assert res.status == 401
|
||||
assert not (Path(temp_dir) / "no-admin-test.png").is_file()
|
||||
|
||||
|
||||
async def test_remove_file(hass, hass_ws_client, temp_dir, hass_admin_user):
|
||||
"""Allow uploading media."""
|
||||
|
||||
msg_count = 0
|
||||
file_count = 0
|
||||
|
||||
def msgid():
|
||||
nonlocal msg_count
|
||||
msg_count += 1
|
||||
return msg_count
|
||||
|
||||
def create_file():
|
||||
nonlocal file_count
|
||||
file_count += 1
|
||||
to_delete_path = Path(temp_dir) / f"to_delete_{file_count}.txt"
|
||||
to_delete_path.touch()
|
||||
return to_delete_path
|
||||
|
||||
client = await hass_ws_client(hass)
|
||||
to_delete = create_file()
|
||||
|
||||
await client.send_json(
|
||||
{
|
||||
"id": msgid(),
|
||||
"type": "media_source/local_source/remove",
|
||||
"media_content_id": f"media-source://media_source/test_dir/{to_delete.name}",
|
||||
}
|
||||
)
|
||||
|
||||
msg = await client.receive_json()
|
||||
|
||||
assert msg["success"]
|
||||
|
||||
assert not to_delete.exists()
|
||||
|
||||
# Test with bad media source ID
|
||||
extra_id_file = create_file()
|
||||
for bad_id, err in (
|
||||
# Not exists
|
||||
(
|
||||
"media-source://media_source/test_dir/not_exist.txt",
|
||||
websocket_api.ERR_NOT_FOUND,
|
||||
),
|
||||
# Only a dir
|
||||
("media-source://media_source/test_dir", websocket_api.ERR_NOT_SUPPORTED),
|
||||
# File with extra identifiers
|
||||
(
|
||||
f"media-source://media_source/test_dir/bla/../{extra_id_file.name}",
|
||||
websocket_api.ERR_INVALID_FORMAT,
|
||||
),
|
||||
# Location is invalid
|
||||
("media-source://media_source/test_dir/..", websocket_api.ERR_INVALID_FORMAT),
|
||||
# Domain != media_source
|
||||
("media-source://nest/test_dir/.", websocket_api.ERR_INVALID_FORMAT),
|
||||
# Completely something else
|
||||
("http://bla", websocket_api.ERR_INVALID_FORMAT),
|
||||
):
|
||||
await client.send_json(
|
||||
{
|
||||
"id": msgid(),
|
||||
"type": "media_source/local_source/remove",
|
||||
"media_content_id": bad_id,
|
||||
}
|
||||
)
|
||||
|
||||
msg = await client.receive_json()
|
||||
|
||||
assert not msg["success"]
|
||||
assert msg["error"]["code"] == err
|
||||
|
||||
assert extra_id_file.exists()
|
||||
|
||||
# Test error deleting
|
||||
to_delete_2 = create_file()
|
||||
|
||||
with patch("pathlib.Path.unlink", side_effect=OSError):
|
||||
await client.send_json(
|
||||
{
|
||||
"id": msgid(),
|
||||
"type": "media_source/local_source/remove",
|
||||
"media_content_id": f"media-source://media_source/test_dir/{to_delete_2.name}",
|
||||
}
|
||||
)
|
||||
|
||||
msg = await client.receive_json()
|
||||
|
||||
assert not msg["success"]
|
||||
assert msg["error"]["code"] == websocket_api.ERR_UNKNOWN_ERROR
|
||||
|
||||
# Test requires admin access
|
||||
to_delete_3 = create_file()
|
||||
hass_admin_user.groups = []
|
||||
|
||||
await client.send_json(
|
||||
{
|
||||
"id": msgid(),
|
||||
"type": "media_source/local_source/remove",
|
||||
"media_content_id": f"media-source://media_source/test_dir/{to_delete_3.name}",
|
||||
}
|
||||
)
|
||||
|
||||
msg = await client.receive_json()
|
||||
|
||||
assert not msg["success"]
|
||||
assert to_delete_3.is_file()
|
||||
|
Loading…
x
Reference in New Issue
Block a user