Simplify mastodon service actions

This commit is contained in:
epenet 2025-06-12 06:50:39 +00:00
parent 8bf562b7b6
commit a67fe0e115
2 changed files with 74 additions and 70 deletions

View File

@ -19,7 +19,7 @@ from homeassistant.util import slugify
from .const import CONF_BASE_URL, DOMAIN, LOGGER
from .coordinator import MastodonConfigEntry, MastodonCoordinator, MastodonData
from .services import setup_services
from .services import async_setup_services
from .utils import construct_mastodon_username, create_mastodon_client
PLATFORMS: list[Platform] = [Platform.NOTIFY, Platform.SENSOR]
@ -29,7 +29,7 @@ CONFIG_SCHEMA = cv.empty_config_schema(DOMAIN)
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Set up the Mastodon component."""
setup_services(hass)
async_setup_services(hass)
return True

View File

@ -9,7 +9,7 @@ from mastodon.Mastodon import MastodonAPIError, MediaAttachment
import voluptuous as vol
from homeassistant.config_entries import ConfigEntryState
from homeassistant.core import HomeAssistant, ServiceCall, ServiceResponse
from homeassistant.core import HomeAssistant, ServiceCall, ServiceResponse, callback
from homeassistant.exceptions import HomeAssistantError, ServiceValidationError
from .const import (
@ -66,85 +66,89 @@ def async_get_entry(hass: HomeAssistant, config_entry_id: str) -> MastodonConfig
return cast(MastodonConfigEntry, entry)
def setup_services(hass: HomeAssistant) -> None:
"""Set up the services for the Mastodon integration."""
async def _async_post(call: ServiceCall) -> ServiceResponse:
"""Post a status."""
entry = async_get_entry(call.hass, call.data[ATTR_CONFIG_ENTRY_ID])
client = entry.runtime_data.client
async def async_post(call: ServiceCall) -> ServiceResponse:
"""Post a status."""
entry = async_get_entry(hass, call.data[ATTR_CONFIG_ENTRY_ID])
client = entry.runtime_data.client
status = call.data[ATTR_STATUS]
status = call.data[ATTR_STATUS]
visibility: str | None = (
StatusVisibility(call.data[ATTR_VISIBILITY])
if ATTR_VISIBILITY in call.data
else None
)
spoiler_text: str | None = call.data.get(ATTR_CONTENT_WARNING)
media_path: str | None = call.data.get(ATTR_MEDIA)
media_description: str | None = call.data.get(ATTR_MEDIA_DESCRIPTION)
media_warning: str | None = call.data.get(ATTR_MEDIA_WARNING)
visibility: str | None = (
StatusVisibility(call.data[ATTR_VISIBILITY])
if ATTR_VISIBILITY in call.data
else None
await call.hass.async_add_executor_job(
partial(
_post,
hass=call.hass,
client=client,
status=status,
visibility=visibility,
spoiler_text=spoiler_text,
media_path=media_path,
media_description=media_description,
sensitive=media_warning,
)
spoiler_text: str | None = call.data.get(ATTR_CONTENT_WARNING)
media_path: str | None = call.data.get(ATTR_MEDIA)
media_description: str | None = call.data.get(ATTR_MEDIA_DESCRIPTION)
media_warning: str | None = call.data.get(ATTR_MEDIA_WARNING)
)
await hass.async_add_executor_job(
partial(
_post,
client=client,
status=status,
visibility=visibility,
spoiler_text=spoiler_text,
media_path=media_path,
media_description=media_description,
sensitive=media_warning,
return None
def _post(hass: HomeAssistant, client: Mastodon, **kwargs: Any) -> None:
"""Post to Mastodon."""
media_data: MediaAttachment | None = None
media_path = kwargs.get("media_path")
if media_path:
if not hass.config.is_allowed_path(media_path):
raise HomeAssistantError(
translation_domain=DOMAIN,
translation_key="not_whitelisted_directory",
translation_placeholders={"media": media_path},
)
)
return None
def _post(client: Mastodon, **kwargs: Any) -> None:
"""Post to Mastodon."""
media_data: MediaAttachment | None = None
media_path = kwargs.get("media_path")
if media_path:
if not hass.config.is_allowed_path(media_path):
raise HomeAssistantError(
translation_domain=DOMAIN,
translation_key="not_whitelisted_directory",
translation_placeholders={"media": media_path},
)
media_type = get_media_type(media_path)
media_description = kwargs.get("media_description")
try:
media_data = client.media_post(
media_file=media_path,
mime_type=media_type,
description=media_description,
)
except MastodonAPIError as err:
raise HomeAssistantError(
translation_domain=DOMAIN,
translation_key="unable_to_upload_image",
translation_placeholders={"media_path": media_path},
) from err
kwargs.pop("media_path", None)
kwargs.pop("media_description", None)
media_type = get_media_type(media_path)
media_description = kwargs.get("media_description")
try:
media_ids: str | None = None
if media_data:
media_ids = media_data.id
client.status_post(media_ids=media_ids, **kwargs)
media_data = client.media_post(
media_file=media_path,
mime_type=media_type,
description=media_description,
)
except MastodonAPIError as err:
raise HomeAssistantError(
translation_domain=DOMAIN,
translation_key="unable_to_send_message",
translation_key="unable_to_upload_image",
translation_placeholders={"media_path": media_path},
) from err
kwargs.pop("media_path", None)
kwargs.pop("media_description", None)
try:
media_ids: str | None = None
if media_data:
media_ids = media_data.id
client.status_post(media_ids=media_ids, **kwargs)
except MastodonAPIError as err:
raise HomeAssistantError(
translation_domain=DOMAIN,
translation_key="unable_to_send_message",
) from err
@callback
def async_setup_services(hass: HomeAssistant) -> None:
"""Set up the services for the Mastodon integration."""
hass.services.async_register(
DOMAIN, SERVICE_POST, async_post, schema=SERVICE_POST_SCHEMA
DOMAIN, SERVICE_POST, _async_post, schema=SERVICE_POST_SCHEMA
)