Add post action to mastodon (#134788)

Co-authored-by: Joost Lekkerkerker <joostlek@outlook.com>
This commit is contained in:
Andrew Jackson 2025-01-31 11:29:23 +00:00 committed by GitHub
parent a7903d344f
commit 50f3d79fb2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 607 additions and 41 deletions

View File

@ -2,11 +2,8 @@
from __future__ import annotations
from dataclasses import dataclass
from mastodon.Mastodon import Mastodon, MastodonError
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import (
CONF_ACCESS_TOKEN,
CONF_CLIENT_ID,
@ -16,27 +13,24 @@ from homeassistant.const import (
)
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryNotReady
from homeassistant.helpers import discovery
from homeassistant.helpers import config_validation as cv, discovery
from homeassistant.helpers.typing import ConfigType
from homeassistant.util import slugify
from .const import CONF_BASE_URL, DOMAIN, LOGGER
from .coordinator import MastodonCoordinator
from .coordinator import MastodonConfigEntry, MastodonCoordinator, MastodonData
from .services import setup_services
from .utils import construct_mastodon_username, create_mastodon_client
PLATFORMS: list[Platform] = [Platform.NOTIFY, Platform.SENSOR]
@dataclass
class MastodonData:
"""Mastodon data type."""
client: Mastodon
instance: dict
account: dict
coordinator: MastodonCoordinator
CONFIG_SCHEMA = cv.empty_config_schema(DOMAIN)
type MastodonConfigEntry = ConfigEntry[MastodonData]
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Set up the Mastodon component."""
setup_services(hass)
return True
async def async_setup_entry(hass: HomeAssistant, entry: MastodonConfigEntry) -> bool:

View File

@ -19,3 +19,10 @@ ACCOUNT_USERNAME: Final = "username"
ACCOUNT_FOLLOWERS_COUNT: Final = "followers_count"
ACCOUNT_FOLLOWING_COUNT: Final = "following_count"
ACCOUNT_STATUSES_COUNT: Final = "statuses_count"
ATTR_CONFIG_ENTRY_ID = "config_entry_id"
ATTR_STATUS = "status"
ATTR_VISIBILITY = "visibility"
ATTR_CONTENT_WARNING = "content_warning"
ATTR_MEDIA_WARNING = "media_warning"
ATTR_MEDIA = "media"

View File

@ -2,18 +2,33 @@
from __future__ import annotations
from dataclasses import dataclass
from datetime import timedelta
from typing import Any
from mastodon import Mastodon
from mastodon.Mastodon import MastodonError
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import LOGGER
@dataclass
class MastodonData:
"""Mastodon data type."""
client: Mastodon
instance: dict
account: dict
coordinator: MastodonCoordinator
type MastodonConfigEntry = ConfigEntry[MastodonData]
class MastodonCoordinator(DataUpdateCoordinator[dict[str, Any]]):
"""Class to manage fetching Mastodon data."""

View File

@ -11,5 +11,10 @@
"default": "mdi:message-text"
}
}
},
"services": {
"post": {
"service": "mdi:message-text"
}
}
}

View File

@ -2,7 +2,6 @@
from __future__ import annotations
import mimetypes
from typing import Any, cast
from mastodon import Mastodon
@ -16,15 +15,21 @@ from homeassistant.components.notify import (
)
from homeassistant.const import CONF_ACCESS_TOKEN, CONF_CLIENT_ID, CONF_CLIENT_SECRET
from homeassistant.core import HomeAssistant
from homeassistant.helpers import config_validation as cv
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import config_validation as cv, issue_registry as ir
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
from .const import CONF_BASE_URL, DEFAULT_URL, LOGGER
from .const import (
ATTR_CONTENT_WARNING,
ATTR_MEDIA_WARNING,
CONF_BASE_URL,
DEFAULT_URL,
DOMAIN,
)
from .utils import get_media_type
ATTR_MEDIA = "media"
ATTR_TARGET = "target"
ATTR_MEDIA_WARNING = "media_warning"
ATTR_CONTENT_WARNING = "content_warning"
PLATFORM_SCHEMA = NOTIFY_PLATFORM_SCHEMA.extend(
{
@ -67,6 +72,17 @@ class MastodonNotificationService(BaseNotificationService):
def send_message(self, message: str = "", **kwargs: Any) -> None:
"""Toot a message, with media perhaps."""
ir.create_issue(
self.hass,
DOMAIN,
"deprecated_notify_action_mastodon",
breaks_in_ha_version="2025.9.0",
is_fixable=False,
issue_domain=DOMAIN,
severity=ir.IssueSeverity.WARNING,
translation_key="deprecated_notify_action",
)
target = None
if (target_list := kwargs.get(ATTR_TARGET)) is not None:
target = cast(list[str], target_list)[0]
@ -82,8 +98,11 @@ class MastodonNotificationService(BaseNotificationService):
media = data.get(ATTR_MEDIA)
if media:
if not self.hass.config.is_allowed_path(media):
LOGGER.warning("'%s' is not a whitelisted directory", media)
return
raise HomeAssistantError(
translation_domain=DOMAIN,
translation_key="not_whitelisted_directory",
translation_placeholders={"media": media},
)
mediadata = self._upload_media(media)
sensitive = data.get(ATTR_MEDIA_WARNING)
@ -93,34 +112,39 @@ class MastodonNotificationService(BaseNotificationService):
try:
self.client.status_post(
message,
media_ids=mediadata["id"],
sensitive=sensitive,
visibility=target,
spoiler_text=content_warning,
media_ids=mediadata["id"],
sensitive=sensitive,
)
except MastodonAPIError:
LOGGER.error("Unable to send message")
except MastodonAPIError as err:
raise HomeAssistantError(
translation_domain=DOMAIN,
translation_key="unable_to_send_message",
) from err
else:
try:
self.client.status_post(
message, visibility=target, spoiler_text=content_warning
)
except MastodonAPIError:
LOGGER.error("Unable to send message")
except MastodonAPIError as err:
raise HomeAssistantError(
translation_domain=DOMAIN,
translation_key="unable_to_send_message",
) from err
def _upload_media(self, media_path: Any = None) -> Any:
"""Upload media."""
with open(media_path, "rb"):
media_type = self._media_type(media_path)
media_type = get_media_type(media_path)
try:
mediadata = self.client.media_post(media_path, mime_type=media_type)
except MastodonAPIError:
LOGGER.error("Unable to upload image %s", media_path)
except MastodonAPIError as err:
raise HomeAssistantError(
translation_domain=DOMAIN,
translation_key="unable_to_upload_image",
translation_placeholders={"media_path": media_path},
) from err
return mediadata
def _media_type(self, media_path: Any = None) -> Any:
"""Get media Type."""
(media_type, _) = mimetypes.guess_type(media_path)
return media_type

View File

@ -29,7 +29,7 @@ rules:
action-exceptions:
status: todo
comment: |
Legacy Notify needs rewriting once Notify architecture stabilizes.
Awaiting legacy Notify deprecation.
config-entry-unloading: done
docs-configuration-parameters:
status: exempt
@ -42,7 +42,7 @@ rules:
parallel-updates:
status: todo
comment: |
Does not set parallel-updates on notify platform.
Awaiting legacy Notify deprecation.
reauthentication-flow:
status: todo
comment: |
@ -50,7 +50,7 @@ rules:
test-coverage:
status: todo
comment: |
Legacy Notify needs rewriting once Notify architecture stabilizes.
Awaiting legacy Notify deprecation.
# Gold
devices: done
@ -78,7 +78,7 @@ rules:
entity-device-class: done
entity-disabled-by-default: done
entity-translations: done
exception-translations: todo
exception-translations: done
icon-translations: done
reconfiguration-flow:
status: todo

View File

@ -0,0 +1,142 @@
"""Define services for the Mastodon integration."""
from enum import StrEnum
from functools import partial
from typing import Any, cast
from mastodon import Mastodon
from mastodon.Mastodon import MastodonAPIError
import voluptuous as vol
from homeassistant.config_entries import ConfigEntryState
from homeassistant.core import HomeAssistant, ServiceCall, ServiceResponse
from homeassistant.exceptions import HomeAssistantError, ServiceValidationError
from . import MastodonConfigEntry
from .const import (
ATTR_CONFIG_ENTRY_ID,
ATTR_CONTENT_WARNING,
ATTR_MEDIA,
ATTR_MEDIA_WARNING,
ATTR_STATUS,
ATTR_VISIBILITY,
DOMAIN,
)
from .utils import get_media_type
class StatusVisibility(StrEnum):
"""StatusVisibility model."""
PUBLIC = "public"
UNLISTED = "unlisted"
PRIVATE = "private"
DIRECT = "direct"
SERVICE_POST = "post"
SERVICE_POST_SCHEMA = vol.Schema(
{
vol.Required(ATTR_CONFIG_ENTRY_ID): str,
vol.Required(ATTR_STATUS): str,
vol.Optional(ATTR_VISIBILITY): vol.In([x.lower() for x in StatusVisibility]),
vol.Optional(ATTR_CONTENT_WARNING): str,
vol.Optional(ATTR_MEDIA): str,
vol.Optional(ATTR_MEDIA_WARNING): bool,
}
)
def async_get_entry(hass: HomeAssistant, config_entry_id: str) -> MastodonConfigEntry:
"""Get the Mastodon config entry."""
if not (entry := hass.config_entries.async_get_entry(config_entry_id)):
raise ServiceValidationError(
translation_domain=DOMAIN,
translation_key="integration_not_found",
translation_placeholders={"target": DOMAIN},
)
if entry.state is not ConfigEntryState.LOADED:
raise ServiceValidationError(
translation_domain=DOMAIN,
translation_key="not_loaded",
translation_placeholders={"target": entry.title},
)
return cast(MastodonConfigEntry, entry)
def setup_services(hass: HomeAssistant) -> None:
"""Set up the services for the Mastodon integration."""
async def async_post(call: ServiceCall) -> ServiceResponse:
"""Post a status."""
entry = async_get_entry(hass, call.data[ATTR_CONFIG_ENTRY_ID])
client = entry.runtime_data.client
status = call.data[ATTR_STATUS]
visibility: str | None = (
StatusVisibility(call.data[ATTR_VISIBILITY])
if ATTR_VISIBILITY in call.data
else None
)
spoiler_text: str | None = call.data.get(ATTR_CONTENT_WARNING)
media_path: str | None = call.data.get(ATTR_MEDIA)
media_warning: str | None = call.data.get(ATTR_MEDIA_WARNING)
await hass.async_add_executor_job(
partial(
_post,
client=client,
status=status,
visibility=visibility,
spoiler_text=spoiler_text,
media_path=media_path,
sensitive=media_warning,
)
)
return None
def _post(client: Mastodon, **kwargs: Any) -> None:
"""Post to Mastodon."""
media_data: dict[str, Any] | None = None
media_path = kwargs.get("media_path")
if media_path:
if not hass.config.is_allowed_path(media_path):
raise HomeAssistantError(
translation_domain=DOMAIN,
translation_key="not_whitelisted_directory",
translation_placeholders={"media": media_path},
)
media_type = get_media_type(media_path)
try:
media_data = client.media_post(
media_file=media_path, mime_type=media_type
)
except MastodonAPIError as err:
raise HomeAssistantError(
translation_domain=DOMAIN,
translation_key="unable_to_upload_image",
translation_placeholders={"media_path": media_path},
) from err
kwargs.pop("media_path", None)
try:
media_ids: str | None = None
if media_data:
media_ids = media_data["id"]
client.status_post(media_ids=media_ids, **kwargs)
except MastodonAPIError as err:
raise HomeAssistantError(
translation_domain=DOMAIN,
translation_key="unable_to_send_message",
) from err
hass.services.async_register(
DOMAIN, SERVICE_POST, async_post, schema=SERVICE_POST_SCHEMA
)

View File

@ -0,0 +1,30 @@
post:
fields:
config_entry_id:
required: true
selector:
config_entry:
integration: mastodon
status:
required: true
selector:
text:
visibility:
selector:
select:
options:
- public
- unlisted
- private
- direct
translation_key: post_visibility
content_warning:
selector:
text:
media:
selector:
text:
media_warning:
required: true
selector:
boolean:

View File

@ -25,6 +25,29 @@
"unknown": "Unknown error occured when connecting to the Mastodon instance."
}
},
"exceptions": {
"not_loaded": {
"message": "{target} is not loaded."
},
"integration_not_found": {
"message": "Integration \"{target}\" not found in registry."
},
"unable_to_send_message": {
"message": "Unable to send message."
},
"unable_to_upload_image": {
"message": "Unable to upload image {media_path}."
},
"not_whitelisted_directory": {
"message": "{media} is not a whitelisted directory."
}
},
"issues": {
"deprecated_notify_action": {
"title": "Deprecated Notify action used for Mastodon",
"description": "The Notify action for Mastodon is deprecated.\n\nUse the `mastodon.post` action instead."
}
},
"entity": {
"sensor": {
"followers": {
@ -40,5 +63,47 @@
"unit_of_measurement": "posts"
}
}
},
"services": {
"post": {
"name": "Post",
"description": "Posts a status on your Mastodon account.",
"fields": {
"config_entry_id": {
"name": "Mastodon account",
"description": "Select the Mastodon account to post to."
},
"status": {
"name": "Status",
"description": "The status to post."
},
"visibility": {
"name": "Visibility",
"description": "The visibility of the post (default: account setting)."
},
"content_warning": {
"name": "Content warning",
"description": "A content warning will be shown before the status text is shown (default: no content warning)."
},
"media": {
"name": "Media",
"description": "Attach an image or video to the post."
},
"media_warning": {
"name": "Media warning",
"description": "If an image or video is attached, will mark the media as sensitive (default: no media warning)."
}
}
}
},
"selector": {
"post_visibility": {
"options": {
"public": "Public - Visible to everyone",
"unlisted": "Unlisted - Public but not shown in public timelines",
"private": "Private - Followers only",
"direct": "Direct - Mentioned accounts only"
}
}
}
}

View File

@ -2,6 +2,9 @@
from __future__ import annotations
import mimetypes
from typing import Any
from mastodon import Mastodon
from .const import ACCOUNT_USERNAME, DEFAULT_NAME, INSTANCE_DOMAIN, INSTANCE_URI
@ -30,3 +33,11 @@ def construct_mastodon_username(
)
return DEFAULT_NAME
def get_media_type(media_path: Any = None) -> Any:
"""Get media type."""
(media_type, _) = mimetypes.guess_type(media_path)
return media_type

View File

@ -2,10 +2,13 @@
from unittest.mock import AsyncMock
from mastodon.Mastodon import MastodonAPIError
import pytest
from syrupy.assertion import SnapshotAssertion
from homeassistant.components.notify import DOMAIN as NOTIFY_DOMAIN
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import entity_registry as er
from . import setup_integration
@ -36,3 +39,27 @@ async def test_notify(
)
assert mock_mastodon_client.status_post.assert_called_once
async def test_notify_failed(
hass: HomeAssistant,
mock_mastodon_client: AsyncMock,
mock_config_entry: MockConfigEntry,
) -> None:
"""Test the notify raising an error."""
mock_config_entry.add_to_hass(hass)
await hass.config_entries.async_setup(mock_config_entry.entry_id)
await hass.async_block_till_done()
mock_mastodon_client.status_post.side_effect = MastodonAPIError
with pytest.raises(HomeAssistantError, match="Unable to send message"):
await hass.services.async_call(
NOTIFY_DOMAIN,
"trwnh_mastodon_social",
{
"message": "test toot",
},
blocking=True,
return_response=False,
)

View File

@ -0,0 +1,246 @@
"""Tests for the Mastodon services."""
from unittest.mock import AsyncMock, Mock, patch
from mastodon.Mastodon import MastodonAPIError
import pytest
from homeassistant.components.mastodon.const import (
ATTR_CONFIG_ENTRY_ID,
ATTR_CONTENT_WARNING,
ATTR_MEDIA,
ATTR_STATUS,
ATTR_VISIBILITY,
DOMAIN,
)
from homeassistant.components.mastodon.services import SERVICE_POST
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import HomeAssistantError, ServiceValidationError
from . import setup_integration
from tests.common import MockConfigEntry
@pytest.mark.parametrize(
("payload", "kwargs"),
[
(
{
ATTR_STATUS: "test toot",
},
{
"status": "test toot",
"spoiler_text": None,
"visibility": None,
"media_ids": None,
"sensitive": None,
},
),
(
{ATTR_STATUS: "test toot", ATTR_VISIBILITY: "private"},
{
"status": "test toot",
"spoiler_text": None,
"visibility": "private",
"media_ids": None,
"sensitive": None,
},
),
(
{
ATTR_STATUS: "test toot",
ATTR_CONTENT_WARNING: "Spoiler",
ATTR_VISIBILITY: "private",
},
{
"status": "test toot",
"spoiler_text": "Spoiler",
"visibility": "private",
"media_ids": None,
"sensitive": None,
},
),
(
{
ATTR_STATUS: "test toot",
ATTR_CONTENT_WARNING: "Spoiler",
ATTR_MEDIA: "/image.jpg",
},
{
"status": "test toot",
"spoiler_text": "Spoiler",
"visibility": None,
"media_ids": "1",
"sensitive": None,
},
),
],
)
async def test_service_post(
hass: HomeAssistant,
mock_mastodon_client: AsyncMock,
mock_config_entry: MockConfigEntry,
payload: dict[str, str],
kwargs: dict[str, str | None],
) -> None:
"""Test the post service."""
await setup_integration(hass, mock_config_entry)
with (
patch.object(hass.config, "is_allowed_path", return_value=True),
patch.object(mock_mastodon_client, "media_post", return_value={"id": "1"}),
):
await hass.services.async_call(
DOMAIN,
SERVICE_POST,
{
ATTR_CONFIG_ENTRY_ID: mock_config_entry.entry_id,
}
| payload,
blocking=True,
return_response=False,
)
mock_mastodon_client.status_post.assert_called_with(**kwargs)
mock_mastodon_client.status_post.reset_mock()
@pytest.mark.parametrize(
("payload", "kwargs"),
[
(
{
ATTR_STATUS: "test toot",
},
{"status": "test toot", "spoiler_text": None, "visibility": None},
),
(
{
ATTR_STATUS: "test toot",
ATTR_CONTENT_WARNING: "Spoiler",
ATTR_MEDIA: "/image.jpg",
},
{
"status": "test toot",
"spoiler_text": "Spoiler",
"visibility": None,
"media_ids": "1",
"sensitive": None,
},
),
],
)
async def test_post_service_failed(
hass: HomeAssistant,
mock_mastodon_client: AsyncMock,
mock_config_entry: MockConfigEntry,
payload: dict[str, str],
kwargs: dict[str, str | None],
) -> None:
"""Test the post service raising an error."""
mock_config_entry.add_to_hass(hass)
await hass.config_entries.async_setup(mock_config_entry.entry_id)
await hass.async_block_till_done()
hass.config.is_allowed_path = Mock(return_value=True)
mock_mastodon_client.media_post.return_value = {"id": "1"}
mock_mastodon_client.status_post.side_effect = MastodonAPIError
with pytest.raises(HomeAssistantError, match="Unable to send message"):
await hass.services.async_call(
DOMAIN,
SERVICE_POST,
{ATTR_CONFIG_ENTRY_ID: mock_config_entry.entry_id} | payload,
blocking=True,
return_response=False,
)
async def test_post_media_upload_failed(
hass: HomeAssistant,
mock_mastodon_client: AsyncMock,
mock_config_entry: MockConfigEntry,
) -> None:
"""Test the post service raising an error because media upload fails."""
mock_config_entry.add_to_hass(hass)
await hass.config_entries.async_setup(mock_config_entry.entry_id)
await hass.async_block_till_done()
payload = {"status": "test toot", "media": "/fail.jpg"}
mock_mastodon_client.media_post.side_effect = MastodonAPIError
with (
patch.object(hass.config, "is_allowed_path", return_value=True),
pytest.raises(HomeAssistantError, match="Unable to upload image /fail.jpg"),
):
await hass.services.async_call(
DOMAIN,
SERVICE_POST,
{ATTR_CONFIG_ENTRY_ID: mock_config_entry.entry_id} | payload,
blocking=True,
return_response=False,
)
async def test_post_path_not_whitelisted(
hass: HomeAssistant,
mock_mastodon_client: AsyncMock,
mock_config_entry: MockConfigEntry,
) -> None:
"""Test the post service raising an error because the file path is not whitelisted."""
mock_config_entry.add_to_hass(hass)
await hass.config_entries.async_setup(mock_config_entry.entry_id)
await hass.async_block_till_done()
payload = {"status": "test toot", "media": "/fail.jpg"}
with pytest.raises(
HomeAssistantError, match="/fail.jpg is not a whitelisted directory"
):
await hass.services.async_call(
DOMAIN,
SERVICE_POST,
{ATTR_CONFIG_ENTRY_ID: mock_config_entry.entry_id} | payload,
blocking=True,
return_response=False,
)
async def test_service_entry_availability(
hass: HomeAssistant,
mock_mastodon_client: AsyncMock,
mock_config_entry: MockConfigEntry,
) -> None:
"""Test the services without valid entry."""
mock_config_entry.add_to_hass(hass)
mock_config_entry2 = MockConfigEntry(domain=DOMAIN)
mock_config_entry2.add_to_hass(hass)
await hass.config_entries.async_setup(mock_config_entry.entry_id)
await hass.async_block_till_done()
payload = {"status": "test toot"}
with pytest.raises(ServiceValidationError, match="Mock Title is not loaded"):
await hass.services.async_call(
DOMAIN,
SERVICE_POST,
{ATTR_CONFIG_ENTRY_ID: mock_config_entry2.entry_id} | payload,
blocking=True,
return_response=False,
)
with pytest.raises(
ServiceValidationError, match='Integration "mastodon" not found in registry'
):
await hass.services.async_call(
DOMAIN,
SERVICE_POST,
{ATTR_CONFIG_ENTRY_ID: "bad-config_id"} | payload,
blocking=True,
return_response=False,
)