diff --git a/homeassistant/components/telegram_bot/__init__.py b/homeassistant/components/telegram_bot/__init__.py index 15e1f7d4f0e..fdf17023d39 100644 --- a/homeassistant/components/telegram_bot/__init__.py +++ b/homeassistant/components/telegram_bot/__init__.py @@ -2,137 +2,98 @@ from __future__ import annotations -import asyncio -import io -from ipaddress import ip_network +from ipaddress import IPv4Network, ip_network import logging +from types import ModuleType from typing import Any -import httpx -from telegram import ( - Bot, - CallbackQuery, - InlineKeyboardButton, - InlineKeyboardMarkup, - Message, - ReplyKeyboardMarkup, - ReplyKeyboardRemove, - Update, - User, -) -from telegram.constants import ParseMode -from telegram.error import TelegramError -from telegram.ext import CallbackContext, filters -from telegram.request import HTTPXRequest +from telegram import Bot +from telegram.error import InvalidToken import voluptuous as vol +from homeassistant.config_entries import SOURCE_IMPORT from homeassistant.const import ( - ATTR_COMMAND, ATTR_LATITUDE, ATTR_LONGITUDE, CONF_API_KEY, CONF_PLATFORM, + CONF_SOURCE, CONF_URL, - HTTP_BEARER_AUTHENTICATION, - HTTP_DIGEST_AUTHENTICATION, ) from homeassistant.core import ( - Context, HomeAssistant, ServiceCall, ServiceResponse, SupportsResponse, ) -from homeassistant.helpers import config_validation as cv, issue_registry as ir +from homeassistant.exceptions import ConfigEntryAuthFailed, ServiceValidationError +from homeassistant.helpers import config_validation as cv from homeassistant.helpers.typing import ConfigType -from homeassistant.loader import async_get_loaded_integration -from homeassistant.util.ssl import get_default_context, get_default_no_verify_context + +from . import broadcast, polling, webhooks +from .bot import TelegramBotConfigEntry, TelegramNotificationService, initialize_bot +from .const import ( + ATTR_ALLOWS_MULTIPLE_ANSWERS, + ATTR_AUTHENTICATION, + ATTR_CALLBACK_QUERY_ID, + ATTR_CAPTION, + ATTR_CHAT_ID, + ATTR_DISABLE_NOTIF, + ATTR_DISABLE_WEB_PREV, + ATTR_FILE, + ATTR_IS_ANONYMOUS, + ATTR_KEYBOARD, + ATTR_KEYBOARD_INLINE, + ATTR_MESSAGE, + ATTR_MESSAGE_TAG, + ATTR_MESSAGE_THREAD_ID, + ATTR_MESSAGEID, + ATTR_ONE_TIME_KEYBOARD, + ATTR_OPEN_PERIOD, + ATTR_OPTIONS, + ATTR_PARSER, + ATTR_PASSWORD, + ATTR_QUESTION, + ATTR_RESIZE_KEYBOARD, + ATTR_SHOW_ALERT, + ATTR_STICKER_ID, + ATTR_TARGET, + ATTR_TIMEOUT, + ATTR_TITLE, + ATTR_URL, + ATTR_USERNAME, + ATTR_VERIFY_SSL, + CONF_ALLOWED_CHAT_IDS, + CONF_BOT_COUNT, + CONF_CONFIG_ENTRY_ID, + CONF_PROXY_PARAMS, + CONF_PROXY_URL, + CONF_TRUSTED_NETWORKS, + DEFAULT_TRUSTED_NETWORKS, + DOMAIN, + PARSER_MD, + PLATFORM_BROADCAST, + PLATFORM_POLLING, + PLATFORM_WEBHOOKS, + SERVICE_ANSWER_CALLBACK_QUERY, + SERVICE_DELETE_MESSAGE, + SERVICE_EDIT_CAPTION, + SERVICE_EDIT_MESSAGE, + SERVICE_EDIT_REPLYMARKUP, + SERVICE_LEAVE_CHAT, + SERVICE_SEND_ANIMATION, + SERVICE_SEND_DOCUMENT, + SERVICE_SEND_LOCATION, + SERVICE_SEND_MESSAGE, + SERVICE_SEND_PHOTO, + SERVICE_SEND_POLL, + SERVICE_SEND_STICKER, + SERVICE_SEND_VIDEO, + SERVICE_SEND_VOICE, +) _LOGGER = logging.getLogger(__name__) -ATTR_DATA = "data" -ATTR_MESSAGE = "message" -ATTR_TITLE = "title" - -ATTR_ARGS = "args" -ATTR_AUTHENTICATION = "authentication" -ATTR_CALLBACK_QUERY = "callback_query" -ATTR_CALLBACK_QUERY_ID = "callback_query_id" -ATTR_CAPTION = "caption" -ATTR_CHAT_ID = "chat_id" -ATTR_CHAT_INSTANCE = "chat_instance" -ATTR_DATE = "date" -ATTR_DISABLE_NOTIF = "disable_notification" -ATTR_DISABLE_WEB_PREV = "disable_web_page_preview" -ATTR_EDITED_MSG = "edited_message" -ATTR_FILE = "file" -ATTR_FROM_FIRST = "from_first" -ATTR_FROM_LAST = "from_last" -ATTR_KEYBOARD = "keyboard" -ATTR_RESIZE_KEYBOARD = "resize_keyboard" -ATTR_ONE_TIME_KEYBOARD = "one_time_keyboard" -ATTR_KEYBOARD_INLINE = "inline_keyboard" -ATTR_MESSAGEID = "message_id" -ATTR_MSG = "message" -ATTR_MSGID = "id" -ATTR_PARSER = "parse_mode" -ATTR_PASSWORD = "password" -ATTR_REPLY_TO_MSGID = "reply_to_message_id" -ATTR_REPLYMARKUP = "reply_markup" -ATTR_SHOW_ALERT = "show_alert" -ATTR_STICKER_ID = "sticker_id" -ATTR_TARGET = "target" -ATTR_TEXT = "text" -ATTR_URL = "url" -ATTR_USER_ID = "user_id" -ATTR_USERNAME = "username" -ATTR_VERIFY_SSL = "verify_ssl" -ATTR_TIMEOUT = "timeout" -ATTR_MESSAGE_TAG = "message_tag" -ATTR_CHANNEL_POST = "channel_post" -ATTR_QUESTION = "question" -ATTR_OPTIONS = "options" -ATTR_ANSWERS = "answers" -ATTR_OPEN_PERIOD = "open_period" -ATTR_IS_ANONYMOUS = "is_anonymous" -ATTR_ALLOWS_MULTIPLE_ANSWERS = "allows_multiple_answers" -ATTR_MESSAGE_THREAD_ID = "message_thread_id" - -CONF_ALLOWED_CHAT_IDS = "allowed_chat_ids" -CONF_PROXY_URL = "proxy_url" -CONF_PROXY_PARAMS = "proxy_params" -CONF_TRUSTED_NETWORKS = "trusted_networks" - -DOMAIN = "telegram_bot" - -SERVICE_SEND_MESSAGE = "send_message" -SERVICE_SEND_PHOTO = "send_photo" -SERVICE_SEND_STICKER = "send_sticker" -SERVICE_SEND_ANIMATION = "send_animation" -SERVICE_SEND_VIDEO = "send_video" -SERVICE_SEND_VOICE = "send_voice" -SERVICE_SEND_DOCUMENT = "send_document" -SERVICE_SEND_LOCATION = "send_location" -SERVICE_SEND_POLL = "send_poll" -SERVICE_EDIT_MESSAGE = "edit_message" -SERVICE_EDIT_CAPTION = "edit_caption" -SERVICE_EDIT_REPLYMARKUP = "edit_replymarkup" -SERVICE_ANSWER_CALLBACK_QUERY = "answer_callback_query" -SERVICE_DELETE_MESSAGE = "delete_message" -SERVICE_LEAVE_CHAT = "leave_chat" - -EVENT_TELEGRAM_CALLBACK = "telegram_callback" -EVENT_TELEGRAM_COMMAND = "telegram_command" -EVENT_TELEGRAM_TEXT = "telegram_text" -EVENT_TELEGRAM_SENT = "telegram_sent" - -PARSER_HTML = "html" -PARSER_MD = "markdown" -PARSER_MD2 = "markdownv2" -PARSER_PLAIN_TEXT = "plain_text" - -DEFAULT_TRUSTED_NETWORKS = [ip_network("149.154.160.0/20"), ip_network("91.108.4.0/22")] - CONFIG_SCHEMA = vol.Schema( { DOMAIN: vol.All( @@ -141,7 +102,7 @@ CONFIG_SCHEMA = vol.Schema( vol.Schema( { vol.Required(CONF_PLATFORM): vol.In( - ("broadcast", "polling", "webhooks") + (PLATFORM_BROADCAST, PLATFORM_POLLING, PLATFORM_WEBHOOKS) ), vol.Required(CONF_API_KEY): cv.string, vol.Required(CONF_ALLOWED_CHAT_IDS): vol.All( @@ -165,6 +126,7 @@ CONFIG_SCHEMA = vol.Schema( BASE_SERVICE_SCHEMA = vol.Schema( { + vol.Optional(CONF_CONFIG_ENTRY_ID): cv.string, vol.Optional(ATTR_TARGET): vol.All(cv.ensure_list, [vol.Coerce(int)]), vol.Optional(ATTR_PARSER): cv.string, vol.Optional(ATTR_DISABLE_NOTIF): cv.boolean, @@ -209,6 +171,7 @@ SERVICE_SCHEMA_SEND_LOCATION = BASE_SERVICE_SCHEMA.extend( SERVICE_SCHEMA_SEND_POLL = vol.Schema( { + vol.Optional(CONF_CONFIG_ENTRY_ID): cv.string, vol.Optional(ATTR_TARGET): vol.All(cv.ensure_list, [vol.Coerce(int)]), vol.Required(ATTR_QUESTION): cv.string, vol.Required(ATTR_OPTIONS): vol.All(cv.ensure_list, [cv.string]), @@ -232,6 +195,7 @@ SERVICE_SCHEMA_EDIT_MESSAGE = SERVICE_SCHEMA_SEND_MESSAGE.extend( SERVICE_SCHEMA_EDIT_CAPTION = vol.Schema( { + vol.Optional(CONF_CONFIG_ENTRY_ID): cv.string, vol.Required(ATTR_MESSAGEID): vol.Any( cv.positive_int, vol.All(cv.string, "last") ), @@ -244,6 +208,7 @@ SERVICE_SCHEMA_EDIT_CAPTION = vol.Schema( SERVICE_SCHEMA_EDIT_REPLYMARKUP = vol.Schema( { + vol.Optional(CONF_CONFIG_ENTRY_ID): cv.string, vol.Required(ATTR_MESSAGEID): vol.Any( cv.positive_int, vol.All(cv.string, "last") ), @@ -255,6 +220,7 @@ SERVICE_SCHEMA_EDIT_REPLYMARKUP = vol.Schema( SERVICE_SCHEMA_ANSWER_CALLBACK_QUERY = vol.Schema( { + vol.Optional(CONF_CONFIG_ENTRY_ID): cv.string, vol.Required(ATTR_MESSAGE): cv.string, vol.Required(ATTR_CALLBACK_QUERY_ID): vol.Coerce(int), vol.Optional(ATTR_SHOW_ALERT): cv.boolean, @@ -264,6 +230,7 @@ SERVICE_SCHEMA_ANSWER_CALLBACK_QUERY = vol.Schema( SERVICE_SCHEMA_DELETE_MESSAGE = vol.Schema( { + vol.Optional(CONF_CONFIG_ENTRY_ID): cv.string, vol.Required(ATTR_CHAT_ID): vol.Coerce(int), vol.Required(ATTR_MESSAGEID): vol.Any( cv.positive_int, vol.All(cv.string, "last") @@ -272,7 +239,12 @@ SERVICE_SCHEMA_DELETE_MESSAGE = vol.Schema( extra=vol.ALLOW_EXTRA, ) -SERVICE_SCHEMA_LEAVE_CHAT = vol.Schema({vol.Required(ATTR_CHAT_ID): vol.Coerce(int)}) +SERVICE_SCHEMA_LEAVE_CHAT = vol.Schema( + { + vol.Optional(CONF_CONFIG_ENTRY_ID): cv.string, + vol.Required(ATTR_CHAT_ID): vol.Coerce(int), + } +) SERVICE_MAP = { SERVICE_SEND_MESSAGE: SERVICE_SCHEMA_SEND_MESSAGE, @@ -293,117 +265,42 @@ SERVICE_MAP = { } -def _read_file_as_bytesio(file_path: str) -> io.BytesIO: - """Read a file and return it as a BytesIO object.""" - with open(file_path, "rb") as file: - data = io.BytesIO(file.read()) - data.name = file_path - return data - - -async def load_data( - hass, - url=None, - filepath=None, - username=None, - password=None, - authentication=None, - num_retries=5, - verify_ssl=None, -): - """Load data into ByteIO/File container from a source.""" - try: - if url is not None: - # Load data from URL - params = {} - headers = {} - if authentication == HTTP_BEARER_AUTHENTICATION and password is not None: - headers = {"Authorization": f"Bearer {password}"} - elif username is not None and password is not None: - if authentication == HTTP_DIGEST_AUTHENTICATION: - params["auth"] = httpx.DigestAuth(username, password) - else: - params["auth"] = httpx.BasicAuth(username, password) - if verify_ssl is not None: - params["verify"] = verify_ssl - - retry_num = 0 - async with httpx.AsyncClient( - timeout=15, headers=headers, **params - ) as client: - while retry_num < num_retries: - req = await client.get(url) - if req.status_code != 200: - _LOGGER.warning( - "Status code %s (retry #%s) loading %s", - req.status_code, - retry_num + 1, - url, - ) - else: - data = io.BytesIO(req.content) - if data.read(): - data.seek(0) - data.name = url - return data - _LOGGER.warning( - "Empty data (retry #%s) in %s)", retry_num + 1, url - ) - retry_num += 1 - if retry_num < num_retries: - await asyncio.sleep( - 1 - ) # Add a sleep to allow other async operations to proceed - _LOGGER.warning( - "Can't load data in %s after %s retries", url, retry_num - ) - elif filepath is not None: - if hass.config.is_allowed_path(filepath): - return await hass.async_add_executor_job( - _read_file_as_bytesio, filepath - ) - - _LOGGER.warning("'%s' are not secure to load data from!", filepath) - else: - _LOGGER.warning("Can't load data. No data found in params!") - - except (OSError, TypeError) as error: - _LOGGER.error("Can't load data into ByteIO: %s", error) - - return None +MODULES: dict[str, ModuleType] = { + PLATFORM_BROADCAST: broadcast, + PLATFORM_POLLING: polling, + PLATFORM_WEBHOOKS: webhooks, +} async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: """Set up the Telegram bot component.""" - domain_config: list[dict[str, Any]] = config[DOMAIN] - if not domain_config: - return False - - platforms = await async_get_loaded_integration(hass, DOMAIN).async_get_platforms( - {p_config[CONF_PLATFORM] for p_config in domain_config} - ) - - for p_config in domain_config: - # Each platform config gets its own bot - bot = await hass.async_add_executor_job(initialize_bot, hass, p_config) - p_type: str = p_config[CONF_PLATFORM] - - platform = platforms[p_type] - - _LOGGER.debug("Setting up %s.%s", DOMAIN, p_type) - try: - receiver_service = await platform.async_setup_platform(hass, bot, p_config) - if receiver_service is False: - _LOGGER.error("Failed to initialize Telegram bot %s", p_type) - return False - - except Exception: - _LOGGER.exception("Error setting up platform %s", p_type) - return False - - notify_service = TelegramNotificationService( - hass, bot, p_config.get(CONF_ALLOWED_CHAT_IDS), p_config.get(ATTR_PARSER) + # import the last YAML config since existing behavior only works with the last config + domain_config: list[dict[str, Any]] | None = config.get(DOMAIN) + if domain_config: + trusted_networks: list[IPv4Network] = domain_config[-1].get( + CONF_TRUSTED_NETWORKS, [] + ) + trusted_networks_str: list[str] = ( + [str(trusted_network) for trusted_network in trusted_networks] + if trusted_networks + else [] + ) + hass.async_create_task( + hass.config_entries.flow.async_init( + DOMAIN, + context={CONF_SOURCE: SOURCE_IMPORT}, + data={ + CONF_PLATFORM: domain_config[-1][CONF_PLATFORM], + CONF_API_KEY: domain_config[-1][CONF_API_KEY], + CONF_ALLOWED_CHAT_IDS: domain_config[-1][CONF_ALLOWED_CHAT_IDS], + ATTR_PARSER: domain_config[-1][ATTR_PARSER], + CONF_PROXY_URL: domain_config[-1].get(CONF_PROXY_URL), + CONF_URL: domain_config[-1].get(CONF_URL), + CONF_TRUSTED_NETWORKS: trusted_networks_str, + CONF_BOT_COUNT: len(domain_config), + }, + ) ) async def async_send_telegram_message(service: ServiceCall) -> ServiceResponse: @@ -413,6 +310,35 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: kwargs = dict(service.data) _LOGGER.debug("New telegram message %s: %s", msgtype, kwargs) + config_entry_id: str | None = service.data.get(CONF_CONFIG_ENTRY_ID) + config_entry: TelegramBotConfigEntry | None = None + if config_entry_id: + config_entry = hass.config_entries.async_get_known_entry(config_entry_id) + + else: + config_entries: list[TelegramBotConfigEntry] = ( + service.hass.config_entries.async_entries(DOMAIN) + ) + + if len(config_entries) == 1: + config_entry = config_entries[0] + + if len(config_entries) > 1: + raise ServiceValidationError( + "Multiple config entries found. Please specify the Telegram bot to use.", + translation_domain=DOMAIN, + translation_key="multiple_config_entry", + ) + + if not config_entry or not hasattr(config_entry, "runtime_data"): + raise ServiceValidationError( + "No config entries found or setup failed. Please set up the Telegram Bot first.", + translation_domain=DOMAIN, + translation_key="missing_config_entry", + ) + + notify_service = config_entry.runtime_data + messages = None if msgtype == SERVICE_SEND_MESSAGE: messages = await notify_service.send_message( @@ -485,710 +411,44 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: return True -def initialize_bot(hass: HomeAssistant, p_config: dict) -> Bot: - """Initialize telegram bot with proxy support.""" - api_key: str = p_config[CONF_API_KEY] - proxy_url: str | None = p_config.get(CONF_PROXY_URL) - proxy_params: dict | None = p_config.get(CONF_PROXY_PARAMS) +async def async_setup_entry(hass: HomeAssistant, entry: TelegramBotConfigEntry) -> bool: + """Create the Telegram bot from config entry.""" + bot: Bot = await hass.async_add_executor_job(initialize_bot, hass, entry.data) + try: + await bot.get_me() + except InvalidToken as err: + raise ConfigEntryAuthFailed("Invalid API token for Telegram Bot.") from err - if proxy_url is not None: - auth = None - if proxy_params is None: - # CONF_PROXY_PARAMS has been kept for backwards compatibility. - proxy_params = {} - elif "username" in proxy_params and "password" in proxy_params: - # Auth can actually be stuffed into the URL, but the docs have previously - # indicated to put them here. - auth = proxy_params.pop("username"), proxy_params.pop("password") - ir.create_issue( - hass, - DOMAIN, - "proxy_params_auth_deprecation", - breaks_in_ha_version="2024.10.0", - is_persistent=False, - is_fixable=False, - severity=ir.IssueSeverity.WARNING, - translation_placeholders={ - "proxy_params": CONF_PROXY_PARAMS, - "proxy_url": CONF_PROXY_URL, - "telegram_bot": "Telegram bot", - }, - translation_key="proxy_params_auth_deprecation", - learn_more_url="https://github.com/home-assistant/core/pull/112778", - ) - else: - ir.create_issue( - hass, - DOMAIN, - "proxy_params_deprecation", - breaks_in_ha_version="2024.10.0", - is_persistent=False, - is_fixable=False, - severity=ir.IssueSeverity.WARNING, - translation_placeholders={ - "proxy_params": CONF_PROXY_PARAMS, - "proxy_url": CONF_PROXY_URL, - "httpx": "httpx", - "telegram_bot": "Telegram bot", - }, - translation_key="proxy_params_deprecation", - learn_more_url="https://github.com/home-assistant/core/pull/112778", - ) - proxy = httpx.Proxy(proxy_url, auth=auth, **proxy_params) - request = HTTPXRequest(connection_pool_size=8, proxy=proxy) - else: - request = HTTPXRequest(connection_pool_size=8) - return Bot(token=api_key, request=request) + p_type: str = entry.data[CONF_PLATFORM] - -class TelegramNotificationService: - """Implement the notification services for the Telegram Bot domain.""" - - def __init__(self, hass, bot, allowed_chat_ids, parser): - """Initialize the service.""" - self.allowed_chat_ids = allowed_chat_ids - self._default_user = self.allowed_chat_ids[0] - self._last_message_id = dict.fromkeys(self.allowed_chat_ids) - self._parsers = { - PARSER_HTML: ParseMode.HTML, - PARSER_MD: ParseMode.MARKDOWN, - PARSER_MD2: ParseMode.MARKDOWN_V2, - PARSER_PLAIN_TEXT: None, - } - self._parse_mode = self._parsers.get(parser) - self.bot = bot - self.hass = hass - - def _get_msg_ids(self, msg_data, chat_id): - """Get the message id to edit. - - This can be one of (message_id, inline_message_id) from a msg dict, - returning a tuple. - **You can use 'last' as message_id** to edit - the message last sent in the chat_id. - """ - message_id = inline_message_id = None - if ATTR_MESSAGEID in msg_data: - message_id = msg_data[ATTR_MESSAGEID] - if ( - isinstance(message_id, str) - and (message_id == "last") - and (self._last_message_id[chat_id] is not None) - ): - message_id = self._last_message_id[chat_id] - else: - inline_message_id = msg_data["inline_message_id"] - return message_id, inline_message_id - - def _get_target_chat_ids(self, target): - """Validate chat_id targets or return default target (first). - - :param target: optional list of integers ([12234, -12345]) - :return list of chat_id targets (integers) - """ - if target is not None: - if isinstance(target, int): - target = [target] - chat_ids = [t for t in target if t in self.allowed_chat_ids] - if chat_ids: - return chat_ids - _LOGGER.warning( - "Disallowed targets: %s, using default: %s", target, self._default_user - ) - return [self._default_user] - - def _get_msg_kwargs(self, data): - """Get parameters in message data kwargs.""" - - def _make_row_inline_keyboard(row_keyboard): - """Make a list of InlineKeyboardButtons. - - It can accept: - - a list of tuples like: - `[(text_b1, data_callback_b1), - (text_b2, data_callback_b2), ...] - - a string like: `/cmd1, /cmd2, /cmd3` - - or a string like: `text_b1:/cmd1, text_b2:/cmd2` - - also supports urls instead of callback commands - """ - buttons = [] - if isinstance(row_keyboard, str): - for key in row_keyboard.split(","): - if ":/" in key: - # check if command or URL - if key.startswith("https://"): - label = key.split(",")[0] - url = key[len(label) + 1 :] - buttons.append(InlineKeyboardButton(label, url=url)) - else: - # commands like: 'Label:/cmd' become ('Label', '/cmd') - label = key.split(":/")[0] - command = key[len(label) + 1 :] - buttons.append( - InlineKeyboardButton(label, callback_data=command) - ) - else: - # commands like: '/cmd' become ('CMD', '/cmd') - label = key.strip()[1:].upper() - buttons.append(InlineKeyboardButton(label, callback_data=key)) - elif isinstance(row_keyboard, list): - for entry in row_keyboard: - text_btn, data_btn = entry - if data_btn.startswith("https://"): - buttons.append(InlineKeyboardButton(text_btn, url=data_btn)) - else: - buttons.append( - InlineKeyboardButton(text_btn, callback_data=data_btn) - ) - else: - raise TypeError(str(row_keyboard)) - return buttons - - # Defaults - params = { - ATTR_PARSER: self._parse_mode, - ATTR_DISABLE_NOTIF: False, - ATTR_DISABLE_WEB_PREV: None, - ATTR_REPLY_TO_MSGID: None, - ATTR_REPLYMARKUP: None, - ATTR_TIMEOUT: None, - ATTR_MESSAGE_TAG: None, - ATTR_MESSAGE_THREAD_ID: None, - } - if data is not None: - if ATTR_PARSER in data: - params[ATTR_PARSER] = self._parsers.get( - data[ATTR_PARSER], self._parse_mode - ) - if ATTR_TIMEOUT in data: - params[ATTR_TIMEOUT] = data[ATTR_TIMEOUT] - if ATTR_DISABLE_NOTIF in data: - params[ATTR_DISABLE_NOTIF] = data[ATTR_DISABLE_NOTIF] - if ATTR_DISABLE_WEB_PREV in data: - params[ATTR_DISABLE_WEB_PREV] = data[ATTR_DISABLE_WEB_PREV] - if ATTR_REPLY_TO_MSGID in data: - params[ATTR_REPLY_TO_MSGID] = data[ATTR_REPLY_TO_MSGID] - if ATTR_MESSAGE_TAG in data: - params[ATTR_MESSAGE_TAG] = data[ATTR_MESSAGE_TAG] - if ATTR_MESSAGE_THREAD_ID in data: - params[ATTR_MESSAGE_THREAD_ID] = data[ATTR_MESSAGE_THREAD_ID] - # Keyboards: - if ATTR_KEYBOARD in data: - keys = data.get(ATTR_KEYBOARD) - keys = keys if isinstance(keys, list) else [keys] - if keys: - params[ATTR_REPLYMARKUP] = ReplyKeyboardMarkup( - [[key.strip() for key in row.split(",")] for row in keys], - resize_keyboard=data.get(ATTR_RESIZE_KEYBOARD, False), - one_time_keyboard=data.get(ATTR_ONE_TIME_KEYBOARD, False), - ) - else: - params[ATTR_REPLYMARKUP] = ReplyKeyboardRemove(True) - - elif ATTR_KEYBOARD_INLINE in data: - keys = data.get(ATTR_KEYBOARD_INLINE) - keys = keys if isinstance(keys, list) else [keys] - params[ATTR_REPLYMARKUP] = InlineKeyboardMarkup( - [_make_row_inline_keyboard(row) for row in keys] - ) - return params - - async def _send_msg( - self, func_send, msg_error, message_tag, *args_msg, context=None, **kwargs_msg - ): - """Send one message.""" - try: - out = await func_send(*args_msg, **kwargs_msg) - if not isinstance(out, bool) and hasattr(out, ATTR_MESSAGEID): - chat_id = out.chat_id - message_id = out[ATTR_MESSAGEID] - self._last_message_id[chat_id] = message_id - _LOGGER.debug( - "Last message ID: %s (from chat_id %s)", - self._last_message_id, - chat_id, - ) - - event_data = { - ATTR_CHAT_ID: chat_id, - ATTR_MESSAGEID: message_id, - } - if message_tag is not None: - event_data[ATTR_MESSAGE_TAG] = message_tag - if kwargs_msg.get(ATTR_MESSAGE_THREAD_ID) is not None: - event_data[ATTR_MESSAGE_THREAD_ID] = kwargs_msg[ - ATTR_MESSAGE_THREAD_ID - ] - self.hass.bus.async_fire( - EVENT_TELEGRAM_SENT, event_data, context=context - ) - elif not isinstance(out, bool): - _LOGGER.warning( - "Update last message: out_type:%s, out=%s", type(out), out - ) - except TelegramError as exc: - _LOGGER.error( - "%s: %s. Args: %s, kwargs: %s", msg_error, exc, args_msg, kwargs_msg - ) - return None - return out - - async def send_message(self, message="", target=None, context=None, **kwargs): - """Send a message to one or multiple pre-allowed chat IDs.""" - title = kwargs.get(ATTR_TITLE) - text = f"{title}\n{message}" if title else message - params = self._get_msg_kwargs(kwargs) - msg_ids = {} - for chat_id in self._get_target_chat_ids(target): - _LOGGER.debug("Send message in chat ID %s with params: %s", chat_id, params) - msg = await self._send_msg( - self.bot.send_message, - "Error sending message", - params[ATTR_MESSAGE_TAG], - chat_id, - text, - parse_mode=params[ATTR_PARSER], - disable_web_page_preview=params[ATTR_DISABLE_WEB_PREV], - disable_notification=params[ATTR_DISABLE_NOTIF], - reply_to_message_id=params[ATTR_REPLY_TO_MSGID], - reply_markup=params[ATTR_REPLYMARKUP], - read_timeout=params[ATTR_TIMEOUT], - message_thread_id=params[ATTR_MESSAGE_THREAD_ID], - context=context, - ) - if msg is not None: - msg_ids[chat_id] = msg.id - return msg_ids - - async def delete_message(self, chat_id=None, context=None, **kwargs): - """Delete a previously sent message.""" - chat_id = self._get_target_chat_ids(chat_id)[0] - message_id, _ = self._get_msg_ids(kwargs, chat_id) - _LOGGER.debug("Delete message %s in chat ID %s", message_id, chat_id) - deleted = await self._send_msg( - self.bot.delete_message, - "Error deleting message", - None, - chat_id, - message_id, - context=context, - ) - # reduce message_id anyway: - if self._last_message_id[chat_id] is not None: - # change last msg_id for deque(n_msgs)? - self._last_message_id[chat_id] -= 1 - return deleted - - async def edit_message(self, type_edit, chat_id=None, context=None, **kwargs): - """Edit a previously sent message.""" - chat_id = self._get_target_chat_ids(chat_id)[0] - message_id, inline_message_id = self._get_msg_ids(kwargs, chat_id) - params = self._get_msg_kwargs(kwargs) - _LOGGER.debug( - "Edit message %s in chat ID %s with params: %s", - message_id or inline_message_id, - chat_id, - params, - ) - if type_edit == SERVICE_EDIT_MESSAGE: - message = kwargs.get(ATTR_MESSAGE) - title = kwargs.get(ATTR_TITLE) - text = f"{title}\n{message}" if title else message - _LOGGER.debug("Editing message with ID %s", message_id or inline_message_id) - return await self._send_msg( - self.bot.edit_message_text, - "Error editing text message", - params[ATTR_MESSAGE_TAG], - text, - chat_id=chat_id, - message_id=message_id, - inline_message_id=inline_message_id, - parse_mode=params[ATTR_PARSER], - disable_web_page_preview=params[ATTR_DISABLE_WEB_PREV], - reply_markup=params[ATTR_REPLYMARKUP], - read_timeout=params[ATTR_TIMEOUT], - context=context, - ) - if type_edit == SERVICE_EDIT_CAPTION: - return await self._send_msg( - self.bot.edit_message_caption, - "Error editing message attributes", - params[ATTR_MESSAGE_TAG], - chat_id=chat_id, - message_id=message_id, - inline_message_id=inline_message_id, - caption=kwargs.get(ATTR_CAPTION), - reply_markup=params[ATTR_REPLYMARKUP], - read_timeout=params[ATTR_TIMEOUT], - parse_mode=params[ATTR_PARSER], - context=context, - ) - - return await self._send_msg( - self.bot.edit_message_reply_markup, - "Error editing message attributes", - params[ATTR_MESSAGE_TAG], - chat_id=chat_id, - message_id=message_id, - inline_message_id=inline_message_id, - reply_markup=params[ATTR_REPLYMARKUP], - read_timeout=params[ATTR_TIMEOUT], - context=context, - ) - - async def answer_callback_query( - self, message, callback_query_id, show_alert=False, context=None, **kwargs - ): - """Answer a callback originated with a press in an inline keyboard.""" - params = self._get_msg_kwargs(kwargs) - _LOGGER.debug( - "Answer callback query with callback ID %s: %s, alert: %s", - callback_query_id, - message, - show_alert, - ) - await self._send_msg( - self.bot.answer_callback_query, - "Error sending answer callback query", - params[ATTR_MESSAGE_TAG], - callback_query_id, - text=message, - show_alert=show_alert, - read_timeout=params[ATTR_TIMEOUT], - context=context, - ) - - async def send_file( - self, file_type=SERVICE_SEND_PHOTO, target=None, context=None, **kwargs - ): - """Send a photo, sticker, video, or document.""" - params = self._get_msg_kwargs(kwargs) - file_content = await load_data( - self.hass, - url=kwargs.get(ATTR_URL), - filepath=kwargs.get(ATTR_FILE), - username=kwargs.get(ATTR_USERNAME), - password=kwargs.get(ATTR_PASSWORD), - authentication=kwargs.get(ATTR_AUTHENTICATION), - verify_ssl=( - get_default_context() - if kwargs.get(ATTR_VERIFY_SSL, False) - else get_default_no_verify_context() - ), - ) - - msg_ids = {} - if file_content: - for chat_id in self._get_target_chat_ids(target): - _LOGGER.debug("Sending file to chat ID %s", chat_id) - - if file_type == SERVICE_SEND_PHOTO: - msg = await self._send_msg( - self.bot.send_photo, - "Error sending photo", - params[ATTR_MESSAGE_TAG], - chat_id=chat_id, - photo=file_content, - caption=kwargs.get(ATTR_CAPTION), - disable_notification=params[ATTR_DISABLE_NOTIF], - reply_to_message_id=params[ATTR_REPLY_TO_MSGID], - reply_markup=params[ATTR_REPLYMARKUP], - read_timeout=params[ATTR_TIMEOUT], - parse_mode=params[ATTR_PARSER], - message_thread_id=params[ATTR_MESSAGE_THREAD_ID], - context=context, - ) - - elif file_type == SERVICE_SEND_STICKER: - msg = await self._send_msg( - self.bot.send_sticker, - "Error sending sticker", - params[ATTR_MESSAGE_TAG], - chat_id=chat_id, - sticker=file_content, - disable_notification=params[ATTR_DISABLE_NOTIF], - reply_to_message_id=params[ATTR_REPLY_TO_MSGID], - reply_markup=params[ATTR_REPLYMARKUP], - read_timeout=params[ATTR_TIMEOUT], - message_thread_id=params[ATTR_MESSAGE_THREAD_ID], - context=context, - ) - - elif file_type == SERVICE_SEND_VIDEO: - msg = await self._send_msg( - self.bot.send_video, - "Error sending video", - params[ATTR_MESSAGE_TAG], - chat_id=chat_id, - video=file_content, - caption=kwargs.get(ATTR_CAPTION), - disable_notification=params[ATTR_DISABLE_NOTIF], - reply_to_message_id=params[ATTR_REPLY_TO_MSGID], - reply_markup=params[ATTR_REPLYMARKUP], - read_timeout=params[ATTR_TIMEOUT], - parse_mode=params[ATTR_PARSER], - message_thread_id=params[ATTR_MESSAGE_THREAD_ID], - context=context, - ) - elif file_type == SERVICE_SEND_DOCUMENT: - msg = await self._send_msg( - self.bot.send_document, - "Error sending document", - params[ATTR_MESSAGE_TAG], - chat_id=chat_id, - document=file_content, - caption=kwargs.get(ATTR_CAPTION), - disable_notification=params[ATTR_DISABLE_NOTIF], - reply_to_message_id=params[ATTR_REPLY_TO_MSGID], - reply_markup=params[ATTR_REPLYMARKUP], - read_timeout=params[ATTR_TIMEOUT], - parse_mode=params[ATTR_PARSER], - message_thread_id=params[ATTR_MESSAGE_THREAD_ID], - context=context, - ) - elif file_type == SERVICE_SEND_VOICE: - msg = await self._send_msg( - self.bot.send_voice, - "Error sending voice", - params[ATTR_MESSAGE_TAG], - chat_id=chat_id, - voice=file_content, - caption=kwargs.get(ATTR_CAPTION), - disable_notification=params[ATTR_DISABLE_NOTIF], - reply_to_message_id=params[ATTR_REPLY_TO_MSGID], - reply_markup=params[ATTR_REPLYMARKUP], - read_timeout=params[ATTR_TIMEOUT], - message_thread_id=params[ATTR_MESSAGE_THREAD_ID], - context=context, - ) - elif file_type == SERVICE_SEND_ANIMATION: - msg = await self._send_msg( - self.bot.send_animation, - "Error sending animation", - params[ATTR_MESSAGE_TAG], - chat_id=chat_id, - animation=file_content, - caption=kwargs.get(ATTR_CAPTION), - disable_notification=params[ATTR_DISABLE_NOTIF], - reply_to_message_id=params[ATTR_REPLY_TO_MSGID], - reply_markup=params[ATTR_REPLYMARKUP], - read_timeout=params[ATTR_TIMEOUT], - parse_mode=params[ATTR_PARSER], - message_thread_id=params[ATTR_MESSAGE_THREAD_ID], - context=context, - ) - - msg_ids[chat_id] = msg.id - file_content.seek(0) - else: - _LOGGER.error("Can't send file with kwargs: %s", kwargs) - - return msg_ids - - async def send_sticker(self, target=None, context=None, **kwargs) -> dict: - """Send a sticker from a telegram sticker pack.""" - params = self._get_msg_kwargs(kwargs) - stickerid = kwargs.get(ATTR_STICKER_ID) - - msg_ids = {} - if stickerid: - for chat_id in self._get_target_chat_ids(target): - msg = await self._send_msg( - self.bot.send_sticker, - "Error sending sticker", - params[ATTR_MESSAGE_TAG], - chat_id=chat_id, - sticker=stickerid, - disable_notification=params[ATTR_DISABLE_NOTIF], - reply_to_message_id=params[ATTR_REPLY_TO_MSGID], - reply_markup=params[ATTR_REPLYMARKUP], - read_timeout=params[ATTR_TIMEOUT], - message_thread_id=params[ATTR_MESSAGE_THREAD_ID], - context=context, - ) - msg_ids[chat_id] = msg.id - return msg_ids - return await self.send_file(SERVICE_SEND_STICKER, target, **kwargs) - - async def send_location( - self, latitude, longitude, target=None, context=None, **kwargs - ): - """Send a location.""" - latitude = float(latitude) - longitude = float(longitude) - params = self._get_msg_kwargs(kwargs) - msg_ids = {} - for chat_id in self._get_target_chat_ids(target): - _LOGGER.debug( - "Send location %s/%s to chat ID %s", latitude, longitude, chat_id - ) - msg = await self._send_msg( - self.bot.send_location, - "Error sending location", - params[ATTR_MESSAGE_TAG], - chat_id=chat_id, - latitude=latitude, - longitude=longitude, - disable_notification=params[ATTR_DISABLE_NOTIF], - reply_to_message_id=params[ATTR_REPLY_TO_MSGID], - read_timeout=params[ATTR_TIMEOUT], - message_thread_id=params[ATTR_MESSAGE_THREAD_ID], - context=context, - ) - msg_ids[chat_id] = msg.id - return msg_ids - - async def send_poll( - self, - question, - options, - is_anonymous, - allows_multiple_answers, - target=None, - context=None, - **kwargs, - ): - """Send a poll.""" - params = self._get_msg_kwargs(kwargs) - openperiod = kwargs.get(ATTR_OPEN_PERIOD) - msg_ids = {} - for chat_id in self._get_target_chat_ids(target): - _LOGGER.debug("Send poll '%s' to chat ID %s", question, chat_id) - msg = await self._send_msg( - self.bot.send_poll, - "Error sending poll", - params[ATTR_MESSAGE_TAG], - chat_id=chat_id, - question=question, - options=options, - is_anonymous=is_anonymous, - allows_multiple_answers=allows_multiple_answers, - open_period=openperiod, - disable_notification=params[ATTR_DISABLE_NOTIF], - reply_to_message_id=params[ATTR_REPLY_TO_MSGID], - read_timeout=params[ATTR_TIMEOUT], - message_thread_id=params[ATTR_MESSAGE_THREAD_ID], - context=context, - ) - msg_ids[chat_id] = msg.id - return msg_ids - - async def leave_chat(self, chat_id=None, context=None): - """Remove bot from chat.""" - chat_id = self._get_target_chat_ids(chat_id)[0] - _LOGGER.debug("Leave from chat ID %s", chat_id) - return await self._send_msg( - self.bot.leave_chat, "Error leaving chat", None, chat_id, context=context - ) - - -class BaseTelegramBotEntity: - """The base class for the telegram bot.""" - - def __init__(self, hass, config): - """Initialize the bot base class.""" - self.allowed_chat_ids = config[CONF_ALLOWED_CHAT_IDS] - self.hass = hass - - async def handle_update(self, update: Update, context: CallbackContext) -> bool: - """Handle updates from bot application set up by the respective platform.""" - _LOGGER.debug("Handling update %s", update) - if not self.authorize_update(update): - return False - - # establish event type: text, command or callback_query - if update.callback_query: - # NOTE: Check for callback query first since effective message will be populated with the message - # in .callback_query (python-telegram-bot docs are wrong) - event_type, event_data = self._get_callback_query_event_data( - update.callback_query - ) - elif update.effective_message: - event_type, event_data = self._get_message_event_data( - update.effective_message - ) - else: - _LOGGER.warning("Unhandled update: %s", update) - return True - - event_context = Context() - - _LOGGER.debug("Firing event %s: %s", event_type, event_data) - self.hass.bus.async_fire(event_type, event_data, context=event_context) - return True - - @staticmethod - def _get_command_event_data(command_text: str | None) -> dict[str, str | list]: - if not command_text or not command_text.startswith("/"): - return {} - command_parts = command_text.split() - command = command_parts[0] - args = command_parts[1:] - return {ATTR_COMMAND: command, ATTR_ARGS: args} - - def _get_message_event_data(self, message: Message) -> tuple[str, dict[str, Any]]: - event_data: dict[str, Any] = { - ATTR_MSGID: message.message_id, - ATTR_CHAT_ID: message.chat.id, - ATTR_DATE: message.date, - ATTR_MESSAGE_THREAD_ID: message.message_thread_id, - } - if filters.COMMAND.filter(message): - # This is a command message - set event type to command and split data into command and args - event_type = EVENT_TELEGRAM_COMMAND - event_data.update(self._get_command_event_data(message.text)) - else: - event_type = EVENT_TELEGRAM_TEXT - event_data[ATTR_TEXT] = message.text - - if message.from_user: - event_data.update(self._get_user_event_data(message.from_user)) - - return event_type, event_data - - def _get_user_event_data(self, user: User) -> dict[str, Any]: - return { - ATTR_USER_ID: user.id, - ATTR_FROM_FIRST: user.first_name, - ATTR_FROM_LAST: user.last_name, - } - - def _get_callback_query_event_data( - self, callback_query: CallbackQuery - ) -> tuple[str, dict[str, Any]]: - event_type = EVENT_TELEGRAM_CALLBACK - event_data: dict[str, Any] = { - ATTR_MSGID: callback_query.id, - ATTR_CHAT_INSTANCE: callback_query.chat_instance, - ATTR_DATA: callback_query.data, - ATTR_MSG: None, - ATTR_CHAT_ID: None, - } - if callback_query.message: - event_data[ATTR_MSG] = callback_query.message.to_dict() - event_data[ATTR_CHAT_ID] = callback_query.message.chat.id - - if callback_query.from_user: - event_data.update(self._get_user_event_data(callback_query.from_user)) - - # Split data into command and args if possible - event_data.update(self._get_command_event_data(callback_query.data)) - - return event_type, event_data - - def authorize_update(self, update: Update) -> bool: - """Make sure either user or chat is in allowed_chat_ids.""" - from_user = update.effective_user.id if update.effective_user else None - from_chat = update.effective_chat.id if update.effective_chat else None - if from_user in self.allowed_chat_ids or from_chat in self.allowed_chat_ids: - return True - _LOGGER.error( - ( - "Unauthorized update - neither user id %s nor chat id %s is in allowed" - " chats: %s" - ), - from_user, - from_chat, - self.allowed_chat_ids, - ) + _LOGGER.debug("Setting up %s.%s", DOMAIN, p_type) + try: + receiver_service = await MODULES[p_type].async_setup_platform(hass, bot, entry) + except Exception: + _LOGGER.exception("Error setting up Telegram bot %s", p_type) + await bot.shutdown() return False + + notify_service = TelegramNotificationService( + hass, receiver_service, bot, entry, entry.options[ATTR_PARSER] + ) + entry.runtime_data = notify_service + + entry.async_on_unload(entry.add_update_listener(update_listener)) + + return True + + +async def update_listener(hass: HomeAssistant, entry: TelegramBotConfigEntry) -> None: + """Handle options update.""" + await hass.config_entries.async_reload(entry.entry_id) + + +async def async_unload_entry( + hass: HomeAssistant, entry: TelegramBotConfigEntry +) -> bool: + """Unload Telegram app.""" + # broadcast platform has no app + if entry.runtime_data.app: + await entry.runtime_data.app.shutdown() + return True diff --git a/homeassistant/components/telegram_bot/bot.py b/homeassistant/components/telegram_bot/bot.py new file mode 100644 index 00000000000..f983d0551f7 --- /dev/null +++ b/homeassistant/components/telegram_bot/bot.py @@ -0,0 +1,924 @@ +"""Telegram bot classes and utilities.""" + +from abc import abstractmethod +import asyncio +import io +import logging +from types import MappingProxyType +from typing import Any + +import httpx +from telegram import ( + Bot, + CallbackQuery, + InlineKeyboardButton, + InlineKeyboardMarkup, + Message, + ReplyKeyboardMarkup, + ReplyKeyboardRemove, + Update, + User, +) +from telegram.constants import ParseMode +from telegram.error import TelegramError +from telegram.ext import CallbackContext, filters +from telegram.request import HTTPXRequest + +from homeassistant.config_entries import ConfigEntry +from homeassistant.const import ( + ATTR_COMMAND, + CONF_API_KEY, + HTTP_BEARER_AUTHENTICATION, + HTTP_DIGEST_AUTHENTICATION, +) +from homeassistant.core import Context, HomeAssistant +from homeassistant.exceptions import ServiceValidationError +from homeassistant.helpers import issue_registry as ir +from homeassistant.util.ssl import get_default_context, get_default_no_verify_context + +from .const import ( + ATTR_ARGS, + ATTR_AUTHENTICATION, + ATTR_CAPTION, + ATTR_CHAT_ID, + ATTR_CHAT_INSTANCE, + ATTR_DATA, + ATTR_DATE, + ATTR_DISABLE_NOTIF, + ATTR_DISABLE_WEB_PREV, + ATTR_FILE, + ATTR_FROM_FIRST, + ATTR_FROM_LAST, + ATTR_KEYBOARD, + ATTR_KEYBOARD_INLINE, + ATTR_MESSAGE, + ATTR_MESSAGE_TAG, + ATTR_MESSAGE_THREAD_ID, + ATTR_MESSAGEID, + ATTR_MSG, + ATTR_MSGID, + ATTR_ONE_TIME_KEYBOARD, + ATTR_OPEN_PERIOD, + ATTR_PARSER, + ATTR_PASSWORD, + ATTR_REPLY_TO_MSGID, + ATTR_REPLYMARKUP, + ATTR_RESIZE_KEYBOARD, + ATTR_STICKER_ID, + ATTR_TEXT, + ATTR_TIMEOUT, + ATTR_TITLE, + ATTR_URL, + ATTR_USER_ID, + ATTR_USERNAME, + ATTR_VERIFY_SSL, + CONF_CHAT_ID, + CONF_PROXY_PARAMS, + CONF_PROXY_URL, + DOMAIN, + EVENT_TELEGRAM_CALLBACK, + EVENT_TELEGRAM_COMMAND, + EVENT_TELEGRAM_SENT, + EVENT_TELEGRAM_TEXT, + PARSER_HTML, + PARSER_MD, + PARSER_MD2, + PARSER_PLAIN_TEXT, + SERVICE_EDIT_CAPTION, + SERVICE_EDIT_MESSAGE, + SERVICE_SEND_ANIMATION, + SERVICE_SEND_DOCUMENT, + SERVICE_SEND_PHOTO, + SERVICE_SEND_STICKER, + SERVICE_SEND_VIDEO, + SERVICE_SEND_VOICE, +) + +_LOGGER = logging.getLogger(__name__) + +type TelegramBotConfigEntry = ConfigEntry[TelegramNotificationService] + + +class BaseTelegramBot: + """The base class for the telegram bot.""" + + def __init__(self, hass: HomeAssistant, config: TelegramBotConfigEntry) -> None: + """Initialize the bot base class.""" + self.hass = hass + self.config = config + + @abstractmethod + async def shutdown(self) -> None: + """Shutdown the bot application.""" + + async def handle_update(self, update: Update, context: CallbackContext) -> bool: + """Handle updates from bot application set up by the respective platform.""" + _LOGGER.debug("Handling update %s", update) + if not self.authorize_update(update): + return False + + # establish event type: text, command or callback_query + if update.callback_query: + # NOTE: Check for callback query first since effective message will be populated with the message + # in .callback_query (python-telegram-bot docs are wrong) + event_type, event_data = self._get_callback_query_event_data( + update.callback_query + ) + elif update.effective_message: + event_type, event_data = self._get_message_event_data( + update.effective_message + ) + else: + _LOGGER.warning("Unhandled update: %s", update) + return True + + event_context = Context() + + _LOGGER.debug("Firing event %s: %s", event_type, event_data) + self.hass.bus.async_fire(event_type, event_data, context=event_context) + return True + + @staticmethod + def _get_command_event_data(command_text: str | None) -> dict[str, str | list]: + if not command_text or not command_text.startswith("/"): + return {} + command_parts = command_text.split() + command = command_parts[0] + args = command_parts[1:] + return {ATTR_COMMAND: command, ATTR_ARGS: args} + + def _get_message_event_data(self, message: Message) -> tuple[str, dict[str, Any]]: + event_data: dict[str, Any] = { + ATTR_MSGID: message.message_id, + ATTR_CHAT_ID: message.chat.id, + ATTR_DATE: message.date, + ATTR_MESSAGE_THREAD_ID: message.message_thread_id, + } + if filters.COMMAND.filter(message): + # This is a command message - set event type to command and split data into command and args + event_type = EVENT_TELEGRAM_COMMAND + event_data.update(self._get_command_event_data(message.text)) + else: + event_type = EVENT_TELEGRAM_TEXT + event_data[ATTR_TEXT] = message.text + + if message.from_user: + event_data.update(self._get_user_event_data(message.from_user)) + + return event_type, event_data + + def _get_user_event_data(self, user: User) -> dict[str, Any]: + return { + ATTR_USER_ID: user.id, + ATTR_FROM_FIRST: user.first_name, + ATTR_FROM_LAST: user.last_name, + } + + def _get_callback_query_event_data( + self, callback_query: CallbackQuery + ) -> tuple[str, dict[str, Any]]: + event_type = EVENT_TELEGRAM_CALLBACK + event_data: dict[str, Any] = { + ATTR_MSGID: callback_query.id, + ATTR_CHAT_INSTANCE: callback_query.chat_instance, + ATTR_DATA: callback_query.data, + ATTR_MSG: None, + ATTR_CHAT_ID: None, + } + if callback_query.message: + event_data[ATTR_MSG] = callback_query.message.to_dict() + event_data[ATTR_CHAT_ID] = callback_query.message.chat.id + + if callback_query.from_user: + event_data.update(self._get_user_event_data(callback_query.from_user)) + + # Split data into command and args if possible + event_data.update(self._get_command_event_data(callback_query.data)) + + return event_type, event_data + + def authorize_update(self, update: Update) -> bool: + """Make sure either user or chat is in allowed_chat_ids.""" + from_user = update.effective_user.id if update.effective_user else None + from_chat = update.effective_chat.id if update.effective_chat else None + allowed_chat_ids: list[int] = [ + subentry.data[CONF_CHAT_ID] for subentry in self.config.subentries.values() + ] + if from_user in allowed_chat_ids or from_chat in allowed_chat_ids: + return True + _LOGGER.error( + ( + "Unauthorized update - neither user id %s nor chat id %s is in allowed" + " chats: %s" + ), + from_user, + from_chat, + allowed_chat_ids, + ) + return False + + +class TelegramNotificationService: + """Implement the notification services for the Telegram Bot domain.""" + + def __init__( + self, + hass: HomeAssistant, + app: BaseTelegramBot, + bot: Bot, + config: TelegramBotConfigEntry, + parser: str, + ) -> None: + """Initialize the service.""" + self.app = app + self.config = config + self._parsers = { + PARSER_HTML: ParseMode.HTML, + PARSER_MD: ParseMode.MARKDOWN, + PARSER_MD2: ParseMode.MARKDOWN_V2, + PARSER_PLAIN_TEXT: None, + } + self._parse_mode = self._parsers.get(parser) + self.bot = bot + self.hass = hass + + def _get_allowed_chat_ids(self) -> list[int]: + allowed_chat_ids: list[int] = [ + subentry.data[CONF_CHAT_ID] for subentry in self.config.subentries.values() + ] + + if not allowed_chat_ids: + bot_name: str = self.config.title + raise ServiceValidationError( + "No allowed chat IDs found for bot", + translation_domain=DOMAIN, + translation_key="missing_allowed_chat_ids", + translation_placeholders={ + "bot_name": bot_name, + }, + ) + + return allowed_chat_ids + + def _get_last_message_id(self): + return dict.fromkeys(self._get_allowed_chat_ids()) + + def _get_msg_ids(self, msg_data, chat_id): + """Get the message id to edit. + + This can be one of (message_id, inline_message_id) from a msg dict, + returning a tuple. + **You can use 'last' as message_id** to edit + the message last sent in the chat_id. + """ + message_id = inline_message_id = None + if ATTR_MESSAGEID in msg_data: + message_id = msg_data[ATTR_MESSAGEID] + if ( + isinstance(message_id, str) + and (message_id == "last") + and (self._get_last_message_id()[chat_id] is not None) + ): + message_id = self._get_last_message_id()[chat_id] + else: + inline_message_id = msg_data["inline_message_id"] + return message_id, inline_message_id + + def _get_target_chat_ids(self, target): + """Validate chat_id targets or return default target (first). + + :param target: optional list of integers ([12234, -12345]) + :return list of chat_id targets (integers) + """ + allowed_chat_ids: list[int] = self._get_allowed_chat_ids() + default_user: int = allowed_chat_ids[0] + if target is not None: + if isinstance(target, int): + target = [target] + chat_ids = [t for t in target if t in allowed_chat_ids] + if chat_ids: + return chat_ids + _LOGGER.warning( + "Disallowed targets: %s, using default: %s", target, default_user + ) + return [default_user] + + def _get_msg_kwargs(self, data): + """Get parameters in message data kwargs.""" + + def _make_row_inline_keyboard(row_keyboard): + """Make a list of InlineKeyboardButtons. + + It can accept: + - a list of tuples like: + `[(text_b1, data_callback_b1), + (text_b2, data_callback_b2), ...] + - a string like: `/cmd1, /cmd2, /cmd3` + - or a string like: `text_b1:/cmd1, text_b2:/cmd2` + - also supports urls instead of callback commands + """ + buttons = [] + if isinstance(row_keyboard, str): + for key in row_keyboard.split(","): + if ":/" in key: + # check if command or URL + if key.startswith("https://"): + label = key.split(",")[0] + url = key[len(label) + 1 :] + buttons.append(InlineKeyboardButton(label, url=url)) + else: + # commands like: 'Label:/cmd' become ('Label', '/cmd') + label = key.split(":/")[0] + command = key[len(label) + 1 :] + buttons.append( + InlineKeyboardButton(label, callback_data=command) + ) + else: + # commands like: '/cmd' become ('CMD', '/cmd') + label = key.strip()[1:].upper() + buttons.append(InlineKeyboardButton(label, callback_data=key)) + elif isinstance(row_keyboard, list): + for entry in row_keyboard: + text_btn, data_btn = entry + if data_btn.startswith("https://"): + buttons.append(InlineKeyboardButton(text_btn, url=data_btn)) + else: + buttons.append( + InlineKeyboardButton(text_btn, callback_data=data_btn) + ) + else: + raise TypeError(str(row_keyboard)) + return buttons + + # Defaults + params = { + ATTR_PARSER: self._parse_mode, + ATTR_DISABLE_NOTIF: False, + ATTR_DISABLE_WEB_PREV: None, + ATTR_REPLY_TO_MSGID: None, + ATTR_REPLYMARKUP: None, + ATTR_TIMEOUT: None, + ATTR_MESSAGE_TAG: None, + ATTR_MESSAGE_THREAD_ID: None, + } + if data is not None: + if ATTR_PARSER in data: + params[ATTR_PARSER] = self._parsers.get( + data[ATTR_PARSER], self._parse_mode + ) + if ATTR_TIMEOUT in data: + params[ATTR_TIMEOUT] = data[ATTR_TIMEOUT] + if ATTR_DISABLE_NOTIF in data: + params[ATTR_DISABLE_NOTIF] = data[ATTR_DISABLE_NOTIF] + if ATTR_DISABLE_WEB_PREV in data: + params[ATTR_DISABLE_WEB_PREV] = data[ATTR_DISABLE_WEB_PREV] + if ATTR_REPLY_TO_MSGID in data: + params[ATTR_REPLY_TO_MSGID] = data[ATTR_REPLY_TO_MSGID] + if ATTR_MESSAGE_TAG in data: + params[ATTR_MESSAGE_TAG] = data[ATTR_MESSAGE_TAG] + if ATTR_MESSAGE_THREAD_ID in data: + params[ATTR_MESSAGE_THREAD_ID] = data[ATTR_MESSAGE_THREAD_ID] + # Keyboards: + if ATTR_KEYBOARD in data: + keys = data.get(ATTR_KEYBOARD) + keys = keys if isinstance(keys, list) else [keys] + if keys: + params[ATTR_REPLYMARKUP] = ReplyKeyboardMarkup( + [[key.strip() for key in row.split(",")] for row in keys], + resize_keyboard=data.get(ATTR_RESIZE_KEYBOARD, False), + one_time_keyboard=data.get(ATTR_ONE_TIME_KEYBOARD, False), + ) + else: + params[ATTR_REPLYMARKUP] = ReplyKeyboardRemove(True) + + elif ATTR_KEYBOARD_INLINE in data: + keys = data.get(ATTR_KEYBOARD_INLINE) + keys = keys if isinstance(keys, list) else [keys] + params[ATTR_REPLYMARKUP] = InlineKeyboardMarkup( + [_make_row_inline_keyboard(row) for row in keys] + ) + return params + + async def _send_msg( + self, func_send, msg_error, message_tag, *args_msg, context=None, **kwargs_msg + ): + """Send one message.""" + try: + out = await func_send(*args_msg, **kwargs_msg) + if not isinstance(out, bool) and hasattr(out, ATTR_MESSAGEID): + chat_id = out.chat_id + message_id = out[ATTR_MESSAGEID] + self._get_last_message_id()[chat_id] = message_id + _LOGGER.debug( + "Last message ID: %s (from chat_id %s)", + self._get_last_message_id(), + chat_id, + ) + + event_data = { + ATTR_CHAT_ID: chat_id, + ATTR_MESSAGEID: message_id, + } + if message_tag is not None: + event_data[ATTR_MESSAGE_TAG] = message_tag + if kwargs_msg.get(ATTR_MESSAGE_THREAD_ID) is not None: + event_data[ATTR_MESSAGE_THREAD_ID] = kwargs_msg[ + ATTR_MESSAGE_THREAD_ID + ] + self.hass.bus.async_fire( + EVENT_TELEGRAM_SENT, event_data, context=context + ) + elif not isinstance(out, bool): + _LOGGER.warning( + "Update last message: out_type:%s, out=%s", type(out), out + ) + except TelegramError as exc: + _LOGGER.error( + "%s: %s. Args: %s, kwargs: %s", msg_error, exc, args_msg, kwargs_msg + ) + return None + return out + + async def send_message(self, message="", target=None, context=None, **kwargs): + """Send a message to one or multiple pre-allowed chat IDs.""" + title = kwargs.get(ATTR_TITLE) + text = f"{title}\n{message}" if title else message + params = self._get_msg_kwargs(kwargs) + msg_ids = {} + for chat_id in self._get_target_chat_ids(target): + _LOGGER.debug("Send message in chat ID %s with params: %s", chat_id, params) + msg = await self._send_msg( + self.bot.send_message, + "Error sending message", + params[ATTR_MESSAGE_TAG], + chat_id, + text, + parse_mode=params[ATTR_PARSER], + disable_web_page_preview=params[ATTR_DISABLE_WEB_PREV], + disable_notification=params[ATTR_DISABLE_NOTIF], + reply_to_message_id=params[ATTR_REPLY_TO_MSGID], + reply_markup=params[ATTR_REPLYMARKUP], + read_timeout=params[ATTR_TIMEOUT], + message_thread_id=params[ATTR_MESSAGE_THREAD_ID], + context=context, + ) + if msg is not None: + msg_ids[chat_id] = msg.id + return msg_ids + + async def delete_message(self, chat_id=None, context=None, **kwargs): + """Delete a previously sent message.""" + chat_id = self._get_target_chat_ids(chat_id)[0] + message_id, _ = self._get_msg_ids(kwargs, chat_id) + _LOGGER.debug("Delete message %s in chat ID %s", message_id, chat_id) + deleted = await self._send_msg( + self.bot.delete_message, + "Error deleting message", + None, + chat_id, + message_id, + context=context, + ) + # reduce message_id anyway: + if self._get_last_message_id()[chat_id] is not None: + # change last msg_id for deque(n_msgs)? + self._get_last_message_id()[chat_id] -= 1 + return deleted + + async def edit_message(self, type_edit, chat_id=None, context=None, **kwargs): + """Edit a previously sent message.""" + chat_id = self._get_target_chat_ids(chat_id)[0] + message_id, inline_message_id = self._get_msg_ids(kwargs, chat_id) + params = self._get_msg_kwargs(kwargs) + _LOGGER.debug( + "Edit message %s in chat ID %s with params: %s", + message_id or inline_message_id, + chat_id, + params, + ) + if type_edit == SERVICE_EDIT_MESSAGE: + message = kwargs.get(ATTR_MESSAGE) + title = kwargs.get(ATTR_TITLE) + text = f"{title}\n{message}" if title else message + _LOGGER.debug("Editing message with ID %s", message_id or inline_message_id) + return await self._send_msg( + self.bot.edit_message_text, + "Error editing text message", + params[ATTR_MESSAGE_TAG], + text, + chat_id=chat_id, + message_id=message_id, + inline_message_id=inline_message_id, + parse_mode=params[ATTR_PARSER], + disable_web_page_preview=params[ATTR_DISABLE_WEB_PREV], + reply_markup=params[ATTR_REPLYMARKUP], + read_timeout=params[ATTR_TIMEOUT], + context=context, + ) + if type_edit == SERVICE_EDIT_CAPTION: + return await self._send_msg( + self.bot.edit_message_caption, + "Error editing message attributes", + params[ATTR_MESSAGE_TAG], + chat_id=chat_id, + message_id=message_id, + inline_message_id=inline_message_id, + caption=kwargs.get(ATTR_CAPTION), + reply_markup=params[ATTR_REPLYMARKUP], + read_timeout=params[ATTR_TIMEOUT], + parse_mode=params[ATTR_PARSER], + context=context, + ) + + return await self._send_msg( + self.bot.edit_message_reply_markup, + "Error editing message attributes", + params[ATTR_MESSAGE_TAG], + chat_id=chat_id, + message_id=message_id, + inline_message_id=inline_message_id, + reply_markup=params[ATTR_REPLYMARKUP], + read_timeout=params[ATTR_TIMEOUT], + context=context, + ) + + async def answer_callback_query( + self, message, callback_query_id, show_alert=False, context=None, **kwargs + ): + """Answer a callback originated with a press in an inline keyboard.""" + params = self._get_msg_kwargs(kwargs) + _LOGGER.debug( + "Answer callback query with callback ID %s: %s, alert: %s", + callback_query_id, + message, + show_alert, + ) + await self._send_msg( + self.bot.answer_callback_query, + "Error sending answer callback query", + params[ATTR_MESSAGE_TAG], + callback_query_id, + text=message, + show_alert=show_alert, + read_timeout=params[ATTR_TIMEOUT], + context=context, + ) + + async def send_file( + self, file_type=SERVICE_SEND_PHOTO, target=None, context=None, **kwargs + ): + """Send a photo, sticker, video, or document.""" + params = self._get_msg_kwargs(kwargs) + file_content = await load_data( + self.hass, + url=kwargs.get(ATTR_URL), + filepath=kwargs.get(ATTR_FILE), + username=kwargs.get(ATTR_USERNAME), + password=kwargs.get(ATTR_PASSWORD), + authentication=kwargs.get(ATTR_AUTHENTICATION), + verify_ssl=( + get_default_context() + if kwargs.get(ATTR_VERIFY_SSL, False) + else get_default_no_verify_context() + ), + ) + + msg_ids = {} + if file_content: + for chat_id in self._get_target_chat_ids(target): + _LOGGER.debug("Sending file to chat ID %s", chat_id) + + if file_type == SERVICE_SEND_PHOTO: + msg = await self._send_msg( + self.bot.send_photo, + "Error sending photo", + params[ATTR_MESSAGE_TAG], + chat_id=chat_id, + photo=file_content, + caption=kwargs.get(ATTR_CAPTION), + disable_notification=params[ATTR_DISABLE_NOTIF], + reply_to_message_id=params[ATTR_REPLY_TO_MSGID], + reply_markup=params[ATTR_REPLYMARKUP], + read_timeout=params[ATTR_TIMEOUT], + parse_mode=params[ATTR_PARSER], + message_thread_id=params[ATTR_MESSAGE_THREAD_ID], + context=context, + ) + + elif file_type == SERVICE_SEND_STICKER: + msg = await self._send_msg( + self.bot.send_sticker, + "Error sending sticker", + params[ATTR_MESSAGE_TAG], + chat_id=chat_id, + sticker=file_content, + disable_notification=params[ATTR_DISABLE_NOTIF], + reply_to_message_id=params[ATTR_REPLY_TO_MSGID], + reply_markup=params[ATTR_REPLYMARKUP], + read_timeout=params[ATTR_TIMEOUT], + message_thread_id=params[ATTR_MESSAGE_THREAD_ID], + context=context, + ) + + elif file_type == SERVICE_SEND_VIDEO: + msg = await self._send_msg( + self.bot.send_video, + "Error sending video", + params[ATTR_MESSAGE_TAG], + chat_id=chat_id, + video=file_content, + caption=kwargs.get(ATTR_CAPTION), + disable_notification=params[ATTR_DISABLE_NOTIF], + reply_to_message_id=params[ATTR_REPLY_TO_MSGID], + reply_markup=params[ATTR_REPLYMARKUP], + read_timeout=params[ATTR_TIMEOUT], + parse_mode=params[ATTR_PARSER], + message_thread_id=params[ATTR_MESSAGE_THREAD_ID], + context=context, + ) + elif file_type == SERVICE_SEND_DOCUMENT: + msg = await self._send_msg( + self.bot.send_document, + "Error sending document", + params[ATTR_MESSAGE_TAG], + chat_id=chat_id, + document=file_content, + caption=kwargs.get(ATTR_CAPTION), + disable_notification=params[ATTR_DISABLE_NOTIF], + reply_to_message_id=params[ATTR_REPLY_TO_MSGID], + reply_markup=params[ATTR_REPLYMARKUP], + read_timeout=params[ATTR_TIMEOUT], + parse_mode=params[ATTR_PARSER], + message_thread_id=params[ATTR_MESSAGE_THREAD_ID], + context=context, + ) + elif file_type == SERVICE_SEND_VOICE: + msg = await self._send_msg( + self.bot.send_voice, + "Error sending voice", + params[ATTR_MESSAGE_TAG], + chat_id=chat_id, + voice=file_content, + caption=kwargs.get(ATTR_CAPTION), + disable_notification=params[ATTR_DISABLE_NOTIF], + reply_to_message_id=params[ATTR_REPLY_TO_MSGID], + reply_markup=params[ATTR_REPLYMARKUP], + read_timeout=params[ATTR_TIMEOUT], + message_thread_id=params[ATTR_MESSAGE_THREAD_ID], + context=context, + ) + elif file_type == SERVICE_SEND_ANIMATION: + msg = await self._send_msg( + self.bot.send_animation, + "Error sending animation", + params[ATTR_MESSAGE_TAG], + chat_id=chat_id, + animation=file_content, + caption=kwargs.get(ATTR_CAPTION), + disable_notification=params[ATTR_DISABLE_NOTIF], + reply_to_message_id=params[ATTR_REPLY_TO_MSGID], + reply_markup=params[ATTR_REPLYMARKUP], + read_timeout=params[ATTR_TIMEOUT], + parse_mode=params[ATTR_PARSER], + message_thread_id=params[ATTR_MESSAGE_THREAD_ID], + context=context, + ) + + msg_ids[chat_id] = msg.id + file_content.seek(0) + else: + _LOGGER.error("Can't send file with kwargs: %s", kwargs) + + return msg_ids + + async def send_sticker(self, target=None, context=None, **kwargs) -> dict: + """Send a sticker from a telegram sticker pack.""" + params = self._get_msg_kwargs(kwargs) + stickerid = kwargs.get(ATTR_STICKER_ID) + + msg_ids = {} + if stickerid: + for chat_id in self._get_target_chat_ids(target): + msg = await self._send_msg( + self.bot.send_sticker, + "Error sending sticker", + params[ATTR_MESSAGE_TAG], + chat_id=chat_id, + sticker=stickerid, + disable_notification=params[ATTR_DISABLE_NOTIF], + reply_to_message_id=params[ATTR_REPLY_TO_MSGID], + reply_markup=params[ATTR_REPLYMARKUP], + read_timeout=params[ATTR_TIMEOUT], + message_thread_id=params[ATTR_MESSAGE_THREAD_ID], + context=context, + ) + msg_ids[chat_id] = msg.id + return msg_ids + return await self.send_file(SERVICE_SEND_STICKER, target, **kwargs) + + async def send_location( + self, latitude, longitude, target=None, context=None, **kwargs + ): + """Send a location.""" + latitude = float(latitude) + longitude = float(longitude) + params = self._get_msg_kwargs(kwargs) + msg_ids = {} + for chat_id in self._get_target_chat_ids(target): + _LOGGER.debug( + "Send location %s/%s to chat ID %s", latitude, longitude, chat_id + ) + msg = await self._send_msg( + self.bot.send_location, + "Error sending location", + params[ATTR_MESSAGE_TAG], + chat_id=chat_id, + latitude=latitude, + longitude=longitude, + disable_notification=params[ATTR_DISABLE_NOTIF], + reply_to_message_id=params[ATTR_REPLY_TO_MSGID], + read_timeout=params[ATTR_TIMEOUT], + message_thread_id=params[ATTR_MESSAGE_THREAD_ID], + context=context, + ) + msg_ids[chat_id] = msg.id + return msg_ids + + async def send_poll( + self, + question, + options, + is_anonymous, + allows_multiple_answers, + target=None, + context=None, + **kwargs, + ): + """Send a poll.""" + params = self._get_msg_kwargs(kwargs) + openperiod = kwargs.get(ATTR_OPEN_PERIOD) + msg_ids = {} + for chat_id in self._get_target_chat_ids(target): + _LOGGER.debug("Send poll '%s' to chat ID %s", question, chat_id) + msg = await self._send_msg( + self.bot.send_poll, + "Error sending poll", + params[ATTR_MESSAGE_TAG], + chat_id=chat_id, + question=question, + options=options, + is_anonymous=is_anonymous, + allows_multiple_answers=allows_multiple_answers, + open_period=openperiod, + disable_notification=params[ATTR_DISABLE_NOTIF], + reply_to_message_id=params[ATTR_REPLY_TO_MSGID], + read_timeout=params[ATTR_TIMEOUT], + message_thread_id=params[ATTR_MESSAGE_THREAD_ID], + context=context, + ) + msg_ids[chat_id] = msg.id + return msg_ids + + async def leave_chat(self, chat_id=None, context=None, **kwargs): + """Remove bot from chat.""" + chat_id = self._get_target_chat_ids(chat_id)[0] + _LOGGER.debug("Leave from chat ID %s", chat_id) + return await self._send_msg( + self.bot.leave_chat, "Error leaving chat", None, chat_id, context=context + ) + + +def initialize_bot(hass: HomeAssistant, p_config: MappingProxyType[str, Any]) -> Bot: + """Initialize telegram bot with proxy support.""" + api_key: str = p_config[CONF_API_KEY] + proxy_url: str | None = p_config.get(CONF_PROXY_URL) + proxy_params: dict | None = p_config.get(CONF_PROXY_PARAMS) + + if proxy_url is not None: + auth = None + if proxy_params is None: + # CONF_PROXY_PARAMS has been kept for backwards compatibility. + proxy_params = {} + elif "username" in proxy_params and "password" in proxy_params: + # Auth can actually be stuffed into the URL, but the docs have previously + # indicated to put them here. + auth = proxy_params.pop("username"), proxy_params.pop("password") + ir.create_issue( + hass, + DOMAIN, + "proxy_params_auth_deprecation", + breaks_in_ha_version="2024.10.0", + is_persistent=False, + is_fixable=False, + severity=ir.IssueSeverity.WARNING, + translation_placeholders={ + "proxy_params": CONF_PROXY_PARAMS, + "proxy_url": CONF_PROXY_URL, + "telegram_bot": "Telegram bot", + }, + translation_key="proxy_params_auth_deprecation", + learn_more_url="https://github.com/home-assistant/core/pull/112778", + ) + else: + ir.create_issue( + hass, + DOMAIN, + "proxy_params_deprecation", + breaks_in_ha_version="2024.10.0", + is_persistent=False, + is_fixable=False, + severity=ir.IssueSeverity.WARNING, + translation_placeholders={ + "proxy_params": CONF_PROXY_PARAMS, + "proxy_url": CONF_PROXY_URL, + "httpx": "httpx", + "telegram_bot": "Telegram bot", + }, + translation_key="proxy_params_deprecation", + learn_more_url="https://github.com/home-assistant/core/pull/112778", + ) + proxy = httpx.Proxy(proxy_url, auth=auth, **proxy_params) + request = HTTPXRequest(connection_pool_size=8, proxy=proxy) + else: + request = HTTPXRequest(connection_pool_size=8) + return Bot(token=api_key, request=request) + + +async def load_data( + hass: HomeAssistant, + url=None, + filepath=None, + username=None, + password=None, + authentication=None, + num_retries=5, + verify_ssl=None, +): + """Load data into ByteIO/File container from a source.""" + try: + if url is not None: + # Load data from URL + params: dict[str, Any] = {} + headers = {} + if authentication == HTTP_BEARER_AUTHENTICATION and password is not None: + headers = {"Authorization": f"Bearer {password}"} + elif username is not None and password is not None: + if authentication == HTTP_DIGEST_AUTHENTICATION: + params["auth"] = httpx.DigestAuth(username, password) + else: + params["auth"] = httpx.BasicAuth(username, password) + if verify_ssl is not None: + params["verify"] = verify_ssl + + retry_num = 0 + async with httpx.AsyncClient( + timeout=15, headers=headers, **params + ) as client: + while retry_num < num_retries: + req = await client.get(url) + if req.status_code != 200: + _LOGGER.warning( + "Status code %s (retry #%s) loading %s", + req.status_code, + retry_num + 1, + url, + ) + else: + data = io.BytesIO(req.content) + if data.read(): + data.seek(0) + data.name = url + return data + _LOGGER.warning( + "Empty data (retry #%s) in %s)", retry_num + 1, url + ) + retry_num += 1 + if retry_num < num_retries: + await asyncio.sleep( + 1 + ) # Add a sleep to allow other async operations to proceed + _LOGGER.warning( + "Can't load data in %s after %s retries", url, retry_num + ) + elif filepath is not None: + if hass.config.is_allowed_path(filepath): + return await hass.async_add_executor_job( + _read_file_as_bytesio, filepath + ) + + _LOGGER.warning("'%s' are not secure to load data from!", filepath) + else: + _LOGGER.warning("Can't load data. No data found in params!") + + except (OSError, TypeError) as error: + _LOGGER.error("Can't load data into ByteIO: %s", error) + + return None + + +def _read_file_as_bytesio(file_path: str) -> io.BytesIO: + """Read a file and return it as a BytesIO object.""" + with open(file_path, "rb") as file: + data = io.BytesIO(file.read()) + data.name = file_path + return data diff --git a/homeassistant/components/telegram_bot/broadcast.py b/homeassistant/components/telegram_bot/broadcast.py index dff061da243..147423c4ce0 100644 --- a/homeassistant/components/telegram_bot/broadcast.py +++ b/homeassistant/components/telegram_bot/broadcast.py @@ -1,6 +1,14 @@ """Support for Telegram bot to send messages only.""" +from telegram import Bot -async def async_setup_platform(hass, bot, config): +from homeassistant.core import HomeAssistant + +from .bot import BaseTelegramBot, TelegramBotConfigEntry + + +async def async_setup_platform( + hass: HomeAssistant, bot: Bot, config: TelegramBotConfigEntry +) -> type[BaseTelegramBot] | None: """Set up the Telegram broadcast platform.""" - return True + return None diff --git a/homeassistant/components/telegram_bot/config_flow.py b/homeassistant/components/telegram_bot/config_flow.py new file mode 100644 index 00000000000..5586b098757 --- /dev/null +++ b/homeassistant/components/telegram_bot/config_flow.py @@ -0,0 +1,620 @@ +"""Config flow for Telegram Bot.""" + +from collections.abc import Mapping +from ipaddress import AddressValueError, IPv4Network +import logging +from types import MappingProxyType +from typing import Any + +from telegram import Bot, ChatFullInfo +from telegram.error import BadRequest, InvalidToken, NetworkError +import voluptuous as vol + +from homeassistant.config_entries import ( + SOURCE_IMPORT, + SOURCE_RECONFIGURE, + ConfigFlow, + ConfigFlowResult, + ConfigSubentryData, + ConfigSubentryFlow, + OptionsFlow, + SubentryFlowResult, +) +from homeassistant.const import CONF_API_KEY, CONF_PLATFORM, CONF_URL +from homeassistant.core import callback +from homeassistant.data_entry_flow import AbortFlow +from homeassistant.helpers import config_validation as cv +from homeassistant.helpers.issue_registry import IssueSeverity, async_create_issue +from homeassistant.helpers.network import NoURLAvailableError, get_url +from homeassistant.helpers.selector import ( + SelectSelector, + SelectSelectorConfig, + TextSelector, + TextSelectorConfig, + TextSelectorType, +) + +from . import initialize_bot +from .bot import TelegramBotConfigEntry +from .const import ( + ATTR_PARSER, + BOT_NAME, + CONF_ALLOWED_CHAT_IDS, + CONF_BOT_COUNT, + CONF_CHAT_ID, + CONF_PROXY_URL, + CONF_TRUSTED_NETWORKS, + DEFAULT_TRUSTED_NETWORKS, + DOMAIN, + ERROR_FIELD, + ERROR_MESSAGE, + ISSUE_DEPRECATED_YAML, + ISSUE_DEPRECATED_YAML_HAS_MORE_PLATFORMS, + ISSUE_DEPRECATED_YAML_IMPORT_ISSUE_ERROR, + PARSER_HTML, + PARSER_MD, + PARSER_MD2, + PLATFORM_BROADCAST, + PLATFORM_POLLING, + PLATFORM_WEBHOOKS, + SUBENTRY_TYPE_ALLOWED_CHAT_IDS, +) + +_LOGGER = logging.getLogger(__name__) + +STEP_USER_DATA_SCHEMA: vol.Schema = vol.Schema( + { + vol.Required(CONF_PLATFORM): SelectSelector( + SelectSelectorConfig( + options=[ + PLATFORM_BROADCAST, + PLATFORM_POLLING, + PLATFORM_WEBHOOKS, + ], + translation_key="platforms", + ) + ), + vol.Required(CONF_API_KEY): TextSelector( + TextSelectorConfig( + type=TextSelectorType.PASSWORD, + autocomplete="current-password", + ) + ), + vol.Optional(CONF_PROXY_URL): TextSelector( + config=TextSelectorConfig(type=TextSelectorType.URL) + ), + } +) +STEP_RECONFIGURE_USER_DATA_SCHEMA: vol.Schema = vol.Schema( + { + vol.Required(CONF_PLATFORM): SelectSelector( + SelectSelectorConfig( + options=[ + PLATFORM_BROADCAST, + PLATFORM_POLLING, + PLATFORM_WEBHOOKS, + ], + translation_key="platforms", + ) + ), + vol.Optional(CONF_PROXY_URL): TextSelector( + config=TextSelectorConfig(type=TextSelectorType.URL) + ), + } +) +STEP_REAUTH_DATA_SCHEMA: vol.Schema = vol.Schema( + { + vol.Required(CONF_API_KEY): TextSelector( + TextSelectorConfig( + type=TextSelectorType.PASSWORD, + autocomplete="current-password", + ) + ) + } +) +STEP_WEBHOOKS_DATA_SCHEMA: vol.Schema = vol.Schema( + { + vol.Optional(CONF_URL): TextSelector( + config=TextSelectorConfig(type=TextSelectorType.URL) + ), + vol.Required(CONF_TRUSTED_NETWORKS): vol.Coerce(str), + } +) +OPTIONS_SCHEMA: vol.Schema = vol.Schema( + { + vol.Required( + ATTR_PARSER, + ): SelectSelector( + SelectSelectorConfig( + options=[PARSER_MD, PARSER_MD2, PARSER_HTML], + translation_key="parsers", + ) + ) + } +) + + +class OptionsFlowHandler(OptionsFlow): + """Options flow for webhooks.""" + + async def async_step_init( + self, user_input: dict[str, Any] | None = None + ) -> ConfigFlowResult: + """Manage the options.""" + + if user_input is not None: + return self.async_create_entry(data=user_input) + + return self.async_show_form( + step_id="init", + data_schema=self.add_suggested_values_to_schema( + OPTIONS_SCHEMA, + self.config_entry.options, + ), + ) + + +class TelgramBotConfigFlow(ConfigFlow, domain=DOMAIN): + """Handle a config flow for Telegram.""" + + VERSION = 1 + + @staticmethod + @callback + def async_get_options_flow( + config_entry: TelegramBotConfigEntry, + ) -> OptionsFlowHandler: + """Create the options flow.""" + return OptionsFlowHandler() + + @classmethod + @callback + def async_get_supported_subentry_types( + cls, config_entry: TelegramBotConfigEntry + ) -> dict[str, type[ConfigSubentryFlow]]: + """Return subentries supported by this integration.""" + return {SUBENTRY_TYPE_ALLOWED_CHAT_IDS: AllowedChatIdsSubEntryFlowHandler} + + def __init__(self) -> None: + """Create instance of the config flow.""" + super().__init__() + self._bot: Bot | None = None + self._bot_name = "Unknown bot" + + # for passing data between steps + self._step_user_data: dict[str, Any] = {} + + # triggered by async_setup() from __init__.py + async def async_step_import(self, import_data: dict[str, Any]) -> ConfigFlowResult: + """Handle import of config entry from configuration.yaml.""" + + telegram_bot: str = f"{import_data[CONF_PLATFORM]} Telegram bot" + bot_count: int = import_data[CONF_BOT_COUNT] + + import_data[CONF_TRUSTED_NETWORKS] = ",".join( + import_data[CONF_TRUSTED_NETWORKS] + ) + try: + config_flow_result: ConfigFlowResult = await self.async_step_user( + import_data + ) + except AbortFlow: + # this happens if the config entry is already imported + self._create_issue(ISSUE_DEPRECATED_YAML, telegram_bot, bot_count) + raise + else: + errors: dict[str, str] | None = config_flow_result.get("errors") + if errors: + error: str = errors.get("base", "unknown") + self._create_issue( + error, + telegram_bot, + bot_count, + config_flow_result["description_placeholders"], + ) + return self.async_abort(reason="import_failed") + + subentries: list[ConfigSubentryData] = [] + allowed_chat_ids: list[int] = import_data[CONF_ALLOWED_CHAT_IDS] + for chat_id in allowed_chat_ids: + chat_name: str = await _async_get_chat_name(self._bot, chat_id) + subentry: ConfigSubentryData = ConfigSubentryData( + data={CONF_CHAT_ID: chat_id}, + subentry_type=CONF_ALLOWED_CHAT_IDS, + title=chat_name, + unique_id=str(chat_id), + ) + subentries.append(subentry) + config_flow_result["subentries"] = subentries + + self._create_issue( + ISSUE_DEPRECATED_YAML, + telegram_bot, + bot_count, + config_flow_result["description_placeholders"], + ) + return config_flow_result + + def _create_issue( + self, + issue: str, + telegram_bot_type: str, + bot_count: int, + description_placeholders: Mapping[str, str] | None = None, + ) -> None: + translation_key: str = ( + ISSUE_DEPRECATED_YAML + if bot_count == 1 + else ISSUE_DEPRECATED_YAML_HAS_MORE_PLATFORMS + ) + if issue != ISSUE_DEPRECATED_YAML: + translation_key = ISSUE_DEPRECATED_YAML_IMPORT_ISSUE_ERROR + + telegram_bot = ( + description_placeholders.get(BOT_NAME, telegram_bot_type) + if description_placeholders + else telegram_bot_type + ) + error_field = ( + description_placeholders.get(ERROR_FIELD, "Unknown error") + if description_placeholders + else "Unknown error" + ) + error_message = ( + description_placeholders.get(ERROR_MESSAGE, "Unknown error") + if description_placeholders + else "Unknown error" + ) + + async_create_issue( + self.hass, + DOMAIN, + ISSUE_DEPRECATED_YAML, + breaks_in_ha_version="2025.12.0", + is_fixable=False, + issue_domain=DOMAIN, + severity=IssueSeverity.WARNING, + translation_key=translation_key, + translation_placeholders={ + "domain": DOMAIN, + "integration_title": "Telegram Bot", + "telegram_bot": telegram_bot, + ERROR_FIELD: error_field, + ERROR_MESSAGE: error_message, + }, + learn_more_url="https://github.com/home-assistant/core/pull/144617", + ) + + async def async_step_user( + self, user_input: dict[str, Any] | None = None + ) -> ConfigFlowResult: + """Handle a flow to create a new config entry for a Telegram bot.""" + + if not user_input: + return self.async_show_form( + step_id="user", + data_schema=STEP_USER_DATA_SCHEMA, + ) + + # prevent duplicates + await self.async_set_unique_id(user_input[CONF_API_KEY]) + self._abort_if_unique_id_configured() + + # validate connection to Telegram API + errors: dict[str, str] = {} + description_placeholders: dict[str, str] = {} + bot_name = await self._validate_bot( + user_input, errors, description_placeholders + ) + + if errors: + return self.async_show_form( + step_id="user", + data_schema=self.add_suggested_values_to_schema( + STEP_USER_DATA_SCHEMA, user_input + ), + errors=errors, + description_placeholders=description_placeholders, + ) + + if user_input[CONF_PLATFORM] != PLATFORM_WEBHOOKS: + await self._shutdown_bot() + + return self.async_create_entry( + title=bot_name, + data={ + CONF_PLATFORM: user_input[CONF_PLATFORM], + CONF_API_KEY: user_input[CONF_API_KEY], + CONF_PROXY_URL: user_input.get(CONF_PROXY_URL), + }, + options={ + # this value may come from yaml import + ATTR_PARSER: user_input.get(ATTR_PARSER, PARSER_MD) + }, + description_placeholders=description_placeholders, + ) + + self._bot_name = bot_name + self._step_user_data.update(user_input) + + if self.source == SOURCE_IMPORT: + return await self.async_step_webhooks( + { + CONF_URL: user_input.get(CONF_URL), + CONF_TRUSTED_NETWORKS: user_input[CONF_TRUSTED_NETWORKS], + } + ) + return await self.async_step_webhooks() + + async def _shutdown_bot(self) -> None: + """Shutdown the bot if it exists.""" + if self._bot: + await self._bot.shutdown() + self._bot = None + + async def _validate_bot( + self, + user_input: dict[str, Any], + errors: dict[str, str], + placeholders: dict[str, str], + ) -> str: + try: + bot = await self.hass.async_add_executor_job( + initialize_bot, self.hass, MappingProxyType(user_input) + ) + self._bot = bot + + user = await bot.get_me() + except InvalidToken as err: + _LOGGER.warning("Invalid API token") + errors["base"] = "invalid_api_key" + placeholders[ERROR_FIELD] = "API key" + placeholders[ERROR_MESSAGE] = str(err) + return "Unknown bot" + except (ValueError, NetworkError) as err: + _LOGGER.warning("Invalid proxy") + errors["base"] = "invalid_proxy_url" + placeholders["proxy_url_error"] = str(err) + placeholders[ERROR_FIELD] = "proxy url" + placeholders[ERROR_MESSAGE] = str(err) + return "Unknown bot" + else: + return user.full_name + + async def async_step_webhooks( + self, user_input: dict[str, Any] | None = None + ) -> ConfigFlowResult: + """Handle config flow for webhook Telegram bot.""" + + if not user_input: + if self.source == SOURCE_RECONFIGURE: + return self.async_show_form( + step_id="webhooks", + data_schema=self.add_suggested_values_to_schema( + STEP_WEBHOOKS_DATA_SCHEMA, + self._get_reconfigure_entry().data, + ), + ) + + return self.async_show_form( + step_id="webhooks", + data_schema=self.add_suggested_values_to_schema( + STEP_WEBHOOKS_DATA_SCHEMA, + { + CONF_TRUSTED_NETWORKS: ",".join( + [str(network) for network in DEFAULT_TRUSTED_NETWORKS] + ), + }, + ), + ) + + errors: dict[str, str] = {} + description_placeholders: dict[str, str] = {BOT_NAME: self._bot_name} + self._validate_webhooks(user_input, errors, description_placeholders) + if errors: + return self.async_show_form( + step_id="webhooks", + data_schema=self.add_suggested_values_to_schema( + STEP_WEBHOOKS_DATA_SCHEMA, + user_input, + ), + errors=errors, + description_placeholders=description_placeholders, + ) + + await self._shutdown_bot() + + if self.source == SOURCE_RECONFIGURE: + user_input.update(self._step_user_data) + return self.async_update_reload_and_abort( + self._get_reconfigure_entry(), + title=self._bot_name, + data_updates=user_input, + ) + + return self.async_create_entry( + title=self._bot_name, + data={ + CONF_PLATFORM: self._step_user_data[CONF_PLATFORM], + CONF_API_KEY: self._step_user_data[CONF_API_KEY], + CONF_PROXY_URL: self._step_user_data.get(CONF_PROXY_URL), + CONF_URL: user_input.get(CONF_URL), + CONF_TRUSTED_NETWORKS: user_input[CONF_TRUSTED_NETWORKS], + }, + options={ATTR_PARSER: self._step_user_data.get(ATTR_PARSER, PARSER_MD)}, + description_placeholders=description_placeholders, + ) + + def _validate_webhooks( + self, + user_input: dict[str, Any], + errors: dict[str, str], + description_placeholders: dict[str, str], + ) -> None: + # validate URL + if CONF_URL in user_input and not user_input[CONF_URL].startswith("https"): + errors["base"] = "invalid_url" + description_placeholders[ERROR_FIELD] = "URL" + description_placeholders[ERROR_MESSAGE] = "URL must start with https" + return + if CONF_URL not in user_input: + try: + get_url(self.hass, require_ssl=True, allow_internal=False) + except NoURLAvailableError: + errors["base"] = "no_url_available" + description_placeholders[ERROR_FIELD] = "URL" + description_placeholders[ERROR_MESSAGE] = ( + "URL is required since you have not configured an external URL in Home Assistant" + ) + return + + # validate trusted networks + csv_trusted_networks: list[str] = [] + formatted_trusted_networks: str = ( + user_input[CONF_TRUSTED_NETWORKS].lstrip("[").rstrip("]") + ) + for trusted_network in cv.ensure_list_csv(formatted_trusted_networks): + formatted_trusted_network: str = trusted_network.strip("'") + try: + IPv4Network(formatted_trusted_network) + except (AddressValueError, ValueError) as err: + errors["base"] = "invalid_trusted_networks" + description_placeholders[ERROR_FIELD] = "trusted networks" + description_placeholders[ERROR_MESSAGE] = str(err) + return + else: + csv_trusted_networks.append(formatted_trusted_network) + user_input[CONF_TRUSTED_NETWORKS] = csv_trusted_networks + + return + + async def async_step_reconfigure( + self, user_input: dict[str, Any] | None = None + ) -> ConfigFlowResult: + """Reconfigure Telegram bot.""" + + api_key: str = self._get_reconfigure_entry().data[CONF_API_KEY] + await self.async_set_unique_id(api_key) + self._abort_if_unique_id_mismatch() + + if not user_input: + return self.async_show_form( + step_id="reconfigure", + data_schema=self.add_suggested_values_to_schema( + STEP_RECONFIGURE_USER_DATA_SCHEMA, + self._get_reconfigure_entry().data, + ), + ) + + errors: dict[str, str] = {} + description_placeholders: dict[str, str] = {} + + user_input[CONF_API_KEY] = api_key + bot_name = await self._validate_bot( + user_input, errors, description_placeholders + ) + self._bot_name = bot_name + + if errors: + return self.async_show_form( + step_id="reconfigure", + data_schema=self.add_suggested_values_to_schema( + STEP_RECONFIGURE_USER_DATA_SCHEMA, + user_input, + ), + errors=errors, + description_placeholders=description_placeholders, + ) + + if user_input[CONF_PLATFORM] != PLATFORM_WEBHOOKS: + await self._shutdown_bot() + + return self.async_update_reload_and_abort( + self._get_reconfigure_entry(), title=bot_name, data_updates=user_input + ) + + self._step_user_data.update(user_input) + return await self.async_step_webhooks() + + async def async_step_reauth( + self, entry_data: Mapping[str, Any] + ) -> ConfigFlowResult: + """Reauth step.""" + return await self.async_step_reauth_confirm() + + async def async_step_reauth_confirm( + self, user_input: dict[str, Any] | None = None + ) -> ConfigFlowResult: + """Reauth confirm step.""" + if user_input is None: + return self.async_show_form( + step_id="reauth_confirm", + data_schema=self.add_suggested_values_to_schema( + STEP_REAUTH_DATA_SCHEMA, self._get_reauth_entry().data + ), + ) + + errors: dict[str, str] = {} + description_placeholders: dict[str, str] = {} + + bot_name = await self._validate_bot( + user_input, errors, description_placeholders + ) + await self._shutdown_bot() + + if errors: + return self.async_show_form( + step_id="reauth_confirm", + data_schema=self.add_suggested_values_to_schema( + STEP_REAUTH_DATA_SCHEMA, self._get_reauth_entry().data + ), + errors=errors, + description_placeholders=description_placeholders, + ) + + return self.async_update_reload_and_abort( + self._get_reauth_entry(), title=bot_name, data_updates=user_input + ) + + +class AllowedChatIdsSubEntryFlowHandler(ConfigSubentryFlow): + """Handle a subentry flow for creating chat ID.""" + + async def async_step_user( + self, user_input: dict[str, Any] | None = None + ) -> SubentryFlowResult: + """Create allowed chat ID.""" + + errors: dict[str, str] = {} + + if user_input is not None: + config_entry: TelegramBotConfigEntry = self._get_entry() + bot = config_entry.runtime_data.bot + + chat_id: int = user_input[CONF_CHAT_ID] + chat_name = await _async_get_chat_name(bot, chat_id) + if chat_name: + return self.async_create_entry( + title=chat_name, + data={CONF_CHAT_ID: chat_id}, + unique_id=str(chat_id), + ) + + errors["base"] = "chat_not_found" + + return self.async_show_form( + step_id="user", + data_schema=vol.Schema({vol.Required(CONF_CHAT_ID): vol.Coerce(int)}), + errors=errors, + ) + + +async def _async_get_chat_name(bot: Bot | None, chat_id: int) -> str: + if not bot: + return str(chat_id) + + try: + chat_info: ChatFullInfo = await bot.get_chat(chat_id) + return chat_info.effective_name or str(chat_id) + except BadRequest: + return "" diff --git a/homeassistant/components/telegram_bot/const.py b/homeassistant/components/telegram_bot/const.py new file mode 100644 index 00000000000..ca79fc868cf --- /dev/null +++ b/homeassistant/components/telegram_bot/const.py @@ -0,0 +1,109 @@ +"""Constants for the Telegram Bot integration.""" + +from ipaddress import ip_network + +DOMAIN = "telegram_bot" + +PLATFORM_BROADCAST = "broadcast" +PLATFORM_POLLING = "polling" +PLATFORM_WEBHOOKS = "webhooks" + +SUBENTRY_TYPE_ALLOWED_CHAT_IDS = "allowed_chat_ids" + +CONF_BOT_COUNT = "bot_count" +CONF_ALLOWED_CHAT_IDS = "allowed_chat_ids" +CONF_CONFIG_ENTRY_ID = "config_entry_id" +CONF_PROXY_PARAMS = "proxy_params" + + +CONF_PROXY_URL = "proxy_url" +CONF_TRUSTED_NETWORKS = "trusted_networks" + +# subentry +CONF_CHAT_ID = "chat_id" + +BOT_NAME = "telegram_bot" +ERROR_FIELD = "error_field" +ERROR_MESSAGE = "error_message" + +ISSUE_DEPRECATED_YAML = "deprecated_yaml" +ISSUE_DEPRECATED_YAML_HAS_MORE_PLATFORMS = ( + "deprecated_yaml_import_issue_has_more_platforms" +) +ISSUE_DEPRECATED_YAML_IMPORT_ISSUE_ERROR = "deprecated_yaml_import_issue_error" + +DEFAULT_TRUSTED_NETWORKS = [ip_network("149.154.160.0/20"), ip_network("91.108.4.0/22")] + +SERVICE_SEND_MESSAGE = "send_message" +SERVICE_SEND_PHOTO = "send_photo" +SERVICE_SEND_STICKER = "send_sticker" +SERVICE_SEND_ANIMATION = "send_animation" +SERVICE_SEND_VIDEO = "send_video" +SERVICE_SEND_VOICE = "send_voice" +SERVICE_SEND_DOCUMENT = "send_document" +SERVICE_SEND_LOCATION = "send_location" +SERVICE_SEND_POLL = "send_poll" +SERVICE_EDIT_MESSAGE = "edit_message" +SERVICE_EDIT_CAPTION = "edit_caption" +SERVICE_EDIT_REPLYMARKUP = "edit_replymarkup" +SERVICE_ANSWER_CALLBACK_QUERY = "answer_callback_query" +SERVICE_DELETE_MESSAGE = "delete_message" +SERVICE_LEAVE_CHAT = "leave_chat" + +EVENT_TELEGRAM_CALLBACK = "telegram_callback" +EVENT_TELEGRAM_COMMAND = "telegram_command" +EVENT_TELEGRAM_TEXT = "telegram_text" +EVENT_TELEGRAM_SENT = "telegram_sent" + +PARSER_HTML = "html" +PARSER_MD = "markdown" +PARSER_MD2 = "markdownv2" +PARSER_PLAIN_TEXT = "plain_text" + +ATTR_DATA = "data" +ATTR_MESSAGE = "message" +ATTR_TITLE = "title" + +ATTR_ARGS = "args" +ATTR_AUTHENTICATION = "authentication" +ATTR_CALLBACK_QUERY = "callback_query" +ATTR_CALLBACK_QUERY_ID = "callback_query_id" +ATTR_CAPTION = "caption" +ATTR_CHAT_ID = "chat_id" +ATTR_CHAT_INSTANCE = "chat_instance" +ATTR_DATE = "date" +ATTR_DISABLE_NOTIF = "disable_notification" +ATTR_DISABLE_WEB_PREV = "disable_web_page_preview" +ATTR_EDITED_MSG = "edited_message" +ATTR_FILE = "file" +ATTR_FROM_FIRST = "from_first" +ATTR_FROM_LAST = "from_last" +ATTR_KEYBOARD = "keyboard" +ATTR_RESIZE_KEYBOARD = "resize_keyboard" +ATTR_ONE_TIME_KEYBOARD = "one_time_keyboard" +ATTR_KEYBOARD_INLINE = "inline_keyboard" +ATTR_MESSAGEID = "message_id" +ATTR_MSG = "message" +ATTR_MSGID = "id" +ATTR_PARSER = "parse_mode" +ATTR_PASSWORD = "password" +ATTR_REPLY_TO_MSGID = "reply_to_message_id" +ATTR_REPLYMARKUP = "reply_markup" +ATTR_SHOW_ALERT = "show_alert" +ATTR_STICKER_ID = "sticker_id" +ATTR_TARGET = "target" +ATTR_TEXT = "text" +ATTR_URL = "url" +ATTR_USER_ID = "user_id" +ATTR_USERNAME = "username" +ATTR_VERIFY_SSL = "verify_ssl" +ATTR_TIMEOUT = "timeout" +ATTR_MESSAGE_TAG = "message_tag" +ATTR_CHANNEL_POST = "channel_post" +ATTR_QUESTION = "question" +ATTR_OPTIONS = "options" +ATTR_ANSWERS = "answers" +ATTR_OPEN_PERIOD = "open_period" +ATTR_IS_ANONYMOUS = "is_anonymous" +ATTR_ALLOWS_MULTIPLE_ANSWERS = "allows_multiple_answers" +ATTR_MESSAGE_THREAD_ID = "message_thread_id" diff --git a/homeassistant/components/telegram_bot/manifest.json b/homeassistant/components/telegram_bot/manifest.json index 3474d39b1d6..b0be5583192 100644 --- a/homeassistant/components/telegram_bot/manifest.json +++ b/homeassistant/components/telegram_bot/manifest.json @@ -2,6 +2,7 @@ "domain": "telegram_bot", "name": "Telegram bot", "codeowners": [], + "config_flow": true, "dependencies": ["http"], "documentation": "https://www.home-assistant.io/integrations/telegram_bot", "iot_class": "cloud_push", diff --git a/homeassistant/components/telegram_bot/polling.py b/homeassistant/components/telegram_bot/polling.py index bee7f752f6c..f6435c16d82 100644 --- a/homeassistant/components/telegram_bot/polling.py +++ b/homeassistant/components/telegram_bot/polling.py @@ -2,34 +2,35 @@ import logging -from telegram import Update +from telegram import Bot, Update from telegram.error import NetworkError, RetryAfter, TelegramError, TimedOut from telegram.ext import ApplicationBuilder, CallbackContext, TypeHandler -from homeassistant.const import EVENT_HOMEASSISTANT_START, EVENT_HOMEASSISTANT_STOP +from homeassistant.core import HomeAssistant -from . import BaseTelegramBotEntity +from .bot import BaseTelegramBot, TelegramBotConfigEntry _LOGGER = logging.getLogger(__name__) -async def async_setup_platform(hass, bot, config): +async def async_setup_platform( + hass: HomeAssistant, bot: Bot, config: TelegramBotConfigEntry +) -> BaseTelegramBot | None: """Set up the Telegram polling platform.""" pollbot = PollBot(hass, bot, config) - hass.bus.async_listen_once(EVENT_HOMEASSISTANT_START, pollbot.start_polling) - hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, pollbot.stop_polling) + config.async_create_task(hass, pollbot.start_polling(), "polling telegram bot") - return True + return pollbot -async def process_error(update: Update, context: CallbackContext) -> None: +async def process_error(update: object, context: CallbackContext) -> None: """Telegram bot error handler.""" if context.error: error_callback(context.error, update) -def error_callback(error: Exception, update: Update | None = None) -> None: +def error_callback(error: Exception, update: object | None = None) -> None: """Log the error.""" try: raise error @@ -43,13 +44,15 @@ def error_callback(error: Exception, update: Update | None = None) -> None: _LOGGER.error("%s: %s", error.__class__.__name__, error) -class PollBot(BaseTelegramBotEntity): +class PollBot(BaseTelegramBot): """Controls the Application object that holds the bot and an updater. The application is set up to pass telegram updates to `self.handle_update` """ - def __init__(self, hass, bot, config): + def __init__( + self, hass: HomeAssistant, bot: Bot, config: TelegramBotConfigEntry + ) -> None: """Create Application to poll for updates.""" super().__init__(hass, config) self.bot = bot @@ -57,6 +60,10 @@ class PollBot(BaseTelegramBotEntity): self.application.add_handler(TypeHandler(Update, self.handle_update)) self.application.add_error_handler(process_error) + async def shutdown(self) -> None: + """Shutdown the app.""" + await self.stop_polling() + async def start_polling(self, event=None): """Start the polling task.""" _LOGGER.debug("Starting polling") diff --git a/homeassistant/components/telegram_bot/services.yaml b/homeassistant/components/telegram_bot/services.yaml index a09f4d8f79b..581e7f2e350 100644 --- a/homeassistant/components/telegram_bot/services.yaml +++ b/homeassistant/components/telegram_bot/services.yaml @@ -2,6 +2,10 @@ send_message: fields: + config_entry_id: + selector: + config_entry: + integration: telegram_bot message: required: true example: The garage door has been open for 10 minutes. @@ -61,6 +65,10 @@ send_message: send_photo: fields: + config_entry_id: + selector: + config_entry: + integration: telegram_bot url: example: "http://example.org/path/to/the/image.png" selector: @@ -137,6 +145,10 @@ send_photo: send_sticker: fields: + config_entry_id: + selector: + config_entry: + integration: telegram_bot url: example: "http://example.org/path/to/the/sticker.webp" selector: @@ -205,6 +217,10 @@ send_sticker: send_animation: fields: + config_entry_id: + selector: + config_entry: + integration: telegram_bot url: example: "http://example.org/path/to/the/animation.gif" selector: @@ -281,6 +297,10 @@ send_animation: send_video: fields: + config_entry_id: + selector: + config_entry: + integration: telegram_bot url: example: "http://example.org/path/to/the/video.mp4" selector: @@ -357,6 +377,10 @@ send_video: send_voice: fields: + config_entry_id: + selector: + config_entry: + integration: telegram_bot url: example: "http://example.org/path/to/the/voice.opus" selector: @@ -425,6 +449,10 @@ send_voice: send_document: fields: + config_entry_id: + selector: + config_entry: + integration: telegram_bot url: example: "http://example.org/path/to/the/document.odf" selector: @@ -501,6 +529,10 @@ send_document: send_location: fields: + config_entry_id: + selector: + config_entry: + integration: telegram_bot latitude: required: true selector: @@ -555,6 +587,10 @@ send_location: send_poll: fields: + config_entry_id: + selector: + config_entry: + integration: telegram_bot target: example: "[12345, 67890] or 12345" selector: @@ -603,6 +639,10 @@ send_poll: edit_message: fields: + config_entry_id: + selector: + config_entry: + integration: telegram_bot message_id: required: true example: "{{ trigger.event.data.message.message_id }}" @@ -641,6 +681,10 @@ edit_message: edit_caption: fields: + config_entry_id: + selector: + config_entry: + integration: telegram_bot message_id: required: true example: "{{ trigger.event.data.message.message_id }}" @@ -665,6 +709,10 @@ edit_caption: edit_replymarkup: fields: + config_entry_id: + selector: + config_entry: + integration: telegram_bot message_id: required: true example: "{{ trigger.event.data.message.message_id }}" @@ -685,6 +733,10 @@ edit_replymarkup: answer_callback_query: fields: + config_entry_id: + selector: + config_entry: + integration: telegram_bot message: required: true example: "OK, I'm listening" @@ -708,6 +760,10 @@ answer_callback_query: delete_message: fields: + config_entry_id: + selector: + config_entry: + integration: telegram_bot message_id: required: true example: "{{ trigger.event.data.message.message_id }}" diff --git a/homeassistant/components/telegram_bot/strings.json b/homeassistant/components/telegram_bot/strings.json index 8f4894f42a7..1fb0ea30475 100644 --- a/homeassistant/components/telegram_bot/strings.json +++ b/homeassistant/components/telegram_bot/strings.json @@ -1,9 +1,128 @@ { + "config": { + "step": { + "user": { + "title": "Telegram bot setup", + "description": "Create a new Telegram bot", + "data": { + "platform": "Platform", + "api_key": "[%key:common::config_flow::data::api_key%]", + "proxy_url": "Proxy URL" + }, + "data_description": { + "platform": "Telegram bot implementation", + "api_key": "The API token of your bot.", + "proxy_url": "Proxy URL if working behind one, optionally including username and password.\n(socks5://username:password@proxy_ip:proxy_port)" + } + }, + "webhooks": { + "title": "Webhooks network configuration", + "data": { + "url": "[%key:common::config_flow::data::url%]", + "trusted_networks": "Trusted networks" + }, + "data_description": { + "url": "Allow to overwrite the external URL from the Home Assistant configuration for different setups.", + "trusted_networks": "Telegram server access ACL as list.\nDefault: 149.154.160.0/20, 91.108.4.0/22" + } + }, + "reconfigure": { + "title": "Telegram bot setup", + "description": "Reconfigure Telegram bot", + "data": { + "platform": "[%key:component::telegram_bot::config::step::user::data::platform%]", + "proxy_url": "[%key:component::telegram_bot::config::step::user::data::proxy_url%]" + }, + "data_description": { + "platform": "[%key:component::telegram_bot::config::step::user::data_description::platform%]", + "proxy_url": "[%key:component::telegram_bot::config::step::user::data_description::proxy_url%]" + } + }, + "reauth_confirm": { + "title": "Re-authenticate Telegram bot", + "data": { + "api_key": "[%key:common::config_flow::data::api_key%]" + }, + "data_description": { + "api_key": "[%key:component::telegram_bot::config::step::user::data_description::api_key%]" + } + } + }, + "error": { + "invalid_api_key": "[%key:common::config_flow::error::invalid_api_key%]", + "invalid_proxy_url": "{proxy_url_error}", + "no_url_available": "URL is required since you have not configured an external URL in Home Assistant", + "invalid_url": "URL must start with https", + "invalid_trusted_networks": "Invalid trusted network: {error_message}" + }, + "abort": { + "already_configured": "[%key:common::config_flow::abort::already_configured_service%]", + "reconfigure_successful": "[%key:common::config_flow::abort::reconfigure_successful%]", + "reauth_successful": "[%key:common::config_flow::abort::reauth_successful%]" + } + }, + "options": { + "step": { + "init": { + "title": "Configure Telegram bot", + "data": { + "parse_mode": "Parse mode" + }, + "data_description": { + "parse_mode": "Default parse mode for messages if not explicit in message data." + } + } + } + }, + "config_subentries": { + "allowed_chat_ids": { + "initiate_flow": { + "user": "Add allowed chat ID" + }, + "step": { + "user": { + "title": "Add chat", + "data": { + "chat_id": "Chat ID" + }, + "data_description": { + "chat_id": "ID representing the user or group chat to which messages can be sent." + } + } + }, + "error": { + "chat_not_found": "Chat not found" + }, + "abort": { + "already_configured": "Chat already configured" + } + } + }, + "selector": { + "platforms": { + "options": { + "broadcast": "Broadcast", + "polling": "Polling", + "webhooks": "Webhooks" + } + }, + "parsers": { + "options": { + "markdown": "Markdown (Legacy)", + "markdownv2": "MarkdownV2", + "html": "HTML" + } + } + }, "services": { "send_message": { "name": "Send message", "description": "Sends a notification.", "fields": { + "config_entry_id": { + "name": "Config entry ID", + "description": "The config entry representing the Telegram bot to send the message." + }, "message": { "name": "Message", "description": "Message body of the notification." @@ -58,6 +177,10 @@ "name": "Send photo", "description": "Sends a photo.", "fields": { + "config_entry_id": { + "name": "[%key:component::telegram_bot::services::send_message::fields::config_entry_id::name%]", + "description": "The config entry representing the Telegram bot to send the photo." + }, "url": { "name": "[%key:common::config_flow::data::url%]", "description": "Remote path to an image." @@ -128,6 +251,10 @@ "name": "Send sticker", "description": "Sends a sticker.", "fields": { + "config_entry_id": { + "name": "[%key:component::telegram_bot::services::send_message::fields::config_entry_id::name%]", + "description": "The config entry representing the Telegram bot to send the sticker." + }, "url": { "name": "[%key:common::config_flow::data::url%]", "description": "Remote path to a static .webp or animated .tgs sticker." @@ -194,6 +321,10 @@ "name": "Send animation", "description": "Sends an animation.", "fields": { + "config_entry_id": { + "name": "[%key:component::telegram_bot::services::send_message::fields::config_entry_id::name%]", + "description": "The config entry representing the Telegram bot to send the animation." + }, "url": { "name": "[%key:common::config_flow::data::url%]", "description": "Remote path to a GIF or H.264/MPEG-4 AVC video without sound." @@ -264,6 +395,10 @@ "name": "Send video", "description": "Sends a video.", "fields": { + "config_entry_id": { + "name": "[%key:component::telegram_bot::services::send_message::fields::config_entry_id::name%]", + "description": "The config entry representing the Telegram bot to send the video." + }, "url": { "name": "[%key:common::config_flow::data::url%]", "description": "Remote path to a video." @@ -334,6 +469,10 @@ "name": "Send voice", "description": "Sends a voice message.", "fields": { + "config_entry_id": { + "name": "[%key:component::telegram_bot::services::send_message::fields::config_entry_id::name%]", + "description": "The config entry representing the Telegram bot to send the voice message." + }, "url": { "name": "[%key:common::config_flow::data::url%]", "description": "Remote path to a voice message." @@ -400,6 +539,10 @@ "name": "Send document", "description": "Sends a document.", "fields": { + "config_entry_id": { + "name": "[%key:component::telegram_bot::services::send_message::fields::config_entry_id::name%]", + "description": "The config entry representing the Telegram bot to send the document." + }, "url": { "name": "[%key:common::config_flow::data::url%]", "description": "Remote path to a document." @@ -470,6 +613,10 @@ "name": "Send location", "description": "Sends a location.", "fields": { + "config_entry_id": { + "name": "[%key:component::telegram_bot::services::send_message::fields::config_entry_id::name%]", + "description": "The config entry representing the Telegram bot to send the location." + }, "latitude": { "name": "[%key:common::config_flow::data::latitude%]", "description": "The latitude to send." @@ -516,6 +663,10 @@ "name": "Send poll", "description": "Sends a poll.", "fields": { + "config_entry_id": { + "name": "[%key:component::telegram_bot::services::send_message::fields::config_entry_id::name%]", + "description": "The config entry representing the Telegram bot to send the poll." + }, "target": { "name": "Target", "description": "[%key:component::telegram_bot::services::send_location::fields::target::description%]" @@ -566,6 +717,10 @@ "name": "Edit message", "description": "Edits a previously sent message.", "fields": { + "config_entry_id": { + "name": "[%key:component::telegram_bot::services::send_message::fields::config_entry_id::name%]", + "description": "The config entry representing the Telegram bot to edit the message." + }, "message_id": { "name": "Message ID", "description": "ID of the message to edit." @@ -600,6 +755,10 @@ "name": "Edit caption", "description": "Edits the caption of a previously sent message.", "fields": { + "config_entry_id": { + "name": "[%key:component::telegram_bot::services::send_message::fields::config_entry_id::name%]", + "description": "The config entry representing the Telegram bot to edit the caption." + }, "message_id": { "name": "[%key:component::telegram_bot::services::edit_message::fields::message_id::name%]", "description": "[%key:component::telegram_bot::services::edit_message::fields::message_id::description%]" @@ -622,6 +781,10 @@ "name": "Edit reply markup", "description": "Edits the inline keyboard of a previously sent message.", "fields": { + "config_entry_id": { + "name": "[%key:component::telegram_bot::services::send_message::fields::config_entry_id::name%]", + "description": "The config entry representing the Telegram bot to edit the reply markup." + }, "message_id": { "name": "[%key:component::telegram_bot::services::edit_message::fields::message_id::name%]", "description": "[%key:component::telegram_bot::services::edit_message::fields::message_id::description%]" @@ -640,6 +803,10 @@ "name": "Answer callback query", "description": "Responds to a callback query originated by clicking on an online keyboard button. The answer will be displayed to the user as a notification at the top of the chat screen or as an alert.", "fields": { + "config_entry_id": { + "name": "[%key:component::telegram_bot::services::send_message::fields::config_entry_id::name%]", + "description": "The config entry representing the Telegram bot to answer the callback query." + }, "message": { "name": "Message", "description": "Unformatted text message body of the notification." @@ -662,6 +829,10 @@ "name": "Delete message", "description": "Deletes a previously sent message.", "fields": { + "config_entry_id": { + "name": "[%key:component::telegram_bot::services::send_message::fields::config_entry_id::name%]", + "description": "The config entry representing the Telegram bot to delete the message." + }, "message_id": { "name": "[%key:component::telegram_bot::services::edit_message::fields::message_id::name%]", "description": "ID of the message to delete." @@ -673,7 +844,30 @@ } } }, + "exceptions": { + "multiple_config_entry": { + "message": "Multiple config entries found. Please specify the Telegram bot to use in the Config entry ID field." + }, + "missing_config_entry": { + "message": "No config entries found or setup failed. Please set up the Telegram Bot first." + }, + "missing_allowed_chat_ids": { + "message": "No allowed chat IDs found. Please add allowed chat IDs for {bot_name}." + } + }, "issues": { + "deprecated_yaml": { + "title": "The {integration_title} YAML configuration is being removed", + "description": "Configuring {integration_title} using YAML is being removed.\n\nYour existing YAML configuration has been imported into the UI automatically.\n\nRemove the `{domain}` configuration from your configuration.yaml file and restart Home Assistant to fix this issue." + }, + "deprecated_yaml_import_issue_has_more_platforms": { + "title": "The {integration_title} YAML configuration is being removed", + "description": "Configuring {integration_title} using YAML is being removed.\n\nThe last entry of your existing YAML configuration ({telegram_bot}) has been imported into the UI automatically.\n\nRemove the `{domain}` configuration from your configuration.yaml file and restart Home Assistant to fix this issue. The other Telegram bots will need to be configured manually in the UI." + }, + "deprecated_yaml_import_issue_error": { + "title": "YAML import failed due to invalid {error_field}", + "description": "Configuring {integration_title} using YAML is being removed but there was an error while importing your existing configuration ({telegram_bot}): {error_message}.\nSetup will not proceed.\n\nVerify that your {telegram_bot} is operating correctly and restart Home Assistant to attempt the import again.\n\nAlternatively, you may remove the `{domain}` configuration from your configuration.yaml entirely, restart Home Assistant, and add the {integration_title} integration manually." + }, "proxy_params_auth_deprecation": { "title": "{telegram_bot}: Proxy authentication should be moved to the URL", "description": "Authentication details for the the proxy configured in the {telegram_bot} integration should be moved into the {proxy_url} instead. Please update your configuration and restart Home Assistant to fix this issue.\n\nThe {proxy_params} config key will be removed in a future release." diff --git a/homeassistant/components/telegram_bot/webhooks.py b/homeassistant/components/telegram_bot/webhooks.py index 9bd360f5e41..b8c2cccb738 100644 --- a/homeassistant/components/telegram_bot/webhooks.py +++ b/homeassistant/components/telegram_bot/webhooks.py @@ -2,20 +2,23 @@ import datetime as dt from http import HTTPStatus -from ipaddress import ip_address +from ipaddress import IPv4Network, ip_address import logging import secrets import string -from telegram import Update -from telegram.error import TimedOut -from telegram.ext import Application, TypeHandler +from telegram import Bot, Update +from telegram.error import NetworkError, TimedOut +from telegram.ext import ApplicationBuilder, TypeHandler from homeassistant.components.http import HomeAssistantView -from homeassistant.const import EVENT_HOMEASSISTANT_STOP +from homeassistant.const import CONF_URL +from homeassistant.core import HomeAssistant +from homeassistant.exceptions import ConfigEntryNotReady from homeassistant.helpers.network import get_url -from . import CONF_TRUSTED_NETWORKS, CONF_URL, BaseTelegramBotEntity +from .bot import BaseTelegramBot, TelegramBotConfigEntry +from .const import CONF_TRUSTED_NETWORKS _LOGGER = logging.getLogger(__name__) @@ -24,7 +27,9 @@ REMOVE_WEBHOOK_URL = "" SECRET_TOKEN_LENGTH = 32 -async def async_setup_platform(hass, bot, config): +async def async_setup_platform( + hass: HomeAssistant, bot: Bot, config: TelegramBotConfigEntry +) -> BaseTelegramBot | None: """Set up the Telegram webhooks platform.""" # Generate an ephemeral secret token @@ -33,46 +38,56 @@ async def async_setup_platform(hass, bot, config): pushbot = PushBot(hass, bot, config, secret_token) - if not pushbot.webhook_url.startswith("https"): - _LOGGER.error("Invalid telegram webhook %s must be https", pushbot.webhook_url) - return False - await pushbot.start_application() webhook_registered = await pushbot.register_webhook() if not webhook_registered: - return False + raise ConfigEntryNotReady("Failed to register webhook with Telegram") - hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, pushbot.stop_application) hass.http.register_view( PushBotView( hass, bot, pushbot.application, - config[CONF_TRUSTED_NETWORKS], + _get_trusted_networks(config), secret_token, ) ) - return True + return pushbot -class PushBot(BaseTelegramBotEntity): +def _get_trusted_networks(config: TelegramBotConfigEntry) -> list[IPv4Network]: + trusted_networks_str: list[str] = config.data[CONF_TRUSTED_NETWORKS] + return [IPv4Network(trusted_network) for trusted_network in trusted_networks_str] + + +class PushBot(BaseTelegramBot): """Handles all the push/webhook logic and passes telegram updates to `self.handle_update`.""" - def __init__(self, hass, bot, config, secret_token): + def __init__( + self, + hass: HomeAssistant, + bot: Bot, + config: TelegramBotConfigEntry, + secret_token: str, + ) -> None: """Create Application before calling super().""" self.bot = bot - self.trusted_networks = config[CONF_TRUSTED_NETWORKS] + self.trusted_networks = _get_trusted_networks(config) self.secret_token = secret_token # Dumb Application that just gets our updates to our handler callback (self.handle_update) - self.application = Application.builder().bot(bot).updater(None).build() + self.application = ApplicationBuilder().bot(bot).updater(None).build() self.application.add_handler(TypeHandler(Update, self.handle_update)) super().__init__(hass, config) - self.base_url = config.get(CONF_URL) or get_url( + self.base_url = config.data.get(CONF_URL) or get_url( hass, require_ssl=True, allow_internal=False ) self.webhook_url = f"{self.base_url}{TELEGRAM_WEBHOOK_URL}" + async def shutdown(self) -> None: + """Shutdown the app.""" + await self.stop_application() + async def _try_to_set_webhook(self): _LOGGER.debug("Registering webhook URL: %s", self.webhook_url) retry_num = 0 @@ -127,7 +142,10 @@ class PushBot(BaseTelegramBotEntity): async def deregister_webhook(self): """Query telegram and deregister the URL for our webhook.""" _LOGGER.debug("Deregistering webhook URL") - await self.bot.delete_webhook() + try: + await self.bot.delete_webhook() + except NetworkError: + _LOGGER.error("Failed to deregister webhook URL") class PushBotView(HomeAssistantView): @@ -137,7 +155,14 @@ class PushBotView(HomeAssistantView): url = TELEGRAM_WEBHOOK_URL name = "telegram_webhooks" - def __init__(self, hass, bot, application, trusted_networks, secret_token): + def __init__( + self, + hass: HomeAssistant, + bot: Bot, + application, + trusted_networks: list[IPv4Network], + secret_token: str, + ) -> None: """Initialize by storing stuff needed for setting up our webhook endpoint.""" self.hass = hass self.bot = bot diff --git a/homeassistant/generated/config_flows.py b/homeassistant/generated/config_flows.py index 44a9b19e8c2..86f45c44fdc 100644 --- a/homeassistant/generated/config_flows.py +++ b/homeassistant/generated/config_flows.py @@ -634,6 +634,7 @@ FLOWS = { "tautulli", "technove", "tedee", + "telegram_bot", "tellduslive", "tesla_fleet", "tesla_wall_connector", diff --git a/homeassistant/generated/integrations.json b/homeassistant/generated/integrations.json index 775272f77c4..dc46ddc6e16 100644 --- a/homeassistant/generated/integrations.json +++ b/homeassistant/generated/integrations.json @@ -6578,7 +6578,7 @@ }, "telegram_bot": { "integration_type": "hub", - "config_flow": false, + "config_flow": true, "iot_class": "cloud_push", "name": "Telegram bot" } diff --git a/tests/components/telegram_bot/conftest.py b/tests/components/telegram_bot/conftest.py index f15db7eba2b..2b364af497e 100644 --- a/tests/components/telegram_bot/conftest.py +++ b/tests/components/telegram_bot/conftest.py @@ -3,26 +3,31 @@ from collections.abc import AsyncGenerator, Generator from datetime import datetime from typing import Any -from unittest.mock import patch +from unittest.mock import AsyncMock, patch import pytest -from telegram import Bot, Chat, Message, User -from telegram.constants import ChatType +from telegram import Bot, Chat, ChatFullInfo, Message, User +from telegram.constants import AccentColor, ChatType from homeassistant.components.telegram_bot import ( + ATTR_PARSER, CONF_ALLOWED_CHAT_IDS, CONF_TRUSTED_NETWORKS, DOMAIN, + PARSER_MD, ) -from homeassistant.const import ( - CONF_API_KEY, - CONF_PLATFORM, - CONF_URL, - EVENT_HOMEASSISTANT_START, +from homeassistant.components.telegram_bot.const import ( + CONF_CHAT_ID, + PLATFORM_BROADCAST, + PLATFORM_WEBHOOKS, ) +from homeassistant.config_entries import ConfigSubentryData +from homeassistant.const import CONF_API_KEY, CONF_PLATFORM, CONF_URL from homeassistant.core import HomeAssistant from homeassistant.setup import async_setup_component +from tests.common import MockConfigEntry + @pytest.fixture def config_webhooks() -> dict[str, Any]: @@ -30,7 +35,7 @@ def config_webhooks() -> dict[str, Any]: return { DOMAIN: [ { - CONF_PLATFORM: "webhooks", + CONF_PLATFORM: PLATFORM_WEBHOOKS, CONF_URL: "https://test", CONF_TRUSTED_NETWORKS: ["127.0.0.1"], CONF_API_KEY: "1234567890:ABC", @@ -83,6 +88,14 @@ def mock_register_webhook() -> Generator[None]: @pytest.fixture def mock_external_calls() -> Generator[None]: """Mock calls that make calls to the live Telegram API.""" + test_chat = ChatFullInfo( + id=123456, + title="mock title", + first_name="mock first_name", + type="PRIVATE", + max_reaction_count=100, + accent_color_id=AccentColor.COLOR_000, + ) test_user = User(123456, "Testbot", True) message = Message( message_id=12345, @@ -100,8 +113,12 @@ def mock_external_calls() -> Generator[None]: super().__init__(*args, **kwargs) self._bot_user = test_user + async def delete_webhook(self) -> bool: + return True + with ( - patch("homeassistant.components.telegram_bot.Bot", BotMock), + patch("homeassistant.components.telegram_bot.bot.Bot", BotMock), + patch.object(BotMock, "get_chat", return_value=test_chat), patch.object(BotMock, "get_me", return_value=test_user), patch.object(BotMock, "bot", test_user), patch.object(BotMock, "send_message", return_value=message), @@ -225,6 +242,54 @@ def update_callback_query(): } +@pytest.fixture +def mock_broadcast_config_entry() -> MockConfigEntry: + """Return the default mocked config entry.""" + return MockConfigEntry( + unique_id="mock api key", + domain=DOMAIN, + data={ + CONF_PLATFORM: PLATFORM_BROADCAST, + CONF_API_KEY: "mock api key", + }, + options={ATTR_PARSER: PARSER_MD}, + subentries_data=[ + ConfigSubentryData( + unique_id="1234567890", + data={CONF_CHAT_ID: 1234567890}, + subentry_id="mock_id", + subentry_type=CONF_ALLOWED_CHAT_IDS, + title="mock chat", + ) + ], + ) + + +@pytest.fixture +def mock_webhooks_config_entry() -> MockConfigEntry: + """Return the default mocked config entry.""" + return MockConfigEntry( + unique_id="mock api key", + domain=DOMAIN, + data={ + CONF_PLATFORM: PLATFORM_WEBHOOKS, + CONF_API_KEY: "mock api key", + CONF_URL: "https://test", + CONF_TRUSTED_NETWORKS: "149.154.160.0/20,91.108.4.0/22", + }, + options={ATTR_PARSER: PARSER_MD}, + subentries_data=[ + ConfigSubentryData( + unique_id="1234567890", + data={CONF_CHAT_ID: 1234567890}, + subentry_id="mock_id", + subentry_type=CONF_ALLOWED_CHAT_IDS, + title="mock chat", + ) + ], + ) + + @pytest.fixture async def webhook_platform( hass: HomeAssistant, @@ -249,11 +314,23 @@ async def polling_platform( hass: HomeAssistant, config_polling: dict[str, Any], mock_external_calls: None ) -> None: """Fixture for setting up the polling platform using appropriate config and mocks.""" - await async_setup_component( - hass, - DOMAIN, - config_polling, - ) - # Fire this event to start polling - hass.bus.fire(EVENT_HOMEASSISTANT_START) - await hass.async_block_till_done() + with patch( + "homeassistant.components.telegram_bot.polling.ApplicationBuilder" + ) as application_builder_class: + application = ( + application_builder_class.return_value.bot.return_value.build.return_value + ) + application.initialize = AsyncMock() + application.updater.start_polling = AsyncMock() + application.start = AsyncMock() + application.updater.stop = AsyncMock() + application.stop = AsyncMock() + application.shutdown = AsyncMock() + + await async_setup_component( + hass, + DOMAIN, + config_polling, + ) + + await hass.async_block_till_done() diff --git a/tests/components/telegram_bot/test_broadcast.py b/tests/components/telegram_bot/test_broadcast.py index b78054dc087..c82d3889ec5 100644 --- a/tests/components/telegram_bot/test_broadcast.py +++ b/tests/components/telegram_bot/test_broadcast.py @@ -4,7 +4,7 @@ from homeassistant.core import HomeAssistant from homeassistant.setup import async_setup_component -async def test_setup(hass: HomeAssistant) -> None: +async def test_setup(hass: HomeAssistant, mock_external_calls: None) -> None: """Test setting up Telegram broadcast.""" assert await async_setup_component( hass, diff --git a/tests/components/telegram_bot/test_config_flow.py b/tests/components/telegram_bot/test_config_flow.py new file mode 100644 index 00000000000..09c8d99472a --- /dev/null +++ b/tests/components/telegram_bot/test_config_flow.py @@ -0,0 +1,559 @@ +"""Config flow tests for the Telegram Bot integration.""" + +from unittest.mock import patch + +from telegram import ChatFullInfo, User +from telegram.constants import AccentColor +from telegram.error import BadRequest, InvalidToken, NetworkError + +from homeassistant.components.telegram_bot.const import ( + ATTR_PARSER, + BOT_NAME, + CONF_ALLOWED_CHAT_IDS, + CONF_BOT_COUNT, + CONF_CHAT_ID, + CONF_PROXY_URL, + CONF_TRUSTED_NETWORKS, + DOMAIN, + ERROR_FIELD, + ERROR_MESSAGE, + ISSUE_DEPRECATED_YAML, + ISSUE_DEPRECATED_YAML_IMPORT_ISSUE_ERROR, + PARSER_HTML, + PARSER_MD, + PLATFORM_BROADCAST, + PLATFORM_WEBHOOKS, + SUBENTRY_TYPE_ALLOWED_CHAT_IDS, +) +from homeassistant.config_entries import SOURCE_IMPORT, SOURCE_USER, ConfigSubentry +from homeassistant.const import CONF_API_KEY, CONF_PLATFORM, CONF_URL +from homeassistant.core import HomeAssistant +from homeassistant.data_entry_flow import FlowResultType +from homeassistant.helpers.issue_registry import IssueRegistry + +from tests.common import MockConfigEntry + + +async def test_options_flow( + hass: HomeAssistant, mock_webhooks_config_entry: MockConfigEntry +) -> None: + """Test options flow.""" + + mock_webhooks_config_entry.add_to_hass(hass) + + # test: no input + + result = await hass.config_entries.options.async_init( + mock_webhooks_config_entry.entry_id + ) + await hass.async_block_till_done() + + assert result["step_id"] == "init" + assert result["type"] == FlowResultType.FORM + + # test: valid input + + result = await hass.config_entries.options.async_configure( + result["flow_id"], + { + ATTR_PARSER: PARSER_HTML, + }, + ) + await hass.async_block_till_done() + + assert result["type"] == FlowResultType.CREATE_ENTRY + assert result["data"][ATTR_PARSER] == PARSER_HTML + + +async def test_reconfigure_flow_broadcast( + hass: HomeAssistant, + mock_webhooks_config_entry: MockConfigEntry, + mock_external_calls: None, +) -> None: + """Test reconfigure flow for broadcast bot.""" + mock_webhooks_config_entry.add_to_hass(hass) + + result = await mock_webhooks_config_entry.start_reconfigure_flow(hass) + assert result["step_id"] == "reconfigure" + assert result["type"] is FlowResultType.FORM + assert result["errors"] is None + + # test: invalid proxy url + + with patch( + "homeassistant.components.telegram_bot.config_flow.Bot.get_me", + ) as mock_bot: + mock_bot.side_effect = NetworkError("mock invalid proxy") + + result = await hass.config_entries.flow.async_configure( + result["flow_id"], + { + CONF_PLATFORM: PLATFORM_BROADCAST, + CONF_PROXY_URL: "invalid", + }, + ) + await hass.async_block_till_done() + + assert result["step_id"] == "reconfigure" + assert result["type"] is FlowResultType.FORM + assert result["errors"]["base"] == "invalid_proxy_url" + + # test: valid + + result = await hass.config_entries.flow.async_configure( + result["flow_id"], + { + CONF_PLATFORM: PLATFORM_BROADCAST, + CONF_PROXY_URL: "https://test", + }, + ) + await hass.async_block_till_done() + + assert result["type"] is FlowResultType.ABORT + assert result["reason"] == "reconfigure_successful" + assert mock_webhooks_config_entry.data[CONF_PLATFORM] == PLATFORM_BROADCAST + + +async def test_reconfigure_flow_webhooks( + hass: HomeAssistant, + mock_webhooks_config_entry: MockConfigEntry, + mock_external_calls: None, +) -> None: + """Test reconfigure flow for webhook.""" + mock_webhooks_config_entry.add_to_hass(hass) + + result = await mock_webhooks_config_entry.start_reconfigure_flow(hass) + assert result["step_id"] == "reconfigure" + assert result["type"] is FlowResultType.FORM + assert result["errors"] is None + + result = await hass.config_entries.flow.async_configure( + result["flow_id"], + { + CONF_PLATFORM: PLATFORM_WEBHOOKS, + CONF_PROXY_URL: "https://test", + }, + ) + await hass.async_block_till_done() + + assert result["step_id"] == "webhooks" + assert result["type"] is FlowResultType.FORM + assert result["errors"] is None + + # test: invalid url + + result = await hass.config_entries.flow.async_configure( + result["flow_id"], + { + CONF_URL: "http://test", + CONF_TRUSTED_NETWORKS: "149.154.160.0/20,91.108.4.0/22", + }, + ) + + assert result["step_id"] == "webhooks" + assert result["type"] is FlowResultType.FORM + assert result["errors"]["base"] == "invalid_url" + + # test: HA external url not configured + + result = await hass.config_entries.flow.async_configure( + result["flow_id"], + {CONF_TRUSTED_NETWORKS: "149.154.160.0/20,91.108.4.0/22"}, + ) + + assert result["step_id"] == "webhooks" + assert result["type"] is FlowResultType.FORM + assert result["errors"]["base"] == "no_url_available" + + # test: invalid trusted networks + + result = await hass.config_entries.flow.async_configure( + result["flow_id"], + { + CONF_URL: "https://reconfigure", + CONF_TRUSTED_NETWORKS: "invalid trusted networks", + }, + ) + + assert result["step_id"] == "webhooks" + assert result["type"] is FlowResultType.FORM + assert result["errors"]["base"] == "invalid_trusted_networks" + + # test: valid input + + result = await hass.config_entries.flow.async_configure( + result["flow_id"], + { + CONF_URL: "https://reconfigure", + CONF_TRUSTED_NETWORKS: "149.154.160.0/20", + }, + ) + + assert result["type"] is FlowResultType.ABORT + assert result["reason"] == "reconfigure_successful" + assert mock_webhooks_config_entry.data[CONF_URL] == "https://reconfigure" + assert mock_webhooks_config_entry.data[CONF_TRUSTED_NETWORKS] == [ + "149.154.160.0/20" + ] + + +async def test_create_entry( + hass: HomeAssistant, +) -> None: + """Test user flow.""" + + # test: no input + + result = await hass.config_entries.flow.async_init( + DOMAIN, + context={"source": SOURCE_USER}, + ) + + assert result["step_id"] == "user" + assert result["type"] is FlowResultType.FORM + assert result["errors"] is None + + # test: invalid proxy url + + with patch( + "homeassistant.components.telegram_bot.config_flow.Bot.get_me", + ) as mock_bot: + mock_bot.side_effect = NetworkError("mock invalid proxy") + + result = await hass.config_entries.flow.async_configure( + result["flow_id"], + { + CONF_PLATFORM: PLATFORM_WEBHOOKS, + CONF_API_KEY: "mock api key", + CONF_PROXY_URL: "invalid", + }, + ) + await hass.async_block_till_done() + + assert result["step_id"] == "user" + assert result["type"] is FlowResultType.FORM + assert result["errors"]["base"] == "invalid_proxy_url" + + # test: valid input, to continue with webhooks step + + with patch( + "homeassistant.components.telegram_bot.config_flow.Bot.get_me", + return_value=User(123456, "Testbot", True), + ): + result = await hass.config_entries.flow.async_configure( + result["flow_id"], + { + CONF_PLATFORM: PLATFORM_WEBHOOKS, + CONF_API_KEY: "mock api key", + CONF_PROXY_URL: "https://proxy", + }, + ) + await hass.async_block_till_done() + + assert result["step_id"] == "webhooks" + assert result["type"] is FlowResultType.FORM + assert result["errors"] is None + + # test: valid input for webhooks + + result = await hass.config_entries.flow.async_configure( + result["flow_id"], + { + CONF_URL: "https://test", + CONF_TRUSTED_NETWORKS: "149.154.160.0/20", + }, + ) + await hass.async_block_till_done() + + assert result["type"] is FlowResultType.CREATE_ENTRY + assert result["title"] == "Testbot" + assert result["data"][CONF_PLATFORM] == PLATFORM_WEBHOOKS + assert result["data"][CONF_API_KEY] == "mock api key" + assert result["data"][CONF_PROXY_URL] == "https://proxy" + assert result["data"][CONF_URL] == "https://test" + assert result["data"][CONF_TRUSTED_NETWORKS] == ["149.154.160.0/20"] + + +async def test_reauth_flow( + hass: HomeAssistant, mock_webhooks_config_entry: MockConfigEntry +) -> None: + """Test a reauthentication flow.""" + mock_webhooks_config_entry.add_to_hass(hass) + + result = await mock_webhooks_config_entry.start_reauth_flow( + hass, data={CONF_API_KEY: "dummy"} + ) + assert result["step_id"] == "reauth_confirm" + assert result["type"] is FlowResultType.FORM + assert result["errors"] is None + + # test: reauth invalid api key + + with patch( + "homeassistant.components.telegram_bot.config_flow.Bot.get_me" + ) as mock_bot: + mock_bot.side_effect = InvalidToken("mock invalid token error") + + result = await hass.config_entries.flow.async_configure( + result["flow_id"], + {CONF_API_KEY: "new mock api key"}, + ) + await hass.async_block_till_done() + + assert result["type"] is FlowResultType.FORM + assert result["errors"]["base"] == "invalid_api_key" + + # test: valid + + with patch( + "homeassistant.components.telegram_bot.config_flow.Bot.get_me", + return_value=User(123456, "Testbot", True), + ): + result = await hass.config_entries.flow.async_configure( + result["flow_id"], + {CONF_API_KEY: "new mock api key"}, + ) + await hass.async_block_till_done() + + assert result["type"] is FlowResultType.ABORT + assert result["reason"] == "reauth_successful" + assert mock_webhooks_config_entry.data[CONF_API_KEY] == "new mock api key" + + +async def test_subentry_flow( + hass: HomeAssistant, mock_broadcast_config_entry: MockConfigEntry +) -> None: + """Test subentry flow.""" + mock_broadcast_config_entry.add_to_hass(hass) + + with patch( + "homeassistant.components.telegram_bot.config_flow.Bot.get_me", + return_value=User(123456, "Testbot", True), + ): + assert await hass.config_entries.async_setup( + mock_broadcast_config_entry.entry_id + ) + await hass.async_block_till_done() + + result = await hass.config_entries.subentries.async_init( + (mock_broadcast_config_entry.entry_id, SUBENTRY_TYPE_ALLOWED_CHAT_IDS), + context={"source": SOURCE_USER}, + ) + assert result["type"] is FlowResultType.FORM + assert result["step_id"] == "user" + + with patch( + "homeassistant.components.telegram_bot.config_flow.Bot.get_chat", + return_value=ChatFullInfo( + id=987654321, + title="mock title", + first_name="mock first_name", + type="PRIVATE", + max_reaction_count=100, + accent_color_id=AccentColor.COLOR_000, + ), + ): + result = await hass.config_entries.subentries.async_configure( + result["flow_id"], + user_input={CONF_CHAT_ID: 987654321}, + ) + await hass.async_block_till_done() + + subentry_id = list(mock_broadcast_config_entry.subentries)[-1] + subentry: ConfigSubentry = mock_broadcast_config_entry.subentries[subentry_id] + + assert result["type"] is FlowResultType.CREATE_ENTRY + assert subentry.subentry_type == SUBENTRY_TYPE_ALLOWED_CHAT_IDS + assert subentry.title == "mock title" + assert subentry.unique_id == "987654321" + assert subentry.data == {CONF_CHAT_ID: 987654321} + + +async def test_subentry_flow_chat_error( + hass: HomeAssistant, mock_broadcast_config_entry: MockConfigEntry +) -> None: + """Test subentry flow.""" + mock_broadcast_config_entry.add_to_hass(hass) + + with patch( + "homeassistant.components.telegram_bot.config_flow.Bot.get_me", + return_value=User(123456, "Testbot", True), + ): + assert await hass.config_entries.async_setup( + mock_broadcast_config_entry.entry_id + ) + await hass.async_block_till_done() + + result = await hass.config_entries.subentries.async_init( + (mock_broadcast_config_entry.entry_id, SUBENTRY_TYPE_ALLOWED_CHAT_IDS), + context={"source": SOURCE_USER}, + ) + assert result["type"] is FlowResultType.FORM + assert result["step_id"] == "user" + + # test: chat not found + + with patch( + "homeassistant.components.telegram_bot.config_flow.Bot.get_chat" + ) as mock_bot: + mock_bot.side_effect = BadRequest("mock chat not found") + + result = await hass.config_entries.subentries.async_configure( + result["flow_id"], + user_input={CONF_CHAT_ID: 1234567890}, + ) + await hass.async_block_till_done() + + assert result["type"] is FlowResultType.FORM + assert result["step_id"] == "user" + assert result["errors"]["base"] == "chat_not_found" + + # test: chat id already configured + + with patch( + "homeassistant.components.telegram_bot.config_flow.Bot.get_chat", + return_value=ChatFullInfo( + id=1234567890, + title="mock title", + first_name="mock first_name", + type="PRIVATE", + max_reaction_count=100, + accent_color_id=AccentColor.COLOR_000, + ), + ): + result = await hass.config_entries.subentries.async_configure( + result["flow_id"], + user_input={CONF_CHAT_ID: 1234567890}, + ) + await hass.async_block_till_done() + + assert result["type"] is FlowResultType.ABORT + assert result["reason"] == "already_configured" + + +async def test_import_failed( + hass: HomeAssistant, issue_registry: IssueRegistry +) -> None: + """Test import flow failed.""" + + with patch( + "homeassistant.components.telegram_bot.config_flow.Bot.get_me" + ) as mock_bot: + mock_bot.side_effect = InvalidToken("mock invalid token error") + + result = await hass.config_entries.flow.async_init( + DOMAIN, + context={"source": SOURCE_IMPORT}, + data={ + CONF_PLATFORM: PLATFORM_BROADCAST, + CONF_API_KEY: "mock api key", + CONF_TRUSTED_NETWORKS: ["149.154.160.0/20"], + CONF_BOT_COUNT: 1, + }, + ) + await hass.async_block_till_done() + + assert result["type"] is FlowResultType.ABORT + assert result["reason"] == "import_failed" + + issue = issue_registry.async_get_issue( + domain=DOMAIN, + issue_id=ISSUE_DEPRECATED_YAML, + ) + assert issue.translation_key == ISSUE_DEPRECATED_YAML_IMPORT_ISSUE_ERROR + assert ( + issue.translation_placeholders[BOT_NAME] == f"{PLATFORM_BROADCAST} Telegram bot" + ) + assert issue.translation_placeholders[ERROR_FIELD] == "API key" + assert issue.translation_placeholders[ERROR_MESSAGE] == "mock invalid token error" + + +async def test_import_multiple( + hass: HomeAssistant, issue_registry: IssueRegistry +) -> None: + """Test import flow with multiple duplicated entries.""" + + data = { + CONF_PLATFORM: PLATFORM_BROADCAST, + CONF_API_KEY: "mock api key", + CONF_TRUSTED_NETWORKS: ["149.154.160.0/20"], + CONF_ALLOWED_CHAT_IDS: [3334445550], + CONF_BOT_COUNT: 2, + } + + with patch( + "homeassistant.components.telegram_bot.config_flow.Bot.get_me", + return_value=User(123456, "Testbot", True), + ): + # test: import first entry success + + result = await hass.config_entries.flow.async_init( + DOMAIN, + context={"source": SOURCE_IMPORT}, + data=data, + ) + await hass.async_block_till_done() + + assert result["type"] is FlowResultType.CREATE_ENTRY + assert result["data"][CONF_PLATFORM] == PLATFORM_BROADCAST + assert result["data"][CONF_API_KEY] == "mock api key" + assert result["options"][ATTR_PARSER] == PARSER_MD + + issue = issue_registry.async_get_issue( + domain=DOMAIN, + issue_id=ISSUE_DEPRECATED_YAML, + ) + assert ( + issue.translation_key == "deprecated_yaml_import_issue_has_more_platforms" + ) + + # test: import 2nd entry failed due to duplicate + + result = await hass.config_entries.flow.async_init( + DOMAIN, + context={"source": SOURCE_IMPORT}, + data=data, + ) + await hass.async_block_till_done() + + assert result["type"] is FlowResultType.ABORT + assert result["reason"] == "already_configured" + + +async def test_duplicate_entry(hass: HomeAssistant) -> None: + """Test user flow with duplicated entries.""" + + data = { + CONF_PLATFORM: PLATFORM_BROADCAST, + CONF_API_KEY: "mock api key", + } + + with patch( + "homeassistant.components.telegram_bot.config_flow.Bot.get_me", + return_value=User(123456, "Testbot", True), + ): + # test: import first entry success + + result = await hass.config_entries.flow.async_init( + DOMAIN, + context={"source": SOURCE_USER}, + data=data, + ) + await hass.async_block_till_done() + + assert result["type"] is FlowResultType.CREATE_ENTRY + assert result["data"][CONF_PLATFORM] == PLATFORM_BROADCAST + assert result["data"][CONF_API_KEY] == "mock api key" + assert result["options"][ATTR_PARSER] == PARSER_MD + + # test: import 2nd entry failed due to duplicate + + result = await hass.config_entries.flow.async_init( + DOMAIN, + context={"source": SOURCE_USER}, + data=data, + ) + await hass.async_block_till_done() + + assert result["type"] is FlowResultType.ABORT + assert result["reason"] == "already_configured" diff --git a/tests/components/telegram_bot/test_telegram_bot.py b/tests/components/telegram_bot/test_telegram_bot.py index c9038003cfc..928c9579020 100644 --- a/tests/components/telegram_bot/test_telegram_bot.py +++ b/tests/components/telegram_bot/test_telegram_bot.py @@ -6,19 +6,35 @@ from typing import Any from unittest.mock import AsyncMock, MagicMock, mock_open, patch import pytest -from telegram import Update -from telegram.error import NetworkError, RetryAfter, TelegramError, TimedOut +from telegram import Update, User +from telegram.error import ( + InvalidToken, + NetworkError, + RetryAfter, + TelegramError, + TimedOut, +) from homeassistant.components.telegram_bot import ( + ATTR_CALLBACK_QUERY_ID, + ATTR_CHAT_ID, ATTR_FILE, ATTR_LATITUDE, ATTR_LONGITUDE, ATTR_MESSAGE, ATTR_MESSAGE_THREAD_ID, + ATTR_MESSAGEID, ATTR_OPTIONS, ATTR_QUESTION, ATTR_STICKER_ID, + ATTR_TARGET, + CONF_CONFIG_ENTRY_ID, + CONF_PLATFORM, DOMAIN, + PLATFORM_BROADCAST, + SERVICE_ANSWER_CALLBACK_QUERY, + SERVICE_DELETE_MESSAGE, + SERVICE_EDIT_MESSAGE, SERVICE_SEND_ANIMATION, SERVICE_SEND_DOCUMENT, SERVICE_SEND_LOCATION, @@ -28,13 +44,17 @@ from homeassistant.components.telegram_bot import ( SERVICE_SEND_STICKER, SERVICE_SEND_VIDEO, SERVICE_SEND_VOICE, + async_setup_entry, ) from homeassistant.components.telegram_bot.webhooks import TELEGRAM_WEBHOOK_URL -from homeassistant.const import EVENT_HOMEASSISTANT_START +from homeassistant.config_entries import SOURCE_USER +from homeassistant.const import CONF_API_KEY from homeassistant.core import Context, HomeAssistant +from homeassistant.data_entry_flow import FlowResultType +from homeassistant.exceptions import ConfigEntryAuthFailed, ServiceValidationError from homeassistant.setup import async_setup_component -from tests.common import async_capture_events +from tests.common import MockConfigEntry, async_capture_events from tests.typing import ClientSessionGenerator @@ -145,7 +165,7 @@ async def test_send_file(hass: HomeAssistant, webhook_platform, service: str) -> # Mock the file handler read with our base64 encoded dummy file with patch( - "homeassistant.components.telegram_bot._read_file_as_bytesio", + "homeassistant.components.telegram_bot.bot._read_file_as_bytesio", _read_file_as_bytesio_mock, ): response = await hass.services.async_call( @@ -269,24 +289,35 @@ async def test_webhook_endpoint_generates_telegram_callback_event( async def test_polling_platform_message_text_update( - hass: HomeAssistant, config_polling, update_message_text + hass: HomeAssistant, + config_polling, + update_message_text, + mock_external_calls: None, ) -> None: - """Provide the `BaseTelegramBotEntity.update_handler` with an `Update` and assert fired `telegram_text` event.""" + """Provide the `BaseTelegramBot.update_handler` with an `Update` and assert fired `telegram_text` event.""" events = async_capture_events(hass, "telegram_text") with patch( "homeassistant.components.telegram_bot.polling.ApplicationBuilder" ) as application_builder_class: + # Set up the integration with the polling platform inside the patch context manager. + application = ( + application_builder_class.return_value.bot.return_value.build.return_value + ) + application.updater.start_polling = AsyncMock() + application.updater.stop = AsyncMock() + application.initialize = AsyncMock() + application.start = AsyncMock() + application.stop = AsyncMock() + application.shutdown = AsyncMock() + await async_setup_component( hass, DOMAIN, config_polling, ) await hass.async_block_till_done() - # Set up the integration with the polling platform inside the patch context manager. - application = ( - application_builder_class.return_value.bot.return_value.build.return_value - ) + # Then call the callback and assert events fired. handler = application.add_handler.call_args[0][0] handle_update_callback = handler.callback @@ -295,13 +326,9 @@ async def test_polling_platform_message_text_update( application.bot.defaults.tzinfo = None update = Update.de_json(update_message_text, application.bot) - # handle_update_callback == BaseTelegramBotEntity.update_handler + # handle_update_callback == BaseTelegramBot.update_handler await handle_update_callback(update, None) - application.updater.stop = AsyncMock() - application.stop = AsyncMock() - application.shutdown = AsyncMock() - # Make sure event has fired await hass.async_block_till_done() @@ -326,6 +353,7 @@ async def test_polling_platform_add_error_handler( hass: HomeAssistant, config_polling: dict[str, Any], update_message_text: dict[str, Any], + mock_external_calls: None, caplog: pytest.LogCaptureFixture, error: Exception, log_message: str, @@ -334,6 +362,17 @@ async def test_polling_platform_add_error_handler( with patch( "homeassistant.components.telegram_bot.polling.ApplicationBuilder" ) as application_builder_class: + application = ( + application_builder_class.return_value.bot.return_value.build.return_value + ) + application.updater.stop = AsyncMock() + application.initialize = AsyncMock() + application.updater.start_polling = AsyncMock() + application.start = AsyncMock() + application.stop = AsyncMock() + application.shutdown = AsyncMock() + application.bot.defaults.tzinfo = None + await async_setup_component( hass, DOMAIN, @@ -341,16 +380,8 @@ async def test_polling_platform_add_error_handler( ) await hass.async_block_till_done() - application = ( - application_builder_class.return_value.bot.return_value.build.return_value - ) - application.updater.stop = AsyncMock() - application.stop = AsyncMock() - application.shutdown = AsyncMock() - process_error = application.add_error_handler.call_args[0][0] - application.bot.defaults.tzinfo = None update = Update.de_json(update_message_text, application.bot) - + process_error = application.add_error_handler.call_args[0][0] await process_error(update, MagicMock(error=error)) assert log_message in caplog.text @@ -372,6 +403,7 @@ async def test_polling_platform_start_polling_error_callback( hass: HomeAssistant, config_polling: dict[str, Any], caplog: pytest.LogCaptureFixture, + mock_external_calls: None, error: Exception, log_message: str, ) -> None: @@ -379,13 +411,6 @@ async def test_polling_platform_start_polling_error_callback( with patch( "homeassistant.components.telegram_bot.polling.ApplicationBuilder" ) as application_builder_class: - await async_setup_component( - hass, - DOMAIN, - config_polling, - ) - await hass.async_block_till_done() - application = ( application_builder_class.return_value.bot.return_value.build.return_value ) @@ -396,7 +421,12 @@ async def test_polling_platform_start_polling_error_callback( application.stop = AsyncMock() application.shutdown = AsyncMock() - hass.bus.async_fire(EVENT_HOMEASSISTANT_START) + await async_setup_component( + hass, + DOMAIN, + config_polling, + ) + await hass.async_block_till_done() error_callback = application.updater.start_polling.call_args.kwargs[ "error_callback" @@ -466,3 +496,220 @@ async def test_webhook_endpoint_invalid_secret_token_is_denied( headers={"X-Telegram-Bot-Api-Secret-Token": incorrect_secret_token}, ) assert response.status == 401 + + +async def test_multiple_config_entries_error( + hass: HomeAssistant, + mock_broadcast_config_entry: MockConfigEntry, + polling_platform, + mock_external_calls: None, +) -> None: + """Test multiple config entries error.""" + + # setup the second entry (polling_platform is first entry) + mock_broadcast_config_entry.add_to_hass(hass) + await hass.config_entries.async_setup(mock_broadcast_config_entry.entry_id) + await hass.async_block_till_done() + + with pytest.raises(ServiceValidationError) as err: + await hass.services.async_call( + DOMAIN, + SERVICE_SEND_MESSAGE, + { + ATTR_MESSAGE: "mock message", + }, + blocking=True, + return_response=True, + ) + + await hass.async_block_till_done() + assert err.value.translation_key == "multiple_config_entry" + + +async def test_send_message_with_config_entry( + hass: HomeAssistant, + mock_broadcast_config_entry: MockConfigEntry, + mock_external_calls: None, +) -> None: + """Test send message using config entry.""" + mock_broadcast_config_entry.add_to_hass(hass) + await hass.config_entries.async_setup(mock_broadcast_config_entry.entry_id) + await hass.async_block_till_done() + + response = await hass.services.async_call( + DOMAIN, + SERVICE_SEND_MESSAGE, + { + CONF_CONFIG_ENTRY_ID: mock_broadcast_config_entry.entry_id, + ATTR_MESSAGE: "mock message", + ATTR_TARGET: 1, + }, + blocking=True, + return_response=True, + ) + + assert response["chats"][0]["message_id"] == 12345 + + +async def test_send_message_no_chat_id_error( + hass: HomeAssistant, + mock_external_calls: None, +) -> None: + """Test send message using config entry with no whitelisted chat id.""" + data = { + CONF_PLATFORM: PLATFORM_BROADCAST, + CONF_API_KEY: "mock api key", + } + + with patch( + "homeassistant.components.telegram_bot.config_flow.Bot.get_me", + return_value=User(123456, "Testbot", True), + ): + result = await hass.config_entries.flow.async_init( + DOMAIN, + context={"source": SOURCE_USER}, + data=data, + ) + await hass.async_block_till_done() + + assert result["type"] is FlowResultType.CREATE_ENTRY + + with pytest.raises(ServiceValidationError) as err: + await hass.services.async_call( + DOMAIN, + SERVICE_SEND_MESSAGE, + { + CONF_CONFIG_ENTRY_ID: result["result"].entry_id, + ATTR_MESSAGE: "mock message", + }, + blocking=True, + return_response=True, + ) + + assert err.value.translation_key == "missing_allowed_chat_ids" + assert err.value.translation_placeholders["bot_name"] == "Testbot" + + +async def test_send_message_config_entry_error( + hass: HomeAssistant, + mock_broadcast_config_entry: MockConfigEntry, + mock_external_calls: None, +) -> None: + """Test send message config entry error.""" + mock_broadcast_config_entry.add_to_hass(hass) + await hass.config_entries.async_setup(mock_broadcast_config_entry.entry_id) + await hass.async_block_till_done() + + await hass.config_entries.async_unload(mock_broadcast_config_entry.entry_id) + await hass.async_block_till_done() + + with pytest.raises(ServiceValidationError) as err: + await hass.services.async_call( + DOMAIN, + SERVICE_SEND_MESSAGE, + { + CONF_CONFIG_ENTRY_ID: mock_broadcast_config_entry.entry_id, + ATTR_MESSAGE: "mock message", + }, + blocking=True, + return_response=True, + ) + + await hass.async_block_till_done() + assert err.value.translation_key == "missing_config_entry" + + +async def test_delete_message( + hass: HomeAssistant, + mock_broadcast_config_entry: MockConfigEntry, + mock_external_calls: None, +) -> None: + """Test delete message.""" + mock_broadcast_config_entry.add_to_hass(hass) + await hass.config_entries.async_setup(mock_broadcast_config_entry.entry_id) + await hass.async_block_till_done() + + with patch( + "homeassistant.components.telegram_bot.bot.TelegramNotificationService.delete_message", + AsyncMock(return_value=True), + ) as mock: + await hass.services.async_call( + DOMAIN, + SERVICE_DELETE_MESSAGE, + {ATTR_CHAT_ID: 12345, ATTR_MESSAGEID: 12345}, + blocking=True, + ) + + await hass.async_block_till_done() + mock.assert_called_once() + + +async def test_edit_message( + hass: HomeAssistant, + mock_broadcast_config_entry: MockConfigEntry, + mock_external_calls: None, +) -> None: + """Test edit message.""" + mock_broadcast_config_entry.add_to_hass(hass) + await hass.config_entries.async_setup(mock_broadcast_config_entry.entry_id) + await hass.async_block_till_done() + + with patch( + "homeassistant.components.telegram_bot.bot.TelegramNotificationService.edit_message", + AsyncMock(return_value=True), + ) as mock: + await hass.services.async_call( + DOMAIN, + SERVICE_EDIT_MESSAGE, + {ATTR_MESSAGE: "mock message", ATTR_CHAT_ID: 12345, ATTR_MESSAGEID: 12345}, + blocking=True, + ) + + await hass.async_block_till_done() + mock.assert_called_once() + + +async def test_async_setup_entry_failed( + hass: HomeAssistant, mock_broadcast_config_entry: MockConfigEntry +) -> None: + """Test setup entry failed.""" + mock_broadcast_config_entry.add_to_hass(hass) + + with patch( + "homeassistant.components.telegram_bot.Bot.get_me", + ) as mock_bot: + mock_bot.side_effect = InvalidToken("mock invalid token error") + + with pytest.raises(ConfigEntryAuthFailed) as err: + await async_setup_entry(hass, mock_broadcast_config_entry) + + await hass.async_block_till_done() + assert err.value.args[0] == "Invalid API token for Telegram Bot." + + +async def test_answer_callback_query( + hass: HomeAssistant, + mock_broadcast_config_entry: MockConfigEntry, + mock_external_calls: None, +) -> None: + """Test answer callback query.""" + mock_broadcast_config_entry.add_to_hass(hass) + await hass.config_entries.async_setup(mock_broadcast_config_entry.entry_id) + await hass.async_block_till_done() + + with patch( + "homeassistant.components.telegram_bot.bot.TelegramNotificationService.answer_callback_query", + AsyncMock(), + ) as mock: + await hass.services.async_call( + DOMAIN, + SERVICE_ANSWER_CALLBACK_QUERY, + { + ATTR_MESSAGE: "mock message", + ATTR_CALLBACK_QUERY_ID: 12345, + }, + blocking=True, + ) + + await hass.async_block_till_done() + mock.assert_called_once()