Add config flow for telegram bot integration (#144617)

* added config flow for telegram integration

* added chat id in config entry title and added config flow tests

* fix import issue when there are no notifiers in configuration.yaml

* Revert "fix import issue when there are no notifiers in configuration.yaml"

This reverts commit b5b83e2a9a5d8cd1572f3e8c36e360b0de80b58b.

* Revert "added chat id in config entry title and added config flow tests"

This reverts commit 30c2bb4ae4d850dae931a5f7e1525cf19e3be5d8.

* Revert "added config flow for telegram integration"

This reverts commit 1f44afcd45e3a017b8c5f681dc39a160617018ce.

* added config and subentry flows

* added options flow to configure webhooks

* refactor module setup so it only load once

* moved service registration from async_setup_entry to async_setup

* Apply suggestions from code review

Co-authored-by: Martin Hjelmare <marhje52@gmail.com>

* import only last yaml config

* import only last yaml config

* reduced scope of try-block

* create issue when importing from yaml

* Apply suggestions from code review

Co-authored-by: Martin Hjelmare <marhje52@gmail.com>

* handle options update by reloading telegram bot

* handle import errors for create issue

* include bot's platform when creating issues

* handle options reload without needing HA restart

* moved url and trusted_networks inputs from options to new config flow step

* Apply suggestions from code review

Co-authored-by: Martin Hjelmare <marhje52@gmail.com>

* minor fixes

* refactor config flow

* moved constants to const.py

* Apply suggestions from code review

Co-authored-by: Martin Hjelmare <marhje52@gmail.com>

* Update homeassistant/components/telegram_bot/config_flow.py

Co-authored-by: Martin Hjelmare <marhje52@gmail.com>

* Update homeassistant/components/telegram_bot/config_flow.py

Co-authored-by: Martin Hjelmare <marhje52@gmail.com>

* Update homeassistant/components/telegram_bot/config_flow.py

Co-authored-by: Martin Hjelmare <marhje52@gmail.com>

* added options flow tests

* Update homeassistant/components/telegram_bot/__init__.py

Co-authored-by: Martin Hjelmare <marhje52@gmail.com>

* Update homeassistant/components/telegram_bot/__init__.py

Co-authored-by: Martin Hjelmare <marhje52@gmail.com>

* Update homeassistant/components/telegram_bot/__init__.py

Co-authored-by: Martin Hjelmare <marhje52@gmail.com>

* Update homeassistant/components/telegram_bot/config_flow.py

Co-authored-by: Martin Hjelmare <marhje52@gmail.com>

* Update homeassistant/components/telegram_bot/config_flow.py

Co-authored-by: Martin Hjelmare <marhje52@gmail.com>

* added reconfigure flow

* added reauth flow

* added tests for reconfigure flow

* added tests for reauth

* added tests for subentry flow

* added tests for user and webhooks flow with error scenarios

* added import flow tests

* handle webhook deregister exception

* added config entry id to all services

* fix leave chat bug

* Update homeassistant/components/telegram_bot/__init__.py

Co-authored-by: Martin Hjelmare <marhje52@gmail.com>

* removed leave chat bug fixes

* Update homeassistant/components/telegram_bot/strings.json

Co-authored-by: Martin Hjelmare <marhje52@gmail.com>

* handle other error types for import

* reuse translations

* added test for duplicated config entry for user step

* added tests

---------

Co-authored-by: Martin Hjelmare <marhje52@gmail.com>
This commit is contained in:
hanwg 2025-06-02 14:52:31 +08:00 committed by GitHub
parent de4a5fa30b
commit 85a86c3f11
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 3097 additions and 1009 deletions

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,924 @@
"""Telegram bot classes and utilities."""
from abc import abstractmethod
import asyncio
import io
import logging
from types import MappingProxyType
from typing import Any
import httpx
from telegram import (
Bot,
CallbackQuery,
InlineKeyboardButton,
InlineKeyboardMarkup,
Message,
ReplyKeyboardMarkup,
ReplyKeyboardRemove,
Update,
User,
)
from telegram.constants import ParseMode
from telegram.error import TelegramError
from telegram.ext import CallbackContext, filters
from telegram.request import HTTPXRequest
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import (
ATTR_COMMAND,
CONF_API_KEY,
HTTP_BEARER_AUTHENTICATION,
HTTP_DIGEST_AUTHENTICATION,
)
from homeassistant.core import Context, HomeAssistant
from homeassistant.exceptions import ServiceValidationError
from homeassistant.helpers import issue_registry as ir
from homeassistant.util.ssl import get_default_context, get_default_no_verify_context
from .const import (
ATTR_ARGS,
ATTR_AUTHENTICATION,
ATTR_CAPTION,
ATTR_CHAT_ID,
ATTR_CHAT_INSTANCE,
ATTR_DATA,
ATTR_DATE,
ATTR_DISABLE_NOTIF,
ATTR_DISABLE_WEB_PREV,
ATTR_FILE,
ATTR_FROM_FIRST,
ATTR_FROM_LAST,
ATTR_KEYBOARD,
ATTR_KEYBOARD_INLINE,
ATTR_MESSAGE,
ATTR_MESSAGE_TAG,
ATTR_MESSAGE_THREAD_ID,
ATTR_MESSAGEID,
ATTR_MSG,
ATTR_MSGID,
ATTR_ONE_TIME_KEYBOARD,
ATTR_OPEN_PERIOD,
ATTR_PARSER,
ATTR_PASSWORD,
ATTR_REPLY_TO_MSGID,
ATTR_REPLYMARKUP,
ATTR_RESIZE_KEYBOARD,
ATTR_STICKER_ID,
ATTR_TEXT,
ATTR_TIMEOUT,
ATTR_TITLE,
ATTR_URL,
ATTR_USER_ID,
ATTR_USERNAME,
ATTR_VERIFY_SSL,
CONF_CHAT_ID,
CONF_PROXY_PARAMS,
CONF_PROXY_URL,
DOMAIN,
EVENT_TELEGRAM_CALLBACK,
EVENT_TELEGRAM_COMMAND,
EVENT_TELEGRAM_SENT,
EVENT_TELEGRAM_TEXT,
PARSER_HTML,
PARSER_MD,
PARSER_MD2,
PARSER_PLAIN_TEXT,
SERVICE_EDIT_CAPTION,
SERVICE_EDIT_MESSAGE,
SERVICE_SEND_ANIMATION,
SERVICE_SEND_DOCUMENT,
SERVICE_SEND_PHOTO,
SERVICE_SEND_STICKER,
SERVICE_SEND_VIDEO,
SERVICE_SEND_VOICE,
)
_LOGGER = logging.getLogger(__name__)
type TelegramBotConfigEntry = ConfigEntry[TelegramNotificationService]
class BaseTelegramBot:
"""The base class for the telegram bot."""
def __init__(self, hass: HomeAssistant, config: TelegramBotConfigEntry) -> None:
"""Initialize the bot base class."""
self.hass = hass
self.config = config
@abstractmethod
async def shutdown(self) -> None:
"""Shutdown the bot application."""
async def handle_update(self, update: Update, context: CallbackContext) -> bool:
"""Handle updates from bot application set up by the respective platform."""
_LOGGER.debug("Handling update %s", update)
if not self.authorize_update(update):
return False
# establish event type: text, command or callback_query
if update.callback_query:
# NOTE: Check for callback query first since effective message will be populated with the message
# in .callback_query (python-telegram-bot docs are wrong)
event_type, event_data = self._get_callback_query_event_data(
update.callback_query
)
elif update.effective_message:
event_type, event_data = self._get_message_event_data(
update.effective_message
)
else:
_LOGGER.warning("Unhandled update: %s", update)
return True
event_context = Context()
_LOGGER.debug("Firing event %s: %s", event_type, event_data)
self.hass.bus.async_fire(event_type, event_data, context=event_context)
return True
@staticmethod
def _get_command_event_data(command_text: str | None) -> dict[str, str | list]:
if not command_text or not command_text.startswith("/"):
return {}
command_parts = command_text.split()
command = command_parts[0]
args = command_parts[1:]
return {ATTR_COMMAND: command, ATTR_ARGS: args}
def _get_message_event_data(self, message: Message) -> tuple[str, dict[str, Any]]:
event_data: dict[str, Any] = {
ATTR_MSGID: message.message_id,
ATTR_CHAT_ID: message.chat.id,
ATTR_DATE: message.date,
ATTR_MESSAGE_THREAD_ID: message.message_thread_id,
}
if filters.COMMAND.filter(message):
# This is a command message - set event type to command and split data into command and args
event_type = EVENT_TELEGRAM_COMMAND
event_data.update(self._get_command_event_data(message.text))
else:
event_type = EVENT_TELEGRAM_TEXT
event_data[ATTR_TEXT] = message.text
if message.from_user:
event_data.update(self._get_user_event_data(message.from_user))
return event_type, event_data
def _get_user_event_data(self, user: User) -> dict[str, Any]:
return {
ATTR_USER_ID: user.id,
ATTR_FROM_FIRST: user.first_name,
ATTR_FROM_LAST: user.last_name,
}
def _get_callback_query_event_data(
self, callback_query: CallbackQuery
) -> tuple[str, dict[str, Any]]:
event_type = EVENT_TELEGRAM_CALLBACK
event_data: dict[str, Any] = {
ATTR_MSGID: callback_query.id,
ATTR_CHAT_INSTANCE: callback_query.chat_instance,
ATTR_DATA: callback_query.data,
ATTR_MSG: None,
ATTR_CHAT_ID: None,
}
if callback_query.message:
event_data[ATTR_MSG] = callback_query.message.to_dict()
event_data[ATTR_CHAT_ID] = callback_query.message.chat.id
if callback_query.from_user:
event_data.update(self._get_user_event_data(callback_query.from_user))
# Split data into command and args if possible
event_data.update(self._get_command_event_data(callback_query.data))
return event_type, event_data
def authorize_update(self, update: Update) -> bool:
"""Make sure either user or chat is in allowed_chat_ids."""
from_user = update.effective_user.id if update.effective_user else None
from_chat = update.effective_chat.id if update.effective_chat else None
allowed_chat_ids: list[int] = [
subentry.data[CONF_CHAT_ID] for subentry in self.config.subentries.values()
]
if from_user in allowed_chat_ids or from_chat in allowed_chat_ids:
return True
_LOGGER.error(
(
"Unauthorized update - neither user id %s nor chat id %s is in allowed"
" chats: %s"
),
from_user,
from_chat,
allowed_chat_ids,
)
return False
class TelegramNotificationService:
"""Implement the notification services for the Telegram Bot domain."""
def __init__(
self,
hass: HomeAssistant,
app: BaseTelegramBot,
bot: Bot,
config: TelegramBotConfigEntry,
parser: str,
) -> None:
"""Initialize the service."""
self.app = app
self.config = config
self._parsers = {
PARSER_HTML: ParseMode.HTML,
PARSER_MD: ParseMode.MARKDOWN,
PARSER_MD2: ParseMode.MARKDOWN_V2,
PARSER_PLAIN_TEXT: None,
}
self._parse_mode = self._parsers.get(parser)
self.bot = bot
self.hass = hass
def _get_allowed_chat_ids(self) -> list[int]:
allowed_chat_ids: list[int] = [
subentry.data[CONF_CHAT_ID] for subentry in self.config.subentries.values()
]
if not allowed_chat_ids:
bot_name: str = self.config.title
raise ServiceValidationError(
"No allowed chat IDs found for bot",
translation_domain=DOMAIN,
translation_key="missing_allowed_chat_ids",
translation_placeholders={
"bot_name": bot_name,
},
)
return allowed_chat_ids
def _get_last_message_id(self):
return dict.fromkeys(self._get_allowed_chat_ids())
def _get_msg_ids(self, msg_data, chat_id):
"""Get the message id to edit.
This can be one of (message_id, inline_message_id) from a msg dict,
returning a tuple.
**You can use 'last' as message_id** to edit
the message last sent in the chat_id.
"""
message_id = inline_message_id = None
if ATTR_MESSAGEID in msg_data:
message_id = msg_data[ATTR_MESSAGEID]
if (
isinstance(message_id, str)
and (message_id == "last")
and (self._get_last_message_id()[chat_id] is not None)
):
message_id = self._get_last_message_id()[chat_id]
else:
inline_message_id = msg_data["inline_message_id"]
return message_id, inline_message_id
def _get_target_chat_ids(self, target):
"""Validate chat_id targets or return default target (first).
:param target: optional list of integers ([12234, -12345])
:return list of chat_id targets (integers)
"""
allowed_chat_ids: list[int] = self._get_allowed_chat_ids()
default_user: int = allowed_chat_ids[0]
if target is not None:
if isinstance(target, int):
target = [target]
chat_ids = [t for t in target if t in allowed_chat_ids]
if chat_ids:
return chat_ids
_LOGGER.warning(
"Disallowed targets: %s, using default: %s", target, default_user
)
return [default_user]
def _get_msg_kwargs(self, data):
"""Get parameters in message data kwargs."""
def _make_row_inline_keyboard(row_keyboard):
"""Make a list of InlineKeyboardButtons.
It can accept:
- a list of tuples like:
`[(text_b1, data_callback_b1),
(text_b2, data_callback_b2), ...]
- a string like: `/cmd1, /cmd2, /cmd3`
- or a string like: `text_b1:/cmd1, text_b2:/cmd2`
- also supports urls instead of callback commands
"""
buttons = []
if isinstance(row_keyboard, str):
for key in row_keyboard.split(","):
if ":/" in key:
# check if command or URL
if key.startswith("https://"):
label = key.split(",")[0]
url = key[len(label) + 1 :]
buttons.append(InlineKeyboardButton(label, url=url))
else:
# commands like: 'Label:/cmd' become ('Label', '/cmd')
label = key.split(":/")[0]
command = key[len(label) + 1 :]
buttons.append(
InlineKeyboardButton(label, callback_data=command)
)
else:
# commands like: '/cmd' become ('CMD', '/cmd')
label = key.strip()[1:].upper()
buttons.append(InlineKeyboardButton(label, callback_data=key))
elif isinstance(row_keyboard, list):
for entry in row_keyboard:
text_btn, data_btn = entry
if data_btn.startswith("https://"):
buttons.append(InlineKeyboardButton(text_btn, url=data_btn))
else:
buttons.append(
InlineKeyboardButton(text_btn, callback_data=data_btn)
)
else:
raise TypeError(str(row_keyboard))
return buttons
# Defaults
params = {
ATTR_PARSER: self._parse_mode,
ATTR_DISABLE_NOTIF: False,
ATTR_DISABLE_WEB_PREV: None,
ATTR_REPLY_TO_MSGID: None,
ATTR_REPLYMARKUP: None,
ATTR_TIMEOUT: None,
ATTR_MESSAGE_TAG: None,
ATTR_MESSAGE_THREAD_ID: None,
}
if data is not None:
if ATTR_PARSER in data:
params[ATTR_PARSER] = self._parsers.get(
data[ATTR_PARSER], self._parse_mode
)
if ATTR_TIMEOUT in data:
params[ATTR_TIMEOUT] = data[ATTR_TIMEOUT]
if ATTR_DISABLE_NOTIF in data:
params[ATTR_DISABLE_NOTIF] = data[ATTR_DISABLE_NOTIF]
if ATTR_DISABLE_WEB_PREV in data:
params[ATTR_DISABLE_WEB_PREV] = data[ATTR_DISABLE_WEB_PREV]
if ATTR_REPLY_TO_MSGID in data:
params[ATTR_REPLY_TO_MSGID] = data[ATTR_REPLY_TO_MSGID]
if ATTR_MESSAGE_TAG in data:
params[ATTR_MESSAGE_TAG] = data[ATTR_MESSAGE_TAG]
if ATTR_MESSAGE_THREAD_ID in data:
params[ATTR_MESSAGE_THREAD_ID] = data[ATTR_MESSAGE_THREAD_ID]
# Keyboards:
if ATTR_KEYBOARD in data:
keys = data.get(ATTR_KEYBOARD)
keys = keys if isinstance(keys, list) else [keys]
if keys:
params[ATTR_REPLYMARKUP] = ReplyKeyboardMarkup(
[[key.strip() for key in row.split(",")] for row in keys],
resize_keyboard=data.get(ATTR_RESIZE_KEYBOARD, False),
one_time_keyboard=data.get(ATTR_ONE_TIME_KEYBOARD, False),
)
else:
params[ATTR_REPLYMARKUP] = ReplyKeyboardRemove(True)
elif ATTR_KEYBOARD_INLINE in data:
keys = data.get(ATTR_KEYBOARD_INLINE)
keys = keys if isinstance(keys, list) else [keys]
params[ATTR_REPLYMARKUP] = InlineKeyboardMarkup(
[_make_row_inline_keyboard(row) for row in keys]
)
return params
async def _send_msg(
self, func_send, msg_error, message_tag, *args_msg, context=None, **kwargs_msg
):
"""Send one message."""
try:
out = await func_send(*args_msg, **kwargs_msg)
if not isinstance(out, bool) and hasattr(out, ATTR_MESSAGEID):
chat_id = out.chat_id
message_id = out[ATTR_MESSAGEID]
self._get_last_message_id()[chat_id] = message_id
_LOGGER.debug(
"Last message ID: %s (from chat_id %s)",
self._get_last_message_id(),
chat_id,
)
event_data = {
ATTR_CHAT_ID: chat_id,
ATTR_MESSAGEID: message_id,
}
if message_tag is not None:
event_data[ATTR_MESSAGE_TAG] = message_tag
if kwargs_msg.get(ATTR_MESSAGE_THREAD_ID) is not None:
event_data[ATTR_MESSAGE_THREAD_ID] = kwargs_msg[
ATTR_MESSAGE_THREAD_ID
]
self.hass.bus.async_fire(
EVENT_TELEGRAM_SENT, event_data, context=context
)
elif not isinstance(out, bool):
_LOGGER.warning(
"Update last message: out_type:%s, out=%s", type(out), out
)
except TelegramError as exc:
_LOGGER.error(
"%s: %s. Args: %s, kwargs: %s", msg_error, exc, args_msg, kwargs_msg
)
return None
return out
async def send_message(self, message="", target=None, context=None, **kwargs):
"""Send a message to one or multiple pre-allowed chat IDs."""
title = kwargs.get(ATTR_TITLE)
text = f"{title}\n{message}" if title else message
params = self._get_msg_kwargs(kwargs)
msg_ids = {}
for chat_id in self._get_target_chat_ids(target):
_LOGGER.debug("Send message in chat ID %s with params: %s", chat_id, params)
msg = await self._send_msg(
self.bot.send_message,
"Error sending message",
params[ATTR_MESSAGE_TAG],
chat_id,
text,
parse_mode=params[ATTR_PARSER],
disable_web_page_preview=params[ATTR_DISABLE_WEB_PREV],
disable_notification=params[ATTR_DISABLE_NOTIF],
reply_to_message_id=params[ATTR_REPLY_TO_MSGID],
reply_markup=params[ATTR_REPLYMARKUP],
read_timeout=params[ATTR_TIMEOUT],
message_thread_id=params[ATTR_MESSAGE_THREAD_ID],
context=context,
)
if msg is not None:
msg_ids[chat_id] = msg.id
return msg_ids
async def delete_message(self, chat_id=None, context=None, **kwargs):
"""Delete a previously sent message."""
chat_id = self._get_target_chat_ids(chat_id)[0]
message_id, _ = self._get_msg_ids(kwargs, chat_id)
_LOGGER.debug("Delete message %s in chat ID %s", message_id, chat_id)
deleted = await self._send_msg(
self.bot.delete_message,
"Error deleting message",
None,
chat_id,
message_id,
context=context,
)
# reduce message_id anyway:
if self._get_last_message_id()[chat_id] is not None:
# change last msg_id for deque(n_msgs)?
self._get_last_message_id()[chat_id] -= 1
return deleted
async def edit_message(self, type_edit, chat_id=None, context=None, **kwargs):
"""Edit a previously sent message."""
chat_id = self._get_target_chat_ids(chat_id)[0]
message_id, inline_message_id = self._get_msg_ids(kwargs, chat_id)
params = self._get_msg_kwargs(kwargs)
_LOGGER.debug(
"Edit message %s in chat ID %s with params: %s",
message_id or inline_message_id,
chat_id,
params,
)
if type_edit == SERVICE_EDIT_MESSAGE:
message = kwargs.get(ATTR_MESSAGE)
title = kwargs.get(ATTR_TITLE)
text = f"{title}\n{message}" if title else message
_LOGGER.debug("Editing message with ID %s", message_id or inline_message_id)
return await self._send_msg(
self.bot.edit_message_text,
"Error editing text message",
params[ATTR_MESSAGE_TAG],
text,
chat_id=chat_id,
message_id=message_id,
inline_message_id=inline_message_id,
parse_mode=params[ATTR_PARSER],
disable_web_page_preview=params[ATTR_DISABLE_WEB_PREV],
reply_markup=params[ATTR_REPLYMARKUP],
read_timeout=params[ATTR_TIMEOUT],
context=context,
)
if type_edit == SERVICE_EDIT_CAPTION:
return await self._send_msg(
self.bot.edit_message_caption,
"Error editing message attributes",
params[ATTR_MESSAGE_TAG],
chat_id=chat_id,
message_id=message_id,
inline_message_id=inline_message_id,
caption=kwargs.get(ATTR_CAPTION),
reply_markup=params[ATTR_REPLYMARKUP],
read_timeout=params[ATTR_TIMEOUT],
parse_mode=params[ATTR_PARSER],
context=context,
)
return await self._send_msg(
self.bot.edit_message_reply_markup,
"Error editing message attributes",
params[ATTR_MESSAGE_TAG],
chat_id=chat_id,
message_id=message_id,
inline_message_id=inline_message_id,
reply_markup=params[ATTR_REPLYMARKUP],
read_timeout=params[ATTR_TIMEOUT],
context=context,
)
async def answer_callback_query(
self, message, callback_query_id, show_alert=False, context=None, **kwargs
):
"""Answer a callback originated with a press in an inline keyboard."""
params = self._get_msg_kwargs(kwargs)
_LOGGER.debug(
"Answer callback query with callback ID %s: %s, alert: %s",
callback_query_id,
message,
show_alert,
)
await self._send_msg(
self.bot.answer_callback_query,
"Error sending answer callback query",
params[ATTR_MESSAGE_TAG],
callback_query_id,
text=message,
show_alert=show_alert,
read_timeout=params[ATTR_TIMEOUT],
context=context,
)
async def send_file(
self, file_type=SERVICE_SEND_PHOTO, target=None, context=None, **kwargs
):
"""Send a photo, sticker, video, or document."""
params = self._get_msg_kwargs(kwargs)
file_content = await load_data(
self.hass,
url=kwargs.get(ATTR_URL),
filepath=kwargs.get(ATTR_FILE),
username=kwargs.get(ATTR_USERNAME),
password=kwargs.get(ATTR_PASSWORD),
authentication=kwargs.get(ATTR_AUTHENTICATION),
verify_ssl=(
get_default_context()
if kwargs.get(ATTR_VERIFY_SSL, False)
else get_default_no_verify_context()
),
)
msg_ids = {}
if file_content:
for chat_id in self._get_target_chat_ids(target):
_LOGGER.debug("Sending file to chat ID %s", chat_id)
if file_type == SERVICE_SEND_PHOTO:
msg = await self._send_msg(
self.bot.send_photo,
"Error sending photo",
params[ATTR_MESSAGE_TAG],
chat_id=chat_id,
photo=file_content,
caption=kwargs.get(ATTR_CAPTION),
disable_notification=params[ATTR_DISABLE_NOTIF],
reply_to_message_id=params[ATTR_REPLY_TO_MSGID],
reply_markup=params[ATTR_REPLYMARKUP],
read_timeout=params[ATTR_TIMEOUT],
parse_mode=params[ATTR_PARSER],
message_thread_id=params[ATTR_MESSAGE_THREAD_ID],
context=context,
)
elif file_type == SERVICE_SEND_STICKER:
msg = await self._send_msg(
self.bot.send_sticker,
"Error sending sticker",
params[ATTR_MESSAGE_TAG],
chat_id=chat_id,
sticker=file_content,
disable_notification=params[ATTR_DISABLE_NOTIF],
reply_to_message_id=params[ATTR_REPLY_TO_MSGID],
reply_markup=params[ATTR_REPLYMARKUP],
read_timeout=params[ATTR_TIMEOUT],
message_thread_id=params[ATTR_MESSAGE_THREAD_ID],
context=context,
)
elif file_type == SERVICE_SEND_VIDEO:
msg = await self._send_msg(
self.bot.send_video,
"Error sending video",
params[ATTR_MESSAGE_TAG],
chat_id=chat_id,
video=file_content,
caption=kwargs.get(ATTR_CAPTION),
disable_notification=params[ATTR_DISABLE_NOTIF],
reply_to_message_id=params[ATTR_REPLY_TO_MSGID],
reply_markup=params[ATTR_REPLYMARKUP],
read_timeout=params[ATTR_TIMEOUT],
parse_mode=params[ATTR_PARSER],
message_thread_id=params[ATTR_MESSAGE_THREAD_ID],
context=context,
)
elif file_type == SERVICE_SEND_DOCUMENT:
msg = await self._send_msg(
self.bot.send_document,
"Error sending document",
params[ATTR_MESSAGE_TAG],
chat_id=chat_id,
document=file_content,
caption=kwargs.get(ATTR_CAPTION),
disable_notification=params[ATTR_DISABLE_NOTIF],
reply_to_message_id=params[ATTR_REPLY_TO_MSGID],
reply_markup=params[ATTR_REPLYMARKUP],
read_timeout=params[ATTR_TIMEOUT],
parse_mode=params[ATTR_PARSER],
message_thread_id=params[ATTR_MESSAGE_THREAD_ID],
context=context,
)
elif file_type == SERVICE_SEND_VOICE:
msg = await self._send_msg(
self.bot.send_voice,
"Error sending voice",
params[ATTR_MESSAGE_TAG],
chat_id=chat_id,
voice=file_content,
caption=kwargs.get(ATTR_CAPTION),
disable_notification=params[ATTR_DISABLE_NOTIF],
reply_to_message_id=params[ATTR_REPLY_TO_MSGID],
reply_markup=params[ATTR_REPLYMARKUP],
read_timeout=params[ATTR_TIMEOUT],
message_thread_id=params[ATTR_MESSAGE_THREAD_ID],
context=context,
)
elif file_type == SERVICE_SEND_ANIMATION:
msg = await self._send_msg(
self.bot.send_animation,
"Error sending animation",
params[ATTR_MESSAGE_TAG],
chat_id=chat_id,
animation=file_content,
caption=kwargs.get(ATTR_CAPTION),
disable_notification=params[ATTR_DISABLE_NOTIF],
reply_to_message_id=params[ATTR_REPLY_TO_MSGID],
reply_markup=params[ATTR_REPLYMARKUP],
read_timeout=params[ATTR_TIMEOUT],
parse_mode=params[ATTR_PARSER],
message_thread_id=params[ATTR_MESSAGE_THREAD_ID],
context=context,
)
msg_ids[chat_id] = msg.id
file_content.seek(0)
else:
_LOGGER.error("Can't send file with kwargs: %s", kwargs)
return msg_ids
async def send_sticker(self, target=None, context=None, **kwargs) -> dict:
"""Send a sticker from a telegram sticker pack."""
params = self._get_msg_kwargs(kwargs)
stickerid = kwargs.get(ATTR_STICKER_ID)
msg_ids = {}
if stickerid:
for chat_id in self._get_target_chat_ids(target):
msg = await self._send_msg(
self.bot.send_sticker,
"Error sending sticker",
params[ATTR_MESSAGE_TAG],
chat_id=chat_id,
sticker=stickerid,
disable_notification=params[ATTR_DISABLE_NOTIF],
reply_to_message_id=params[ATTR_REPLY_TO_MSGID],
reply_markup=params[ATTR_REPLYMARKUP],
read_timeout=params[ATTR_TIMEOUT],
message_thread_id=params[ATTR_MESSAGE_THREAD_ID],
context=context,
)
msg_ids[chat_id] = msg.id
return msg_ids
return await self.send_file(SERVICE_SEND_STICKER, target, **kwargs)
async def send_location(
self, latitude, longitude, target=None, context=None, **kwargs
):
"""Send a location."""
latitude = float(latitude)
longitude = float(longitude)
params = self._get_msg_kwargs(kwargs)
msg_ids = {}
for chat_id in self._get_target_chat_ids(target):
_LOGGER.debug(
"Send location %s/%s to chat ID %s", latitude, longitude, chat_id
)
msg = await self._send_msg(
self.bot.send_location,
"Error sending location",
params[ATTR_MESSAGE_TAG],
chat_id=chat_id,
latitude=latitude,
longitude=longitude,
disable_notification=params[ATTR_DISABLE_NOTIF],
reply_to_message_id=params[ATTR_REPLY_TO_MSGID],
read_timeout=params[ATTR_TIMEOUT],
message_thread_id=params[ATTR_MESSAGE_THREAD_ID],
context=context,
)
msg_ids[chat_id] = msg.id
return msg_ids
async def send_poll(
self,
question,
options,
is_anonymous,
allows_multiple_answers,
target=None,
context=None,
**kwargs,
):
"""Send a poll."""
params = self._get_msg_kwargs(kwargs)
openperiod = kwargs.get(ATTR_OPEN_PERIOD)
msg_ids = {}
for chat_id in self._get_target_chat_ids(target):
_LOGGER.debug("Send poll '%s' to chat ID %s", question, chat_id)
msg = await self._send_msg(
self.bot.send_poll,
"Error sending poll",
params[ATTR_MESSAGE_TAG],
chat_id=chat_id,
question=question,
options=options,
is_anonymous=is_anonymous,
allows_multiple_answers=allows_multiple_answers,
open_period=openperiod,
disable_notification=params[ATTR_DISABLE_NOTIF],
reply_to_message_id=params[ATTR_REPLY_TO_MSGID],
read_timeout=params[ATTR_TIMEOUT],
message_thread_id=params[ATTR_MESSAGE_THREAD_ID],
context=context,
)
msg_ids[chat_id] = msg.id
return msg_ids
async def leave_chat(self, chat_id=None, context=None, **kwargs):
"""Remove bot from chat."""
chat_id = self._get_target_chat_ids(chat_id)[0]
_LOGGER.debug("Leave from chat ID %s", chat_id)
return await self._send_msg(
self.bot.leave_chat, "Error leaving chat", None, chat_id, context=context
)
def initialize_bot(hass: HomeAssistant, p_config: MappingProxyType[str, Any]) -> Bot:
"""Initialize telegram bot with proxy support."""
api_key: str = p_config[CONF_API_KEY]
proxy_url: str | None = p_config.get(CONF_PROXY_URL)
proxy_params: dict | None = p_config.get(CONF_PROXY_PARAMS)
if proxy_url is not None:
auth = None
if proxy_params is None:
# CONF_PROXY_PARAMS has been kept for backwards compatibility.
proxy_params = {}
elif "username" in proxy_params and "password" in proxy_params:
# Auth can actually be stuffed into the URL, but the docs have previously
# indicated to put them here.
auth = proxy_params.pop("username"), proxy_params.pop("password")
ir.create_issue(
hass,
DOMAIN,
"proxy_params_auth_deprecation",
breaks_in_ha_version="2024.10.0",
is_persistent=False,
is_fixable=False,
severity=ir.IssueSeverity.WARNING,
translation_placeholders={
"proxy_params": CONF_PROXY_PARAMS,
"proxy_url": CONF_PROXY_URL,
"telegram_bot": "Telegram bot",
},
translation_key="proxy_params_auth_deprecation",
learn_more_url="https://github.com/home-assistant/core/pull/112778",
)
else:
ir.create_issue(
hass,
DOMAIN,
"proxy_params_deprecation",
breaks_in_ha_version="2024.10.0",
is_persistent=False,
is_fixable=False,
severity=ir.IssueSeverity.WARNING,
translation_placeholders={
"proxy_params": CONF_PROXY_PARAMS,
"proxy_url": CONF_PROXY_URL,
"httpx": "httpx",
"telegram_bot": "Telegram bot",
},
translation_key="proxy_params_deprecation",
learn_more_url="https://github.com/home-assistant/core/pull/112778",
)
proxy = httpx.Proxy(proxy_url, auth=auth, **proxy_params)
request = HTTPXRequest(connection_pool_size=8, proxy=proxy)
else:
request = HTTPXRequest(connection_pool_size=8)
return Bot(token=api_key, request=request)
async def load_data(
hass: HomeAssistant,
url=None,
filepath=None,
username=None,
password=None,
authentication=None,
num_retries=5,
verify_ssl=None,
):
"""Load data into ByteIO/File container from a source."""
try:
if url is not None:
# Load data from URL
params: dict[str, Any] = {}
headers = {}
if authentication == HTTP_BEARER_AUTHENTICATION and password is not None:
headers = {"Authorization": f"Bearer {password}"}
elif username is not None and password is not None:
if authentication == HTTP_DIGEST_AUTHENTICATION:
params["auth"] = httpx.DigestAuth(username, password)
else:
params["auth"] = httpx.BasicAuth(username, password)
if verify_ssl is not None:
params["verify"] = verify_ssl
retry_num = 0
async with httpx.AsyncClient(
timeout=15, headers=headers, **params
) as client:
while retry_num < num_retries:
req = await client.get(url)
if req.status_code != 200:
_LOGGER.warning(
"Status code %s (retry #%s) loading %s",
req.status_code,
retry_num + 1,
url,
)
else:
data = io.BytesIO(req.content)
if data.read():
data.seek(0)
data.name = url
return data
_LOGGER.warning(
"Empty data (retry #%s) in %s)", retry_num + 1, url
)
retry_num += 1
if retry_num < num_retries:
await asyncio.sleep(
1
) # Add a sleep to allow other async operations to proceed
_LOGGER.warning(
"Can't load data in %s after %s retries", url, retry_num
)
elif filepath is not None:
if hass.config.is_allowed_path(filepath):
return await hass.async_add_executor_job(
_read_file_as_bytesio, filepath
)
_LOGGER.warning("'%s' are not secure to load data from!", filepath)
else:
_LOGGER.warning("Can't load data. No data found in params!")
except (OSError, TypeError) as error:
_LOGGER.error("Can't load data into ByteIO: %s", error)
return None
def _read_file_as_bytesio(file_path: str) -> io.BytesIO:
"""Read a file and return it as a BytesIO object."""
with open(file_path, "rb") as file:
data = io.BytesIO(file.read())
data.name = file_path
return data

View File

@ -1,6 +1,14 @@
"""Support for Telegram bot to send messages only."""
from telegram import Bot
async def async_setup_platform(hass, bot, config):
from homeassistant.core import HomeAssistant
from .bot import BaseTelegramBot, TelegramBotConfigEntry
async def async_setup_platform(
hass: HomeAssistant, bot: Bot, config: TelegramBotConfigEntry
) -> type[BaseTelegramBot] | None:
"""Set up the Telegram broadcast platform."""
return True
return None

View File

@ -0,0 +1,620 @@
"""Config flow for Telegram Bot."""
from collections.abc import Mapping
from ipaddress import AddressValueError, IPv4Network
import logging
from types import MappingProxyType
from typing import Any
from telegram import Bot, ChatFullInfo
from telegram.error import BadRequest, InvalidToken, NetworkError
import voluptuous as vol
from homeassistant.config_entries import (
SOURCE_IMPORT,
SOURCE_RECONFIGURE,
ConfigFlow,
ConfigFlowResult,
ConfigSubentryData,
ConfigSubentryFlow,
OptionsFlow,
SubentryFlowResult,
)
from homeassistant.const import CONF_API_KEY, CONF_PLATFORM, CONF_URL
from homeassistant.core import callback
from homeassistant.data_entry_flow import AbortFlow
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.issue_registry import IssueSeverity, async_create_issue
from homeassistant.helpers.network import NoURLAvailableError, get_url
from homeassistant.helpers.selector import (
SelectSelector,
SelectSelectorConfig,
TextSelector,
TextSelectorConfig,
TextSelectorType,
)
from . import initialize_bot
from .bot import TelegramBotConfigEntry
from .const import (
ATTR_PARSER,
BOT_NAME,
CONF_ALLOWED_CHAT_IDS,
CONF_BOT_COUNT,
CONF_CHAT_ID,
CONF_PROXY_URL,
CONF_TRUSTED_NETWORKS,
DEFAULT_TRUSTED_NETWORKS,
DOMAIN,
ERROR_FIELD,
ERROR_MESSAGE,
ISSUE_DEPRECATED_YAML,
ISSUE_DEPRECATED_YAML_HAS_MORE_PLATFORMS,
ISSUE_DEPRECATED_YAML_IMPORT_ISSUE_ERROR,
PARSER_HTML,
PARSER_MD,
PARSER_MD2,
PLATFORM_BROADCAST,
PLATFORM_POLLING,
PLATFORM_WEBHOOKS,
SUBENTRY_TYPE_ALLOWED_CHAT_IDS,
)
_LOGGER = logging.getLogger(__name__)
STEP_USER_DATA_SCHEMA: vol.Schema = vol.Schema(
{
vol.Required(CONF_PLATFORM): SelectSelector(
SelectSelectorConfig(
options=[
PLATFORM_BROADCAST,
PLATFORM_POLLING,
PLATFORM_WEBHOOKS,
],
translation_key="platforms",
)
),
vol.Required(CONF_API_KEY): TextSelector(
TextSelectorConfig(
type=TextSelectorType.PASSWORD,
autocomplete="current-password",
)
),
vol.Optional(CONF_PROXY_URL): TextSelector(
config=TextSelectorConfig(type=TextSelectorType.URL)
),
}
)
STEP_RECONFIGURE_USER_DATA_SCHEMA: vol.Schema = vol.Schema(
{
vol.Required(CONF_PLATFORM): SelectSelector(
SelectSelectorConfig(
options=[
PLATFORM_BROADCAST,
PLATFORM_POLLING,
PLATFORM_WEBHOOKS,
],
translation_key="platforms",
)
),
vol.Optional(CONF_PROXY_URL): TextSelector(
config=TextSelectorConfig(type=TextSelectorType.URL)
),
}
)
STEP_REAUTH_DATA_SCHEMA: vol.Schema = vol.Schema(
{
vol.Required(CONF_API_KEY): TextSelector(
TextSelectorConfig(
type=TextSelectorType.PASSWORD,
autocomplete="current-password",
)
)
}
)
STEP_WEBHOOKS_DATA_SCHEMA: vol.Schema = vol.Schema(
{
vol.Optional(CONF_URL): TextSelector(
config=TextSelectorConfig(type=TextSelectorType.URL)
),
vol.Required(CONF_TRUSTED_NETWORKS): vol.Coerce(str),
}
)
OPTIONS_SCHEMA: vol.Schema = vol.Schema(
{
vol.Required(
ATTR_PARSER,
): SelectSelector(
SelectSelectorConfig(
options=[PARSER_MD, PARSER_MD2, PARSER_HTML],
translation_key="parsers",
)
)
}
)
class OptionsFlowHandler(OptionsFlow):
"""Options flow for webhooks."""
async def async_step_init(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Manage the options."""
if user_input is not None:
return self.async_create_entry(data=user_input)
return self.async_show_form(
step_id="init",
data_schema=self.add_suggested_values_to_schema(
OPTIONS_SCHEMA,
self.config_entry.options,
),
)
class TelgramBotConfigFlow(ConfigFlow, domain=DOMAIN):
"""Handle a config flow for Telegram."""
VERSION = 1
@staticmethod
@callback
def async_get_options_flow(
config_entry: TelegramBotConfigEntry,
) -> OptionsFlowHandler:
"""Create the options flow."""
return OptionsFlowHandler()
@classmethod
@callback
def async_get_supported_subentry_types(
cls, config_entry: TelegramBotConfigEntry
) -> dict[str, type[ConfigSubentryFlow]]:
"""Return subentries supported by this integration."""
return {SUBENTRY_TYPE_ALLOWED_CHAT_IDS: AllowedChatIdsSubEntryFlowHandler}
def __init__(self) -> None:
"""Create instance of the config flow."""
super().__init__()
self._bot: Bot | None = None
self._bot_name = "Unknown bot"
# for passing data between steps
self._step_user_data: dict[str, Any] = {}
# triggered by async_setup() from __init__.py
async def async_step_import(self, import_data: dict[str, Any]) -> ConfigFlowResult:
"""Handle import of config entry from configuration.yaml."""
telegram_bot: str = f"{import_data[CONF_PLATFORM]} Telegram bot"
bot_count: int = import_data[CONF_BOT_COUNT]
import_data[CONF_TRUSTED_NETWORKS] = ",".join(
import_data[CONF_TRUSTED_NETWORKS]
)
try:
config_flow_result: ConfigFlowResult = await self.async_step_user(
import_data
)
except AbortFlow:
# this happens if the config entry is already imported
self._create_issue(ISSUE_DEPRECATED_YAML, telegram_bot, bot_count)
raise
else:
errors: dict[str, str] | None = config_flow_result.get("errors")
if errors:
error: str = errors.get("base", "unknown")
self._create_issue(
error,
telegram_bot,
bot_count,
config_flow_result["description_placeholders"],
)
return self.async_abort(reason="import_failed")
subentries: list[ConfigSubentryData] = []
allowed_chat_ids: list[int] = import_data[CONF_ALLOWED_CHAT_IDS]
for chat_id in allowed_chat_ids:
chat_name: str = await _async_get_chat_name(self._bot, chat_id)
subentry: ConfigSubentryData = ConfigSubentryData(
data={CONF_CHAT_ID: chat_id},
subentry_type=CONF_ALLOWED_CHAT_IDS,
title=chat_name,
unique_id=str(chat_id),
)
subentries.append(subentry)
config_flow_result["subentries"] = subentries
self._create_issue(
ISSUE_DEPRECATED_YAML,
telegram_bot,
bot_count,
config_flow_result["description_placeholders"],
)
return config_flow_result
def _create_issue(
self,
issue: str,
telegram_bot_type: str,
bot_count: int,
description_placeholders: Mapping[str, str] | None = None,
) -> None:
translation_key: str = (
ISSUE_DEPRECATED_YAML
if bot_count == 1
else ISSUE_DEPRECATED_YAML_HAS_MORE_PLATFORMS
)
if issue != ISSUE_DEPRECATED_YAML:
translation_key = ISSUE_DEPRECATED_YAML_IMPORT_ISSUE_ERROR
telegram_bot = (
description_placeholders.get(BOT_NAME, telegram_bot_type)
if description_placeholders
else telegram_bot_type
)
error_field = (
description_placeholders.get(ERROR_FIELD, "Unknown error")
if description_placeholders
else "Unknown error"
)
error_message = (
description_placeholders.get(ERROR_MESSAGE, "Unknown error")
if description_placeholders
else "Unknown error"
)
async_create_issue(
self.hass,
DOMAIN,
ISSUE_DEPRECATED_YAML,
breaks_in_ha_version="2025.12.0",
is_fixable=False,
issue_domain=DOMAIN,
severity=IssueSeverity.WARNING,
translation_key=translation_key,
translation_placeholders={
"domain": DOMAIN,
"integration_title": "Telegram Bot",
"telegram_bot": telegram_bot,
ERROR_FIELD: error_field,
ERROR_MESSAGE: error_message,
},
learn_more_url="https://github.com/home-assistant/core/pull/144617",
)
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle a flow to create a new config entry for a Telegram bot."""
if not user_input:
return self.async_show_form(
step_id="user",
data_schema=STEP_USER_DATA_SCHEMA,
)
# prevent duplicates
await self.async_set_unique_id(user_input[CONF_API_KEY])
self._abort_if_unique_id_configured()
# validate connection to Telegram API
errors: dict[str, str] = {}
description_placeholders: dict[str, str] = {}
bot_name = await self._validate_bot(
user_input, errors, description_placeholders
)
if errors:
return self.async_show_form(
step_id="user",
data_schema=self.add_suggested_values_to_schema(
STEP_USER_DATA_SCHEMA, user_input
),
errors=errors,
description_placeholders=description_placeholders,
)
if user_input[CONF_PLATFORM] != PLATFORM_WEBHOOKS:
await self._shutdown_bot()
return self.async_create_entry(
title=bot_name,
data={
CONF_PLATFORM: user_input[CONF_PLATFORM],
CONF_API_KEY: user_input[CONF_API_KEY],
CONF_PROXY_URL: user_input.get(CONF_PROXY_URL),
},
options={
# this value may come from yaml import
ATTR_PARSER: user_input.get(ATTR_PARSER, PARSER_MD)
},
description_placeholders=description_placeholders,
)
self._bot_name = bot_name
self._step_user_data.update(user_input)
if self.source == SOURCE_IMPORT:
return await self.async_step_webhooks(
{
CONF_URL: user_input.get(CONF_URL),
CONF_TRUSTED_NETWORKS: user_input[CONF_TRUSTED_NETWORKS],
}
)
return await self.async_step_webhooks()
async def _shutdown_bot(self) -> None:
"""Shutdown the bot if it exists."""
if self._bot:
await self._bot.shutdown()
self._bot = None
async def _validate_bot(
self,
user_input: dict[str, Any],
errors: dict[str, str],
placeholders: dict[str, str],
) -> str:
try:
bot = await self.hass.async_add_executor_job(
initialize_bot, self.hass, MappingProxyType(user_input)
)
self._bot = bot
user = await bot.get_me()
except InvalidToken as err:
_LOGGER.warning("Invalid API token")
errors["base"] = "invalid_api_key"
placeholders[ERROR_FIELD] = "API key"
placeholders[ERROR_MESSAGE] = str(err)
return "Unknown bot"
except (ValueError, NetworkError) as err:
_LOGGER.warning("Invalid proxy")
errors["base"] = "invalid_proxy_url"
placeholders["proxy_url_error"] = str(err)
placeholders[ERROR_FIELD] = "proxy url"
placeholders[ERROR_MESSAGE] = str(err)
return "Unknown bot"
else:
return user.full_name
async def async_step_webhooks(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle config flow for webhook Telegram bot."""
if not user_input:
if self.source == SOURCE_RECONFIGURE:
return self.async_show_form(
step_id="webhooks",
data_schema=self.add_suggested_values_to_schema(
STEP_WEBHOOKS_DATA_SCHEMA,
self._get_reconfigure_entry().data,
),
)
return self.async_show_form(
step_id="webhooks",
data_schema=self.add_suggested_values_to_schema(
STEP_WEBHOOKS_DATA_SCHEMA,
{
CONF_TRUSTED_NETWORKS: ",".join(
[str(network) for network in DEFAULT_TRUSTED_NETWORKS]
),
},
),
)
errors: dict[str, str] = {}
description_placeholders: dict[str, str] = {BOT_NAME: self._bot_name}
self._validate_webhooks(user_input, errors, description_placeholders)
if errors:
return self.async_show_form(
step_id="webhooks",
data_schema=self.add_suggested_values_to_schema(
STEP_WEBHOOKS_DATA_SCHEMA,
user_input,
),
errors=errors,
description_placeholders=description_placeholders,
)
await self._shutdown_bot()
if self.source == SOURCE_RECONFIGURE:
user_input.update(self._step_user_data)
return self.async_update_reload_and_abort(
self._get_reconfigure_entry(),
title=self._bot_name,
data_updates=user_input,
)
return self.async_create_entry(
title=self._bot_name,
data={
CONF_PLATFORM: self._step_user_data[CONF_PLATFORM],
CONF_API_KEY: self._step_user_data[CONF_API_KEY],
CONF_PROXY_URL: self._step_user_data.get(CONF_PROXY_URL),
CONF_URL: user_input.get(CONF_URL),
CONF_TRUSTED_NETWORKS: user_input[CONF_TRUSTED_NETWORKS],
},
options={ATTR_PARSER: self._step_user_data.get(ATTR_PARSER, PARSER_MD)},
description_placeholders=description_placeholders,
)
def _validate_webhooks(
self,
user_input: dict[str, Any],
errors: dict[str, str],
description_placeholders: dict[str, str],
) -> None:
# validate URL
if CONF_URL in user_input and not user_input[CONF_URL].startswith("https"):
errors["base"] = "invalid_url"
description_placeholders[ERROR_FIELD] = "URL"
description_placeholders[ERROR_MESSAGE] = "URL must start with https"
return
if CONF_URL not in user_input:
try:
get_url(self.hass, require_ssl=True, allow_internal=False)
except NoURLAvailableError:
errors["base"] = "no_url_available"
description_placeholders[ERROR_FIELD] = "URL"
description_placeholders[ERROR_MESSAGE] = (
"URL is required since you have not configured an external URL in Home Assistant"
)
return
# validate trusted networks
csv_trusted_networks: list[str] = []
formatted_trusted_networks: str = (
user_input[CONF_TRUSTED_NETWORKS].lstrip("[").rstrip("]")
)
for trusted_network in cv.ensure_list_csv(formatted_trusted_networks):
formatted_trusted_network: str = trusted_network.strip("'")
try:
IPv4Network(formatted_trusted_network)
except (AddressValueError, ValueError) as err:
errors["base"] = "invalid_trusted_networks"
description_placeholders[ERROR_FIELD] = "trusted networks"
description_placeholders[ERROR_MESSAGE] = str(err)
return
else:
csv_trusted_networks.append(formatted_trusted_network)
user_input[CONF_TRUSTED_NETWORKS] = csv_trusted_networks
return
async def async_step_reconfigure(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Reconfigure Telegram bot."""
api_key: str = self._get_reconfigure_entry().data[CONF_API_KEY]
await self.async_set_unique_id(api_key)
self._abort_if_unique_id_mismatch()
if not user_input:
return self.async_show_form(
step_id="reconfigure",
data_schema=self.add_suggested_values_to_schema(
STEP_RECONFIGURE_USER_DATA_SCHEMA,
self._get_reconfigure_entry().data,
),
)
errors: dict[str, str] = {}
description_placeholders: dict[str, str] = {}
user_input[CONF_API_KEY] = api_key
bot_name = await self._validate_bot(
user_input, errors, description_placeholders
)
self._bot_name = bot_name
if errors:
return self.async_show_form(
step_id="reconfigure",
data_schema=self.add_suggested_values_to_schema(
STEP_RECONFIGURE_USER_DATA_SCHEMA,
user_input,
),
errors=errors,
description_placeholders=description_placeholders,
)
if user_input[CONF_PLATFORM] != PLATFORM_WEBHOOKS:
await self._shutdown_bot()
return self.async_update_reload_and_abort(
self._get_reconfigure_entry(), title=bot_name, data_updates=user_input
)
self._step_user_data.update(user_input)
return await self.async_step_webhooks()
async def async_step_reauth(
self, entry_data: Mapping[str, Any]
) -> ConfigFlowResult:
"""Reauth step."""
return await self.async_step_reauth_confirm()
async def async_step_reauth_confirm(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Reauth confirm step."""
if user_input is None:
return self.async_show_form(
step_id="reauth_confirm",
data_schema=self.add_suggested_values_to_schema(
STEP_REAUTH_DATA_SCHEMA, self._get_reauth_entry().data
),
)
errors: dict[str, str] = {}
description_placeholders: dict[str, str] = {}
bot_name = await self._validate_bot(
user_input, errors, description_placeholders
)
await self._shutdown_bot()
if errors:
return self.async_show_form(
step_id="reauth_confirm",
data_schema=self.add_suggested_values_to_schema(
STEP_REAUTH_DATA_SCHEMA, self._get_reauth_entry().data
),
errors=errors,
description_placeholders=description_placeholders,
)
return self.async_update_reload_and_abort(
self._get_reauth_entry(), title=bot_name, data_updates=user_input
)
class AllowedChatIdsSubEntryFlowHandler(ConfigSubentryFlow):
"""Handle a subentry flow for creating chat ID."""
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> SubentryFlowResult:
"""Create allowed chat ID."""
errors: dict[str, str] = {}
if user_input is not None:
config_entry: TelegramBotConfigEntry = self._get_entry()
bot = config_entry.runtime_data.bot
chat_id: int = user_input[CONF_CHAT_ID]
chat_name = await _async_get_chat_name(bot, chat_id)
if chat_name:
return self.async_create_entry(
title=chat_name,
data={CONF_CHAT_ID: chat_id},
unique_id=str(chat_id),
)
errors["base"] = "chat_not_found"
return self.async_show_form(
step_id="user",
data_schema=vol.Schema({vol.Required(CONF_CHAT_ID): vol.Coerce(int)}),
errors=errors,
)
async def _async_get_chat_name(bot: Bot | None, chat_id: int) -> str:
if not bot:
return str(chat_id)
try:
chat_info: ChatFullInfo = await bot.get_chat(chat_id)
return chat_info.effective_name or str(chat_id)
except BadRequest:
return ""

View File

@ -0,0 +1,109 @@
"""Constants for the Telegram Bot integration."""
from ipaddress import ip_network
DOMAIN = "telegram_bot"
PLATFORM_BROADCAST = "broadcast"
PLATFORM_POLLING = "polling"
PLATFORM_WEBHOOKS = "webhooks"
SUBENTRY_TYPE_ALLOWED_CHAT_IDS = "allowed_chat_ids"
CONF_BOT_COUNT = "bot_count"
CONF_ALLOWED_CHAT_IDS = "allowed_chat_ids"
CONF_CONFIG_ENTRY_ID = "config_entry_id"
CONF_PROXY_PARAMS = "proxy_params"
CONF_PROXY_URL = "proxy_url"
CONF_TRUSTED_NETWORKS = "trusted_networks"
# subentry
CONF_CHAT_ID = "chat_id"
BOT_NAME = "telegram_bot"
ERROR_FIELD = "error_field"
ERROR_MESSAGE = "error_message"
ISSUE_DEPRECATED_YAML = "deprecated_yaml"
ISSUE_DEPRECATED_YAML_HAS_MORE_PLATFORMS = (
"deprecated_yaml_import_issue_has_more_platforms"
)
ISSUE_DEPRECATED_YAML_IMPORT_ISSUE_ERROR = "deprecated_yaml_import_issue_error"
DEFAULT_TRUSTED_NETWORKS = [ip_network("149.154.160.0/20"), ip_network("91.108.4.0/22")]
SERVICE_SEND_MESSAGE = "send_message"
SERVICE_SEND_PHOTO = "send_photo"
SERVICE_SEND_STICKER = "send_sticker"
SERVICE_SEND_ANIMATION = "send_animation"
SERVICE_SEND_VIDEO = "send_video"
SERVICE_SEND_VOICE = "send_voice"
SERVICE_SEND_DOCUMENT = "send_document"
SERVICE_SEND_LOCATION = "send_location"
SERVICE_SEND_POLL = "send_poll"
SERVICE_EDIT_MESSAGE = "edit_message"
SERVICE_EDIT_CAPTION = "edit_caption"
SERVICE_EDIT_REPLYMARKUP = "edit_replymarkup"
SERVICE_ANSWER_CALLBACK_QUERY = "answer_callback_query"
SERVICE_DELETE_MESSAGE = "delete_message"
SERVICE_LEAVE_CHAT = "leave_chat"
EVENT_TELEGRAM_CALLBACK = "telegram_callback"
EVENT_TELEGRAM_COMMAND = "telegram_command"
EVENT_TELEGRAM_TEXT = "telegram_text"
EVENT_TELEGRAM_SENT = "telegram_sent"
PARSER_HTML = "html"
PARSER_MD = "markdown"
PARSER_MD2 = "markdownv2"
PARSER_PLAIN_TEXT = "plain_text"
ATTR_DATA = "data"
ATTR_MESSAGE = "message"
ATTR_TITLE = "title"
ATTR_ARGS = "args"
ATTR_AUTHENTICATION = "authentication"
ATTR_CALLBACK_QUERY = "callback_query"
ATTR_CALLBACK_QUERY_ID = "callback_query_id"
ATTR_CAPTION = "caption"
ATTR_CHAT_ID = "chat_id"
ATTR_CHAT_INSTANCE = "chat_instance"
ATTR_DATE = "date"
ATTR_DISABLE_NOTIF = "disable_notification"
ATTR_DISABLE_WEB_PREV = "disable_web_page_preview"
ATTR_EDITED_MSG = "edited_message"
ATTR_FILE = "file"
ATTR_FROM_FIRST = "from_first"
ATTR_FROM_LAST = "from_last"
ATTR_KEYBOARD = "keyboard"
ATTR_RESIZE_KEYBOARD = "resize_keyboard"
ATTR_ONE_TIME_KEYBOARD = "one_time_keyboard"
ATTR_KEYBOARD_INLINE = "inline_keyboard"
ATTR_MESSAGEID = "message_id"
ATTR_MSG = "message"
ATTR_MSGID = "id"
ATTR_PARSER = "parse_mode"
ATTR_PASSWORD = "password"
ATTR_REPLY_TO_MSGID = "reply_to_message_id"
ATTR_REPLYMARKUP = "reply_markup"
ATTR_SHOW_ALERT = "show_alert"
ATTR_STICKER_ID = "sticker_id"
ATTR_TARGET = "target"
ATTR_TEXT = "text"
ATTR_URL = "url"
ATTR_USER_ID = "user_id"
ATTR_USERNAME = "username"
ATTR_VERIFY_SSL = "verify_ssl"
ATTR_TIMEOUT = "timeout"
ATTR_MESSAGE_TAG = "message_tag"
ATTR_CHANNEL_POST = "channel_post"
ATTR_QUESTION = "question"
ATTR_OPTIONS = "options"
ATTR_ANSWERS = "answers"
ATTR_OPEN_PERIOD = "open_period"
ATTR_IS_ANONYMOUS = "is_anonymous"
ATTR_ALLOWS_MULTIPLE_ANSWERS = "allows_multiple_answers"
ATTR_MESSAGE_THREAD_ID = "message_thread_id"

View File

@ -2,6 +2,7 @@
"domain": "telegram_bot",
"name": "Telegram bot",
"codeowners": [],
"config_flow": true,
"dependencies": ["http"],
"documentation": "https://www.home-assistant.io/integrations/telegram_bot",
"iot_class": "cloud_push",

View File

@ -2,34 +2,35 @@
import logging
from telegram import Update
from telegram import Bot, Update
from telegram.error import NetworkError, RetryAfter, TelegramError, TimedOut
from telegram.ext import ApplicationBuilder, CallbackContext, TypeHandler
from homeassistant.const import EVENT_HOMEASSISTANT_START, EVENT_HOMEASSISTANT_STOP
from homeassistant.core import HomeAssistant
from . import BaseTelegramBotEntity
from .bot import BaseTelegramBot, TelegramBotConfigEntry
_LOGGER = logging.getLogger(__name__)
async def async_setup_platform(hass, bot, config):
async def async_setup_platform(
hass: HomeAssistant, bot: Bot, config: TelegramBotConfigEntry
) -> BaseTelegramBot | None:
"""Set up the Telegram polling platform."""
pollbot = PollBot(hass, bot, config)
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_START, pollbot.start_polling)
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, pollbot.stop_polling)
config.async_create_task(hass, pollbot.start_polling(), "polling telegram bot")
return True
return pollbot
async def process_error(update: Update, context: CallbackContext) -> None:
async def process_error(update: object, context: CallbackContext) -> None:
"""Telegram bot error handler."""
if context.error:
error_callback(context.error, update)
def error_callback(error: Exception, update: Update | None = None) -> None:
def error_callback(error: Exception, update: object | None = None) -> None:
"""Log the error."""
try:
raise error
@ -43,13 +44,15 @@ def error_callback(error: Exception, update: Update | None = None) -> None:
_LOGGER.error("%s: %s", error.__class__.__name__, error)
class PollBot(BaseTelegramBotEntity):
class PollBot(BaseTelegramBot):
"""Controls the Application object that holds the bot and an updater.
The application is set up to pass telegram updates to `self.handle_update`
"""
def __init__(self, hass, bot, config):
def __init__(
self, hass: HomeAssistant, bot: Bot, config: TelegramBotConfigEntry
) -> None:
"""Create Application to poll for updates."""
super().__init__(hass, config)
self.bot = bot
@ -57,6 +60,10 @@ class PollBot(BaseTelegramBotEntity):
self.application.add_handler(TypeHandler(Update, self.handle_update))
self.application.add_error_handler(process_error)
async def shutdown(self) -> None:
"""Shutdown the app."""
await self.stop_polling()
async def start_polling(self, event=None):
"""Start the polling task."""
_LOGGER.debug("Starting polling")

View File

@ -2,6 +2,10 @@
send_message:
fields:
config_entry_id:
selector:
config_entry:
integration: telegram_bot
message:
required: true
example: The garage door has been open for 10 minutes.
@ -61,6 +65,10 @@ send_message:
send_photo:
fields:
config_entry_id:
selector:
config_entry:
integration: telegram_bot
url:
example: "http://example.org/path/to/the/image.png"
selector:
@ -137,6 +145,10 @@ send_photo:
send_sticker:
fields:
config_entry_id:
selector:
config_entry:
integration: telegram_bot
url:
example: "http://example.org/path/to/the/sticker.webp"
selector:
@ -205,6 +217,10 @@ send_sticker:
send_animation:
fields:
config_entry_id:
selector:
config_entry:
integration: telegram_bot
url:
example: "http://example.org/path/to/the/animation.gif"
selector:
@ -281,6 +297,10 @@ send_animation:
send_video:
fields:
config_entry_id:
selector:
config_entry:
integration: telegram_bot
url:
example: "http://example.org/path/to/the/video.mp4"
selector:
@ -357,6 +377,10 @@ send_video:
send_voice:
fields:
config_entry_id:
selector:
config_entry:
integration: telegram_bot
url:
example: "http://example.org/path/to/the/voice.opus"
selector:
@ -425,6 +449,10 @@ send_voice:
send_document:
fields:
config_entry_id:
selector:
config_entry:
integration: telegram_bot
url:
example: "http://example.org/path/to/the/document.odf"
selector:
@ -501,6 +529,10 @@ send_document:
send_location:
fields:
config_entry_id:
selector:
config_entry:
integration: telegram_bot
latitude:
required: true
selector:
@ -555,6 +587,10 @@ send_location:
send_poll:
fields:
config_entry_id:
selector:
config_entry:
integration: telegram_bot
target:
example: "[12345, 67890] or 12345"
selector:
@ -603,6 +639,10 @@ send_poll:
edit_message:
fields:
config_entry_id:
selector:
config_entry:
integration: telegram_bot
message_id:
required: true
example: "{{ trigger.event.data.message.message_id }}"
@ -641,6 +681,10 @@ edit_message:
edit_caption:
fields:
config_entry_id:
selector:
config_entry:
integration: telegram_bot
message_id:
required: true
example: "{{ trigger.event.data.message.message_id }}"
@ -665,6 +709,10 @@ edit_caption:
edit_replymarkup:
fields:
config_entry_id:
selector:
config_entry:
integration: telegram_bot
message_id:
required: true
example: "{{ trigger.event.data.message.message_id }}"
@ -685,6 +733,10 @@ edit_replymarkup:
answer_callback_query:
fields:
config_entry_id:
selector:
config_entry:
integration: telegram_bot
message:
required: true
example: "OK, I'm listening"
@ -708,6 +760,10 @@ answer_callback_query:
delete_message:
fields:
config_entry_id:
selector:
config_entry:
integration: telegram_bot
message_id:
required: true
example: "{{ trigger.event.data.message.message_id }}"

View File

@ -1,9 +1,128 @@
{
"config": {
"step": {
"user": {
"title": "Telegram bot setup",
"description": "Create a new Telegram bot",
"data": {
"platform": "Platform",
"api_key": "[%key:common::config_flow::data::api_key%]",
"proxy_url": "Proxy URL"
},
"data_description": {
"platform": "Telegram bot implementation",
"api_key": "The API token of your bot.",
"proxy_url": "Proxy URL if working behind one, optionally including username and password.\n(socks5://username:password@proxy_ip:proxy_port)"
}
},
"webhooks": {
"title": "Webhooks network configuration",
"data": {
"url": "[%key:common::config_flow::data::url%]",
"trusted_networks": "Trusted networks"
},
"data_description": {
"url": "Allow to overwrite the external URL from the Home Assistant configuration for different setups.",
"trusted_networks": "Telegram server access ACL as list.\nDefault: 149.154.160.0/20, 91.108.4.0/22"
}
},
"reconfigure": {
"title": "Telegram bot setup",
"description": "Reconfigure Telegram bot",
"data": {
"platform": "[%key:component::telegram_bot::config::step::user::data::platform%]",
"proxy_url": "[%key:component::telegram_bot::config::step::user::data::proxy_url%]"
},
"data_description": {
"platform": "[%key:component::telegram_bot::config::step::user::data_description::platform%]",
"proxy_url": "[%key:component::telegram_bot::config::step::user::data_description::proxy_url%]"
}
},
"reauth_confirm": {
"title": "Re-authenticate Telegram bot",
"data": {
"api_key": "[%key:common::config_flow::data::api_key%]"
},
"data_description": {
"api_key": "[%key:component::telegram_bot::config::step::user::data_description::api_key%]"
}
}
},
"error": {
"invalid_api_key": "[%key:common::config_flow::error::invalid_api_key%]",
"invalid_proxy_url": "{proxy_url_error}",
"no_url_available": "URL is required since you have not configured an external URL in Home Assistant",
"invalid_url": "URL must start with https",
"invalid_trusted_networks": "Invalid trusted network: {error_message}"
},
"abort": {
"already_configured": "[%key:common::config_flow::abort::already_configured_service%]",
"reconfigure_successful": "[%key:common::config_flow::abort::reconfigure_successful%]",
"reauth_successful": "[%key:common::config_flow::abort::reauth_successful%]"
}
},
"options": {
"step": {
"init": {
"title": "Configure Telegram bot",
"data": {
"parse_mode": "Parse mode"
},
"data_description": {
"parse_mode": "Default parse mode for messages if not explicit in message data."
}
}
}
},
"config_subentries": {
"allowed_chat_ids": {
"initiate_flow": {
"user": "Add allowed chat ID"
},
"step": {
"user": {
"title": "Add chat",
"data": {
"chat_id": "Chat ID"
},
"data_description": {
"chat_id": "ID representing the user or group chat to which messages can be sent."
}
}
},
"error": {
"chat_not_found": "Chat not found"
},
"abort": {
"already_configured": "Chat already configured"
}
}
},
"selector": {
"platforms": {
"options": {
"broadcast": "Broadcast",
"polling": "Polling",
"webhooks": "Webhooks"
}
},
"parsers": {
"options": {
"markdown": "Markdown (Legacy)",
"markdownv2": "MarkdownV2",
"html": "HTML"
}
}
},
"services": {
"send_message": {
"name": "Send message",
"description": "Sends a notification.",
"fields": {
"config_entry_id": {
"name": "Config entry ID",
"description": "The config entry representing the Telegram bot to send the message."
},
"message": {
"name": "Message",
"description": "Message body of the notification."
@ -58,6 +177,10 @@
"name": "Send photo",
"description": "Sends a photo.",
"fields": {
"config_entry_id": {
"name": "[%key:component::telegram_bot::services::send_message::fields::config_entry_id::name%]",
"description": "The config entry representing the Telegram bot to send the photo."
},
"url": {
"name": "[%key:common::config_flow::data::url%]",
"description": "Remote path to an image."
@ -128,6 +251,10 @@
"name": "Send sticker",
"description": "Sends a sticker.",
"fields": {
"config_entry_id": {
"name": "[%key:component::telegram_bot::services::send_message::fields::config_entry_id::name%]",
"description": "The config entry representing the Telegram bot to send the sticker."
},
"url": {
"name": "[%key:common::config_flow::data::url%]",
"description": "Remote path to a static .webp or animated .tgs sticker."
@ -194,6 +321,10 @@
"name": "Send animation",
"description": "Sends an animation.",
"fields": {
"config_entry_id": {
"name": "[%key:component::telegram_bot::services::send_message::fields::config_entry_id::name%]",
"description": "The config entry representing the Telegram bot to send the animation."
},
"url": {
"name": "[%key:common::config_flow::data::url%]",
"description": "Remote path to a GIF or H.264/MPEG-4 AVC video without sound."
@ -264,6 +395,10 @@
"name": "Send video",
"description": "Sends a video.",
"fields": {
"config_entry_id": {
"name": "[%key:component::telegram_bot::services::send_message::fields::config_entry_id::name%]",
"description": "The config entry representing the Telegram bot to send the video."
},
"url": {
"name": "[%key:common::config_flow::data::url%]",
"description": "Remote path to a video."
@ -334,6 +469,10 @@
"name": "Send voice",
"description": "Sends a voice message.",
"fields": {
"config_entry_id": {
"name": "[%key:component::telegram_bot::services::send_message::fields::config_entry_id::name%]",
"description": "The config entry representing the Telegram bot to send the voice message."
},
"url": {
"name": "[%key:common::config_flow::data::url%]",
"description": "Remote path to a voice message."
@ -400,6 +539,10 @@
"name": "Send document",
"description": "Sends a document.",
"fields": {
"config_entry_id": {
"name": "[%key:component::telegram_bot::services::send_message::fields::config_entry_id::name%]",
"description": "The config entry representing the Telegram bot to send the document."
},
"url": {
"name": "[%key:common::config_flow::data::url%]",
"description": "Remote path to a document."
@ -470,6 +613,10 @@
"name": "Send location",
"description": "Sends a location.",
"fields": {
"config_entry_id": {
"name": "[%key:component::telegram_bot::services::send_message::fields::config_entry_id::name%]",
"description": "The config entry representing the Telegram bot to send the location."
},
"latitude": {
"name": "[%key:common::config_flow::data::latitude%]",
"description": "The latitude to send."
@ -516,6 +663,10 @@
"name": "Send poll",
"description": "Sends a poll.",
"fields": {
"config_entry_id": {
"name": "[%key:component::telegram_bot::services::send_message::fields::config_entry_id::name%]",
"description": "The config entry representing the Telegram bot to send the poll."
},
"target": {
"name": "Target",
"description": "[%key:component::telegram_bot::services::send_location::fields::target::description%]"
@ -566,6 +717,10 @@
"name": "Edit message",
"description": "Edits a previously sent message.",
"fields": {
"config_entry_id": {
"name": "[%key:component::telegram_bot::services::send_message::fields::config_entry_id::name%]",
"description": "The config entry representing the Telegram bot to edit the message."
},
"message_id": {
"name": "Message ID",
"description": "ID of the message to edit."
@ -600,6 +755,10 @@
"name": "Edit caption",
"description": "Edits the caption of a previously sent message.",
"fields": {
"config_entry_id": {
"name": "[%key:component::telegram_bot::services::send_message::fields::config_entry_id::name%]",
"description": "The config entry representing the Telegram bot to edit the caption."
},
"message_id": {
"name": "[%key:component::telegram_bot::services::edit_message::fields::message_id::name%]",
"description": "[%key:component::telegram_bot::services::edit_message::fields::message_id::description%]"
@ -622,6 +781,10 @@
"name": "Edit reply markup",
"description": "Edits the inline keyboard of a previously sent message.",
"fields": {
"config_entry_id": {
"name": "[%key:component::telegram_bot::services::send_message::fields::config_entry_id::name%]",
"description": "The config entry representing the Telegram bot to edit the reply markup."
},
"message_id": {
"name": "[%key:component::telegram_bot::services::edit_message::fields::message_id::name%]",
"description": "[%key:component::telegram_bot::services::edit_message::fields::message_id::description%]"
@ -640,6 +803,10 @@
"name": "Answer callback query",
"description": "Responds to a callback query originated by clicking on an online keyboard button. The answer will be displayed to the user as a notification at the top of the chat screen or as an alert.",
"fields": {
"config_entry_id": {
"name": "[%key:component::telegram_bot::services::send_message::fields::config_entry_id::name%]",
"description": "The config entry representing the Telegram bot to answer the callback query."
},
"message": {
"name": "Message",
"description": "Unformatted text message body of the notification."
@ -662,6 +829,10 @@
"name": "Delete message",
"description": "Deletes a previously sent message.",
"fields": {
"config_entry_id": {
"name": "[%key:component::telegram_bot::services::send_message::fields::config_entry_id::name%]",
"description": "The config entry representing the Telegram bot to delete the message."
},
"message_id": {
"name": "[%key:component::telegram_bot::services::edit_message::fields::message_id::name%]",
"description": "ID of the message to delete."
@ -673,7 +844,30 @@
}
}
},
"exceptions": {
"multiple_config_entry": {
"message": "Multiple config entries found. Please specify the Telegram bot to use in the Config entry ID field."
},
"missing_config_entry": {
"message": "No config entries found or setup failed. Please set up the Telegram Bot first."
},
"missing_allowed_chat_ids": {
"message": "No allowed chat IDs found. Please add allowed chat IDs for {bot_name}."
}
},
"issues": {
"deprecated_yaml": {
"title": "The {integration_title} YAML configuration is being removed",
"description": "Configuring {integration_title} using YAML is being removed.\n\nYour existing YAML configuration has been imported into the UI automatically.\n\nRemove the `{domain}` configuration from your configuration.yaml file and restart Home Assistant to fix this issue."
},
"deprecated_yaml_import_issue_has_more_platforms": {
"title": "The {integration_title} YAML configuration is being removed",
"description": "Configuring {integration_title} using YAML is being removed.\n\nThe last entry of your existing YAML configuration ({telegram_bot}) has been imported into the UI automatically.\n\nRemove the `{domain}` configuration from your configuration.yaml file and restart Home Assistant to fix this issue. The other Telegram bots will need to be configured manually in the UI."
},
"deprecated_yaml_import_issue_error": {
"title": "YAML import failed due to invalid {error_field}",
"description": "Configuring {integration_title} using YAML is being removed but there was an error while importing your existing configuration ({telegram_bot}): {error_message}.\nSetup will not proceed.\n\nVerify that your {telegram_bot} is operating correctly and restart Home Assistant to attempt the import again.\n\nAlternatively, you may remove the `{domain}` configuration from your configuration.yaml entirely, restart Home Assistant, and add the {integration_title} integration manually."
},
"proxy_params_auth_deprecation": {
"title": "{telegram_bot}: Proxy authentication should be moved to the URL",
"description": "Authentication details for the the proxy configured in the {telegram_bot} integration should be moved into the {proxy_url} instead. Please update your configuration and restart Home Assistant to fix this issue.\n\nThe {proxy_params} config key will be removed in a future release."

View File

@ -2,20 +2,23 @@
import datetime as dt
from http import HTTPStatus
from ipaddress import ip_address
from ipaddress import IPv4Network, ip_address
import logging
import secrets
import string
from telegram import Update
from telegram.error import TimedOut
from telegram.ext import Application, TypeHandler
from telegram import Bot, Update
from telegram.error import NetworkError, TimedOut
from telegram.ext import ApplicationBuilder, TypeHandler
from homeassistant.components.http import HomeAssistantView
from homeassistant.const import EVENT_HOMEASSISTANT_STOP
from homeassistant.const import CONF_URL
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryNotReady
from homeassistant.helpers.network import get_url
from . import CONF_TRUSTED_NETWORKS, CONF_URL, BaseTelegramBotEntity
from .bot import BaseTelegramBot, TelegramBotConfigEntry
from .const import CONF_TRUSTED_NETWORKS
_LOGGER = logging.getLogger(__name__)
@ -24,7 +27,9 @@ REMOVE_WEBHOOK_URL = ""
SECRET_TOKEN_LENGTH = 32
async def async_setup_platform(hass, bot, config):
async def async_setup_platform(
hass: HomeAssistant, bot: Bot, config: TelegramBotConfigEntry
) -> BaseTelegramBot | None:
"""Set up the Telegram webhooks platform."""
# Generate an ephemeral secret token
@ -33,46 +38,56 @@ async def async_setup_platform(hass, bot, config):
pushbot = PushBot(hass, bot, config, secret_token)
if not pushbot.webhook_url.startswith("https"):
_LOGGER.error("Invalid telegram webhook %s must be https", pushbot.webhook_url)
return False
await pushbot.start_application()
webhook_registered = await pushbot.register_webhook()
if not webhook_registered:
return False
raise ConfigEntryNotReady("Failed to register webhook with Telegram")
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, pushbot.stop_application)
hass.http.register_view(
PushBotView(
hass,
bot,
pushbot.application,
config[CONF_TRUSTED_NETWORKS],
_get_trusted_networks(config),
secret_token,
)
)
return True
return pushbot
class PushBot(BaseTelegramBotEntity):
def _get_trusted_networks(config: TelegramBotConfigEntry) -> list[IPv4Network]:
trusted_networks_str: list[str] = config.data[CONF_TRUSTED_NETWORKS]
return [IPv4Network(trusted_network) for trusted_network in trusted_networks_str]
class PushBot(BaseTelegramBot):
"""Handles all the push/webhook logic and passes telegram updates to `self.handle_update`."""
def __init__(self, hass, bot, config, secret_token):
def __init__(
self,
hass: HomeAssistant,
bot: Bot,
config: TelegramBotConfigEntry,
secret_token: str,
) -> None:
"""Create Application before calling super()."""
self.bot = bot
self.trusted_networks = config[CONF_TRUSTED_NETWORKS]
self.trusted_networks = _get_trusted_networks(config)
self.secret_token = secret_token
# Dumb Application that just gets our updates to our handler callback (self.handle_update)
self.application = Application.builder().bot(bot).updater(None).build()
self.application = ApplicationBuilder().bot(bot).updater(None).build()
self.application.add_handler(TypeHandler(Update, self.handle_update))
super().__init__(hass, config)
self.base_url = config.get(CONF_URL) or get_url(
self.base_url = config.data.get(CONF_URL) or get_url(
hass, require_ssl=True, allow_internal=False
)
self.webhook_url = f"{self.base_url}{TELEGRAM_WEBHOOK_URL}"
async def shutdown(self) -> None:
"""Shutdown the app."""
await self.stop_application()
async def _try_to_set_webhook(self):
_LOGGER.debug("Registering webhook URL: %s", self.webhook_url)
retry_num = 0
@ -127,7 +142,10 @@ class PushBot(BaseTelegramBotEntity):
async def deregister_webhook(self):
"""Query telegram and deregister the URL for our webhook."""
_LOGGER.debug("Deregistering webhook URL")
await self.bot.delete_webhook()
try:
await self.bot.delete_webhook()
except NetworkError:
_LOGGER.error("Failed to deregister webhook URL")
class PushBotView(HomeAssistantView):
@ -137,7 +155,14 @@ class PushBotView(HomeAssistantView):
url = TELEGRAM_WEBHOOK_URL
name = "telegram_webhooks"
def __init__(self, hass, bot, application, trusted_networks, secret_token):
def __init__(
self,
hass: HomeAssistant,
bot: Bot,
application,
trusted_networks: list[IPv4Network],
secret_token: str,
) -> None:
"""Initialize by storing stuff needed for setting up our webhook endpoint."""
self.hass = hass
self.bot = bot

View File

@ -634,6 +634,7 @@ FLOWS = {
"tautulli",
"technove",
"tedee",
"telegram_bot",
"tellduslive",
"tesla_fleet",
"tesla_wall_connector",

View File

@ -6578,7 +6578,7 @@
},
"telegram_bot": {
"integration_type": "hub",
"config_flow": false,
"config_flow": true,
"iot_class": "cloud_push",
"name": "Telegram bot"
}

View File

@ -3,26 +3,31 @@
from collections.abc import AsyncGenerator, Generator
from datetime import datetime
from typing import Any
from unittest.mock import patch
from unittest.mock import AsyncMock, patch
import pytest
from telegram import Bot, Chat, Message, User
from telegram.constants import ChatType
from telegram import Bot, Chat, ChatFullInfo, Message, User
from telegram.constants import AccentColor, ChatType
from homeassistant.components.telegram_bot import (
ATTR_PARSER,
CONF_ALLOWED_CHAT_IDS,
CONF_TRUSTED_NETWORKS,
DOMAIN,
PARSER_MD,
)
from homeassistant.const import (
CONF_API_KEY,
CONF_PLATFORM,
CONF_URL,
EVENT_HOMEASSISTANT_START,
from homeassistant.components.telegram_bot.const import (
CONF_CHAT_ID,
PLATFORM_BROADCAST,
PLATFORM_WEBHOOKS,
)
from homeassistant.config_entries import ConfigSubentryData
from homeassistant.const import CONF_API_KEY, CONF_PLATFORM, CONF_URL
from homeassistant.core import HomeAssistant
from homeassistant.setup import async_setup_component
from tests.common import MockConfigEntry
@pytest.fixture
def config_webhooks() -> dict[str, Any]:
@ -30,7 +35,7 @@ def config_webhooks() -> dict[str, Any]:
return {
DOMAIN: [
{
CONF_PLATFORM: "webhooks",
CONF_PLATFORM: PLATFORM_WEBHOOKS,
CONF_URL: "https://test",
CONF_TRUSTED_NETWORKS: ["127.0.0.1"],
CONF_API_KEY: "1234567890:ABC",
@ -83,6 +88,14 @@ def mock_register_webhook() -> Generator[None]:
@pytest.fixture
def mock_external_calls() -> Generator[None]:
"""Mock calls that make calls to the live Telegram API."""
test_chat = ChatFullInfo(
id=123456,
title="mock title",
first_name="mock first_name",
type="PRIVATE",
max_reaction_count=100,
accent_color_id=AccentColor.COLOR_000,
)
test_user = User(123456, "Testbot", True)
message = Message(
message_id=12345,
@ -100,8 +113,12 @@ def mock_external_calls() -> Generator[None]:
super().__init__(*args, **kwargs)
self._bot_user = test_user
async def delete_webhook(self) -> bool:
return True
with (
patch("homeassistant.components.telegram_bot.Bot", BotMock),
patch("homeassistant.components.telegram_bot.bot.Bot", BotMock),
patch.object(BotMock, "get_chat", return_value=test_chat),
patch.object(BotMock, "get_me", return_value=test_user),
patch.object(BotMock, "bot", test_user),
patch.object(BotMock, "send_message", return_value=message),
@ -225,6 +242,54 @@ def update_callback_query():
}
@pytest.fixture
def mock_broadcast_config_entry() -> MockConfigEntry:
"""Return the default mocked config entry."""
return MockConfigEntry(
unique_id="mock api key",
domain=DOMAIN,
data={
CONF_PLATFORM: PLATFORM_BROADCAST,
CONF_API_KEY: "mock api key",
},
options={ATTR_PARSER: PARSER_MD},
subentries_data=[
ConfigSubentryData(
unique_id="1234567890",
data={CONF_CHAT_ID: 1234567890},
subentry_id="mock_id",
subentry_type=CONF_ALLOWED_CHAT_IDS,
title="mock chat",
)
],
)
@pytest.fixture
def mock_webhooks_config_entry() -> MockConfigEntry:
"""Return the default mocked config entry."""
return MockConfigEntry(
unique_id="mock api key",
domain=DOMAIN,
data={
CONF_PLATFORM: PLATFORM_WEBHOOKS,
CONF_API_KEY: "mock api key",
CONF_URL: "https://test",
CONF_TRUSTED_NETWORKS: "149.154.160.0/20,91.108.4.0/22",
},
options={ATTR_PARSER: PARSER_MD},
subentries_data=[
ConfigSubentryData(
unique_id="1234567890",
data={CONF_CHAT_ID: 1234567890},
subentry_id="mock_id",
subentry_type=CONF_ALLOWED_CHAT_IDS,
title="mock chat",
)
],
)
@pytest.fixture
async def webhook_platform(
hass: HomeAssistant,
@ -249,11 +314,23 @@ async def polling_platform(
hass: HomeAssistant, config_polling: dict[str, Any], mock_external_calls: None
) -> None:
"""Fixture for setting up the polling platform using appropriate config and mocks."""
await async_setup_component(
hass,
DOMAIN,
config_polling,
)
# Fire this event to start polling
hass.bus.fire(EVENT_HOMEASSISTANT_START)
await hass.async_block_till_done()
with patch(
"homeassistant.components.telegram_bot.polling.ApplicationBuilder"
) as application_builder_class:
application = (
application_builder_class.return_value.bot.return_value.build.return_value
)
application.initialize = AsyncMock()
application.updater.start_polling = AsyncMock()
application.start = AsyncMock()
application.updater.stop = AsyncMock()
application.stop = AsyncMock()
application.shutdown = AsyncMock()
await async_setup_component(
hass,
DOMAIN,
config_polling,
)
await hass.async_block_till_done()

View File

@ -4,7 +4,7 @@ from homeassistant.core import HomeAssistant
from homeassistant.setup import async_setup_component
async def test_setup(hass: HomeAssistant) -> None:
async def test_setup(hass: HomeAssistant, mock_external_calls: None) -> None:
"""Test setting up Telegram broadcast."""
assert await async_setup_component(
hass,

View File

@ -0,0 +1,559 @@
"""Config flow tests for the Telegram Bot integration."""
from unittest.mock import patch
from telegram import ChatFullInfo, User
from telegram.constants import AccentColor
from telegram.error import BadRequest, InvalidToken, NetworkError
from homeassistant.components.telegram_bot.const import (
ATTR_PARSER,
BOT_NAME,
CONF_ALLOWED_CHAT_IDS,
CONF_BOT_COUNT,
CONF_CHAT_ID,
CONF_PROXY_URL,
CONF_TRUSTED_NETWORKS,
DOMAIN,
ERROR_FIELD,
ERROR_MESSAGE,
ISSUE_DEPRECATED_YAML,
ISSUE_DEPRECATED_YAML_IMPORT_ISSUE_ERROR,
PARSER_HTML,
PARSER_MD,
PLATFORM_BROADCAST,
PLATFORM_WEBHOOKS,
SUBENTRY_TYPE_ALLOWED_CHAT_IDS,
)
from homeassistant.config_entries import SOURCE_IMPORT, SOURCE_USER, ConfigSubentry
from homeassistant.const import CONF_API_KEY, CONF_PLATFORM, CONF_URL
from homeassistant.core import HomeAssistant
from homeassistant.data_entry_flow import FlowResultType
from homeassistant.helpers.issue_registry import IssueRegistry
from tests.common import MockConfigEntry
async def test_options_flow(
hass: HomeAssistant, mock_webhooks_config_entry: MockConfigEntry
) -> None:
"""Test options flow."""
mock_webhooks_config_entry.add_to_hass(hass)
# test: no input
result = await hass.config_entries.options.async_init(
mock_webhooks_config_entry.entry_id
)
await hass.async_block_till_done()
assert result["step_id"] == "init"
assert result["type"] == FlowResultType.FORM
# test: valid input
result = await hass.config_entries.options.async_configure(
result["flow_id"],
{
ATTR_PARSER: PARSER_HTML,
},
)
await hass.async_block_till_done()
assert result["type"] == FlowResultType.CREATE_ENTRY
assert result["data"][ATTR_PARSER] == PARSER_HTML
async def test_reconfigure_flow_broadcast(
hass: HomeAssistant,
mock_webhooks_config_entry: MockConfigEntry,
mock_external_calls: None,
) -> None:
"""Test reconfigure flow for broadcast bot."""
mock_webhooks_config_entry.add_to_hass(hass)
result = await mock_webhooks_config_entry.start_reconfigure_flow(hass)
assert result["step_id"] == "reconfigure"
assert result["type"] is FlowResultType.FORM
assert result["errors"] is None
# test: invalid proxy url
with patch(
"homeassistant.components.telegram_bot.config_flow.Bot.get_me",
) as mock_bot:
mock_bot.side_effect = NetworkError("mock invalid proxy")
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_PLATFORM: PLATFORM_BROADCAST,
CONF_PROXY_URL: "invalid",
},
)
await hass.async_block_till_done()
assert result["step_id"] == "reconfigure"
assert result["type"] is FlowResultType.FORM
assert result["errors"]["base"] == "invalid_proxy_url"
# test: valid
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_PLATFORM: PLATFORM_BROADCAST,
CONF_PROXY_URL: "https://test",
},
)
await hass.async_block_till_done()
assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "reconfigure_successful"
assert mock_webhooks_config_entry.data[CONF_PLATFORM] == PLATFORM_BROADCAST
async def test_reconfigure_flow_webhooks(
hass: HomeAssistant,
mock_webhooks_config_entry: MockConfigEntry,
mock_external_calls: None,
) -> None:
"""Test reconfigure flow for webhook."""
mock_webhooks_config_entry.add_to_hass(hass)
result = await mock_webhooks_config_entry.start_reconfigure_flow(hass)
assert result["step_id"] == "reconfigure"
assert result["type"] is FlowResultType.FORM
assert result["errors"] is None
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_PLATFORM: PLATFORM_WEBHOOKS,
CONF_PROXY_URL: "https://test",
},
)
await hass.async_block_till_done()
assert result["step_id"] == "webhooks"
assert result["type"] is FlowResultType.FORM
assert result["errors"] is None
# test: invalid url
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_URL: "http://test",
CONF_TRUSTED_NETWORKS: "149.154.160.0/20,91.108.4.0/22",
},
)
assert result["step_id"] == "webhooks"
assert result["type"] is FlowResultType.FORM
assert result["errors"]["base"] == "invalid_url"
# test: HA external url not configured
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{CONF_TRUSTED_NETWORKS: "149.154.160.0/20,91.108.4.0/22"},
)
assert result["step_id"] == "webhooks"
assert result["type"] is FlowResultType.FORM
assert result["errors"]["base"] == "no_url_available"
# test: invalid trusted networks
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_URL: "https://reconfigure",
CONF_TRUSTED_NETWORKS: "invalid trusted networks",
},
)
assert result["step_id"] == "webhooks"
assert result["type"] is FlowResultType.FORM
assert result["errors"]["base"] == "invalid_trusted_networks"
# test: valid input
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_URL: "https://reconfigure",
CONF_TRUSTED_NETWORKS: "149.154.160.0/20",
},
)
assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "reconfigure_successful"
assert mock_webhooks_config_entry.data[CONF_URL] == "https://reconfigure"
assert mock_webhooks_config_entry.data[CONF_TRUSTED_NETWORKS] == [
"149.154.160.0/20"
]
async def test_create_entry(
hass: HomeAssistant,
) -> None:
"""Test user flow."""
# test: no input
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_USER},
)
assert result["step_id"] == "user"
assert result["type"] is FlowResultType.FORM
assert result["errors"] is None
# test: invalid proxy url
with patch(
"homeassistant.components.telegram_bot.config_flow.Bot.get_me",
) as mock_bot:
mock_bot.side_effect = NetworkError("mock invalid proxy")
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_PLATFORM: PLATFORM_WEBHOOKS,
CONF_API_KEY: "mock api key",
CONF_PROXY_URL: "invalid",
},
)
await hass.async_block_till_done()
assert result["step_id"] == "user"
assert result["type"] is FlowResultType.FORM
assert result["errors"]["base"] == "invalid_proxy_url"
# test: valid input, to continue with webhooks step
with patch(
"homeassistant.components.telegram_bot.config_flow.Bot.get_me",
return_value=User(123456, "Testbot", True),
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_PLATFORM: PLATFORM_WEBHOOKS,
CONF_API_KEY: "mock api key",
CONF_PROXY_URL: "https://proxy",
},
)
await hass.async_block_till_done()
assert result["step_id"] == "webhooks"
assert result["type"] is FlowResultType.FORM
assert result["errors"] is None
# test: valid input for webhooks
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_URL: "https://test",
CONF_TRUSTED_NETWORKS: "149.154.160.0/20",
},
)
await hass.async_block_till_done()
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["title"] == "Testbot"
assert result["data"][CONF_PLATFORM] == PLATFORM_WEBHOOKS
assert result["data"][CONF_API_KEY] == "mock api key"
assert result["data"][CONF_PROXY_URL] == "https://proxy"
assert result["data"][CONF_URL] == "https://test"
assert result["data"][CONF_TRUSTED_NETWORKS] == ["149.154.160.0/20"]
async def test_reauth_flow(
hass: HomeAssistant, mock_webhooks_config_entry: MockConfigEntry
) -> None:
"""Test a reauthentication flow."""
mock_webhooks_config_entry.add_to_hass(hass)
result = await mock_webhooks_config_entry.start_reauth_flow(
hass, data={CONF_API_KEY: "dummy"}
)
assert result["step_id"] == "reauth_confirm"
assert result["type"] is FlowResultType.FORM
assert result["errors"] is None
# test: reauth invalid api key
with patch(
"homeassistant.components.telegram_bot.config_flow.Bot.get_me"
) as mock_bot:
mock_bot.side_effect = InvalidToken("mock invalid token error")
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{CONF_API_KEY: "new mock api key"},
)
await hass.async_block_till_done()
assert result["type"] is FlowResultType.FORM
assert result["errors"]["base"] == "invalid_api_key"
# test: valid
with patch(
"homeassistant.components.telegram_bot.config_flow.Bot.get_me",
return_value=User(123456, "Testbot", True),
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{CONF_API_KEY: "new mock api key"},
)
await hass.async_block_till_done()
assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "reauth_successful"
assert mock_webhooks_config_entry.data[CONF_API_KEY] == "new mock api key"
async def test_subentry_flow(
hass: HomeAssistant, mock_broadcast_config_entry: MockConfigEntry
) -> None:
"""Test subentry flow."""
mock_broadcast_config_entry.add_to_hass(hass)
with patch(
"homeassistant.components.telegram_bot.config_flow.Bot.get_me",
return_value=User(123456, "Testbot", True),
):
assert await hass.config_entries.async_setup(
mock_broadcast_config_entry.entry_id
)
await hass.async_block_till_done()
result = await hass.config_entries.subentries.async_init(
(mock_broadcast_config_entry.entry_id, SUBENTRY_TYPE_ALLOWED_CHAT_IDS),
context={"source": SOURCE_USER},
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "user"
with patch(
"homeassistant.components.telegram_bot.config_flow.Bot.get_chat",
return_value=ChatFullInfo(
id=987654321,
title="mock title",
first_name="mock first_name",
type="PRIVATE",
max_reaction_count=100,
accent_color_id=AccentColor.COLOR_000,
),
):
result = await hass.config_entries.subentries.async_configure(
result["flow_id"],
user_input={CONF_CHAT_ID: 987654321},
)
await hass.async_block_till_done()
subentry_id = list(mock_broadcast_config_entry.subentries)[-1]
subentry: ConfigSubentry = mock_broadcast_config_entry.subentries[subentry_id]
assert result["type"] is FlowResultType.CREATE_ENTRY
assert subentry.subentry_type == SUBENTRY_TYPE_ALLOWED_CHAT_IDS
assert subentry.title == "mock title"
assert subentry.unique_id == "987654321"
assert subentry.data == {CONF_CHAT_ID: 987654321}
async def test_subentry_flow_chat_error(
hass: HomeAssistant, mock_broadcast_config_entry: MockConfigEntry
) -> None:
"""Test subentry flow."""
mock_broadcast_config_entry.add_to_hass(hass)
with patch(
"homeassistant.components.telegram_bot.config_flow.Bot.get_me",
return_value=User(123456, "Testbot", True),
):
assert await hass.config_entries.async_setup(
mock_broadcast_config_entry.entry_id
)
await hass.async_block_till_done()
result = await hass.config_entries.subentries.async_init(
(mock_broadcast_config_entry.entry_id, SUBENTRY_TYPE_ALLOWED_CHAT_IDS),
context={"source": SOURCE_USER},
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "user"
# test: chat not found
with patch(
"homeassistant.components.telegram_bot.config_flow.Bot.get_chat"
) as mock_bot:
mock_bot.side_effect = BadRequest("mock chat not found")
result = await hass.config_entries.subentries.async_configure(
result["flow_id"],
user_input={CONF_CHAT_ID: 1234567890},
)
await hass.async_block_till_done()
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "user"
assert result["errors"]["base"] == "chat_not_found"
# test: chat id already configured
with patch(
"homeassistant.components.telegram_bot.config_flow.Bot.get_chat",
return_value=ChatFullInfo(
id=1234567890,
title="mock title",
first_name="mock first_name",
type="PRIVATE",
max_reaction_count=100,
accent_color_id=AccentColor.COLOR_000,
),
):
result = await hass.config_entries.subentries.async_configure(
result["flow_id"],
user_input={CONF_CHAT_ID: 1234567890},
)
await hass.async_block_till_done()
assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "already_configured"
async def test_import_failed(
hass: HomeAssistant, issue_registry: IssueRegistry
) -> None:
"""Test import flow failed."""
with patch(
"homeassistant.components.telegram_bot.config_flow.Bot.get_me"
) as mock_bot:
mock_bot.side_effect = InvalidToken("mock invalid token error")
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_IMPORT},
data={
CONF_PLATFORM: PLATFORM_BROADCAST,
CONF_API_KEY: "mock api key",
CONF_TRUSTED_NETWORKS: ["149.154.160.0/20"],
CONF_BOT_COUNT: 1,
},
)
await hass.async_block_till_done()
assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "import_failed"
issue = issue_registry.async_get_issue(
domain=DOMAIN,
issue_id=ISSUE_DEPRECATED_YAML,
)
assert issue.translation_key == ISSUE_DEPRECATED_YAML_IMPORT_ISSUE_ERROR
assert (
issue.translation_placeholders[BOT_NAME] == f"{PLATFORM_BROADCAST} Telegram bot"
)
assert issue.translation_placeholders[ERROR_FIELD] == "API key"
assert issue.translation_placeholders[ERROR_MESSAGE] == "mock invalid token error"
async def test_import_multiple(
hass: HomeAssistant, issue_registry: IssueRegistry
) -> None:
"""Test import flow with multiple duplicated entries."""
data = {
CONF_PLATFORM: PLATFORM_BROADCAST,
CONF_API_KEY: "mock api key",
CONF_TRUSTED_NETWORKS: ["149.154.160.0/20"],
CONF_ALLOWED_CHAT_IDS: [3334445550],
CONF_BOT_COUNT: 2,
}
with patch(
"homeassistant.components.telegram_bot.config_flow.Bot.get_me",
return_value=User(123456, "Testbot", True),
):
# test: import first entry success
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_IMPORT},
data=data,
)
await hass.async_block_till_done()
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["data"][CONF_PLATFORM] == PLATFORM_BROADCAST
assert result["data"][CONF_API_KEY] == "mock api key"
assert result["options"][ATTR_PARSER] == PARSER_MD
issue = issue_registry.async_get_issue(
domain=DOMAIN,
issue_id=ISSUE_DEPRECATED_YAML,
)
assert (
issue.translation_key == "deprecated_yaml_import_issue_has_more_platforms"
)
# test: import 2nd entry failed due to duplicate
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_IMPORT},
data=data,
)
await hass.async_block_till_done()
assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "already_configured"
async def test_duplicate_entry(hass: HomeAssistant) -> None:
"""Test user flow with duplicated entries."""
data = {
CONF_PLATFORM: PLATFORM_BROADCAST,
CONF_API_KEY: "mock api key",
}
with patch(
"homeassistant.components.telegram_bot.config_flow.Bot.get_me",
return_value=User(123456, "Testbot", True),
):
# test: import first entry success
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_USER},
data=data,
)
await hass.async_block_till_done()
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["data"][CONF_PLATFORM] == PLATFORM_BROADCAST
assert result["data"][CONF_API_KEY] == "mock api key"
assert result["options"][ATTR_PARSER] == PARSER_MD
# test: import 2nd entry failed due to duplicate
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_USER},
data=data,
)
await hass.async_block_till_done()
assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "already_configured"

View File

@ -6,19 +6,35 @@ from typing import Any
from unittest.mock import AsyncMock, MagicMock, mock_open, patch
import pytest
from telegram import Update
from telegram.error import NetworkError, RetryAfter, TelegramError, TimedOut
from telegram import Update, User
from telegram.error import (
InvalidToken,
NetworkError,
RetryAfter,
TelegramError,
TimedOut,
)
from homeassistant.components.telegram_bot import (
ATTR_CALLBACK_QUERY_ID,
ATTR_CHAT_ID,
ATTR_FILE,
ATTR_LATITUDE,
ATTR_LONGITUDE,
ATTR_MESSAGE,
ATTR_MESSAGE_THREAD_ID,
ATTR_MESSAGEID,
ATTR_OPTIONS,
ATTR_QUESTION,
ATTR_STICKER_ID,
ATTR_TARGET,
CONF_CONFIG_ENTRY_ID,
CONF_PLATFORM,
DOMAIN,
PLATFORM_BROADCAST,
SERVICE_ANSWER_CALLBACK_QUERY,
SERVICE_DELETE_MESSAGE,
SERVICE_EDIT_MESSAGE,
SERVICE_SEND_ANIMATION,
SERVICE_SEND_DOCUMENT,
SERVICE_SEND_LOCATION,
@ -28,13 +44,17 @@ from homeassistant.components.telegram_bot import (
SERVICE_SEND_STICKER,
SERVICE_SEND_VIDEO,
SERVICE_SEND_VOICE,
async_setup_entry,
)
from homeassistant.components.telegram_bot.webhooks import TELEGRAM_WEBHOOK_URL
from homeassistant.const import EVENT_HOMEASSISTANT_START
from homeassistant.config_entries import SOURCE_USER
from homeassistant.const import CONF_API_KEY
from homeassistant.core import Context, HomeAssistant
from homeassistant.data_entry_flow import FlowResultType
from homeassistant.exceptions import ConfigEntryAuthFailed, ServiceValidationError
from homeassistant.setup import async_setup_component
from tests.common import async_capture_events
from tests.common import MockConfigEntry, async_capture_events
from tests.typing import ClientSessionGenerator
@ -145,7 +165,7 @@ async def test_send_file(hass: HomeAssistant, webhook_platform, service: str) ->
# Mock the file handler read with our base64 encoded dummy file
with patch(
"homeassistant.components.telegram_bot._read_file_as_bytesio",
"homeassistant.components.telegram_bot.bot._read_file_as_bytesio",
_read_file_as_bytesio_mock,
):
response = await hass.services.async_call(
@ -269,24 +289,35 @@ async def test_webhook_endpoint_generates_telegram_callback_event(
async def test_polling_platform_message_text_update(
hass: HomeAssistant, config_polling, update_message_text
hass: HomeAssistant,
config_polling,
update_message_text,
mock_external_calls: None,
) -> None:
"""Provide the `BaseTelegramBotEntity.update_handler` with an `Update` and assert fired `telegram_text` event."""
"""Provide the `BaseTelegramBot.update_handler` with an `Update` and assert fired `telegram_text` event."""
events = async_capture_events(hass, "telegram_text")
with patch(
"homeassistant.components.telegram_bot.polling.ApplicationBuilder"
) as application_builder_class:
# Set up the integration with the polling platform inside the patch context manager.
application = (
application_builder_class.return_value.bot.return_value.build.return_value
)
application.updater.start_polling = AsyncMock()
application.updater.stop = AsyncMock()
application.initialize = AsyncMock()
application.start = AsyncMock()
application.stop = AsyncMock()
application.shutdown = AsyncMock()
await async_setup_component(
hass,
DOMAIN,
config_polling,
)
await hass.async_block_till_done()
# Set up the integration with the polling platform inside the patch context manager.
application = (
application_builder_class.return_value.bot.return_value.build.return_value
)
# Then call the callback and assert events fired.
handler = application.add_handler.call_args[0][0]
handle_update_callback = handler.callback
@ -295,13 +326,9 @@ async def test_polling_platform_message_text_update(
application.bot.defaults.tzinfo = None
update = Update.de_json(update_message_text, application.bot)
# handle_update_callback == BaseTelegramBotEntity.update_handler
# handle_update_callback == BaseTelegramBot.update_handler
await handle_update_callback(update, None)
application.updater.stop = AsyncMock()
application.stop = AsyncMock()
application.shutdown = AsyncMock()
# Make sure event has fired
await hass.async_block_till_done()
@ -326,6 +353,7 @@ async def test_polling_platform_add_error_handler(
hass: HomeAssistant,
config_polling: dict[str, Any],
update_message_text: dict[str, Any],
mock_external_calls: None,
caplog: pytest.LogCaptureFixture,
error: Exception,
log_message: str,
@ -334,6 +362,17 @@ async def test_polling_platform_add_error_handler(
with patch(
"homeassistant.components.telegram_bot.polling.ApplicationBuilder"
) as application_builder_class:
application = (
application_builder_class.return_value.bot.return_value.build.return_value
)
application.updater.stop = AsyncMock()
application.initialize = AsyncMock()
application.updater.start_polling = AsyncMock()
application.start = AsyncMock()
application.stop = AsyncMock()
application.shutdown = AsyncMock()
application.bot.defaults.tzinfo = None
await async_setup_component(
hass,
DOMAIN,
@ -341,16 +380,8 @@ async def test_polling_platform_add_error_handler(
)
await hass.async_block_till_done()
application = (
application_builder_class.return_value.bot.return_value.build.return_value
)
application.updater.stop = AsyncMock()
application.stop = AsyncMock()
application.shutdown = AsyncMock()
process_error = application.add_error_handler.call_args[0][0]
application.bot.defaults.tzinfo = None
update = Update.de_json(update_message_text, application.bot)
process_error = application.add_error_handler.call_args[0][0]
await process_error(update, MagicMock(error=error))
assert log_message in caplog.text
@ -372,6 +403,7 @@ async def test_polling_platform_start_polling_error_callback(
hass: HomeAssistant,
config_polling: dict[str, Any],
caplog: pytest.LogCaptureFixture,
mock_external_calls: None,
error: Exception,
log_message: str,
) -> None:
@ -379,13 +411,6 @@ async def test_polling_platform_start_polling_error_callback(
with patch(
"homeassistant.components.telegram_bot.polling.ApplicationBuilder"
) as application_builder_class:
await async_setup_component(
hass,
DOMAIN,
config_polling,
)
await hass.async_block_till_done()
application = (
application_builder_class.return_value.bot.return_value.build.return_value
)
@ -396,7 +421,12 @@ async def test_polling_platform_start_polling_error_callback(
application.stop = AsyncMock()
application.shutdown = AsyncMock()
hass.bus.async_fire(EVENT_HOMEASSISTANT_START)
await async_setup_component(
hass,
DOMAIN,
config_polling,
)
await hass.async_block_till_done()
error_callback = application.updater.start_polling.call_args.kwargs[
"error_callback"
@ -466,3 +496,220 @@ async def test_webhook_endpoint_invalid_secret_token_is_denied(
headers={"X-Telegram-Bot-Api-Secret-Token": incorrect_secret_token},
)
assert response.status == 401
async def test_multiple_config_entries_error(
hass: HomeAssistant,
mock_broadcast_config_entry: MockConfigEntry,
polling_platform,
mock_external_calls: None,
) -> None:
"""Test multiple config entries error."""
# setup the second entry (polling_platform is first entry)
mock_broadcast_config_entry.add_to_hass(hass)
await hass.config_entries.async_setup(mock_broadcast_config_entry.entry_id)
await hass.async_block_till_done()
with pytest.raises(ServiceValidationError) as err:
await hass.services.async_call(
DOMAIN,
SERVICE_SEND_MESSAGE,
{
ATTR_MESSAGE: "mock message",
},
blocking=True,
return_response=True,
)
await hass.async_block_till_done()
assert err.value.translation_key == "multiple_config_entry"
async def test_send_message_with_config_entry(
hass: HomeAssistant,
mock_broadcast_config_entry: MockConfigEntry,
mock_external_calls: None,
) -> None:
"""Test send message using config entry."""
mock_broadcast_config_entry.add_to_hass(hass)
await hass.config_entries.async_setup(mock_broadcast_config_entry.entry_id)
await hass.async_block_till_done()
response = await hass.services.async_call(
DOMAIN,
SERVICE_SEND_MESSAGE,
{
CONF_CONFIG_ENTRY_ID: mock_broadcast_config_entry.entry_id,
ATTR_MESSAGE: "mock message",
ATTR_TARGET: 1,
},
blocking=True,
return_response=True,
)
assert response["chats"][0]["message_id"] == 12345
async def test_send_message_no_chat_id_error(
hass: HomeAssistant,
mock_external_calls: None,
) -> None:
"""Test send message using config entry with no whitelisted chat id."""
data = {
CONF_PLATFORM: PLATFORM_BROADCAST,
CONF_API_KEY: "mock api key",
}
with patch(
"homeassistant.components.telegram_bot.config_flow.Bot.get_me",
return_value=User(123456, "Testbot", True),
):
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_USER},
data=data,
)
await hass.async_block_till_done()
assert result["type"] is FlowResultType.CREATE_ENTRY
with pytest.raises(ServiceValidationError) as err:
await hass.services.async_call(
DOMAIN,
SERVICE_SEND_MESSAGE,
{
CONF_CONFIG_ENTRY_ID: result["result"].entry_id,
ATTR_MESSAGE: "mock message",
},
blocking=True,
return_response=True,
)
assert err.value.translation_key == "missing_allowed_chat_ids"
assert err.value.translation_placeholders["bot_name"] == "Testbot"
async def test_send_message_config_entry_error(
hass: HomeAssistant,
mock_broadcast_config_entry: MockConfigEntry,
mock_external_calls: None,
) -> None:
"""Test send message config entry error."""
mock_broadcast_config_entry.add_to_hass(hass)
await hass.config_entries.async_setup(mock_broadcast_config_entry.entry_id)
await hass.async_block_till_done()
await hass.config_entries.async_unload(mock_broadcast_config_entry.entry_id)
await hass.async_block_till_done()
with pytest.raises(ServiceValidationError) as err:
await hass.services.async_call(
DOMAIN,
SERVICE_SEND_MESSAGE,
{
CONF_CONFIG_ENTRY_ID: mock_broadcast_config_entry.entry_id,
ATTR_MESSAGE: "mock message",
},
blocking=True,
return_response=True,
)
await hass.async_block_till_done()
assert err.value.translation_key == "missing_config_entry"
async def test_delete_message(
hass: HomeAssistant,
mock_broadcast_config_entry: MockConfigEntry,
mock_external_calls: None,
) -> None:
"""Test delete message."""
mock_broadcast_config_entry.add_to_hass(hass)
await hass.config_entries.async_setup(mock_broadcast_config_entry.entry_id)
await hass.async_block_till_done()
with patch(
"homeassistant.components.telegram_bot.bot.TelegramNotificationService.delete_message",
AsyncMock(return_value=True),
) as mock:
await hass.services.async_call(
DOMAIN,
SERVICE_DELETE_MESSAGE,
{ATTR_CHAT_ID: 12345, ATTR_MESSAGEID: 12345},
blocking=True,
)
await hass.async_block_till_done()
mock.assert_called_once()
async def test_edit_message(
hass: HomeAssistant,
mock_broadcast_config_entry: MockConfigEntry,
mock_external_calls: None,
) -> None:
"""Test edit message."""
mock_broadcast_config_entry.add_to_hass(hass)
await hass.config_entries.async_setup(mock_broadcast_config_entry.entry_id)
await hass.async_block_till_done()
with patch(
"homeassistant.components.telegram_bot.bot.TelegramNotificationService.edit_message",
AsyncMock(return_value=True),
) as mock:
await hass.services.async_call(
DOMAIN,
SERVICE_EDIT_MESSAGE,
{ATTR_MESSAGE: "mock message", ATTR_CHAT_ID: 12345, ATTR_MESSAGEID: 12345},
blocking=True,
)
await hass.async_block_till_done()
mock.assert_called_once()
async def test_async_setup_entry_failed(
hass: HomeAssistant, mock_broadcast_config_entry: MockConfigEntry
) -> None:
"""Test setup entry failed."""
mock_broadcast_config_entry.add_to_hass(hass)
with patch(
"homeassistant.components.telegram_bot.Bot.get_me",
) as mock_bot:
mock_bot.side_effect = InvalidToken("mock invalid token error")
with pytest.raises(ConfigEntryAuthFailed) as err:
await async_setup_entry(hass, mock_broadcast_config_entry)
await hass.async_block_till_done()
assert err.value.args[0] == "Invalid API token for Telegram Bot."
async def test_answer_callback_query(
hass: HomeAssistant,
mock_broadcast_config_entry: MockConfigEntry,
mock_external_calls: None,
) -> None:
"""Test answer callback query."""
mock_broadcast_config_entry.add_to_hass(hass)
await hass.config_entries.async_setup(mock_broadcast_config_entry.entry_id)
await hass.async_block_till_done()
with patch(
"homeassistant.components.telegram_bot.bot.TelegramNotificationService.answer_callback_query",
AsyncMock(),
) as mock:
await hass.services.async_call(
DOMAIN,
SERVICE_ANSWER_CALLBACK_QUERY,
{
ATTR_MESSAGE: "mock message",
ATTR_CALLBACK_QUERY_ID: 12345,
},
blocking=True,
)
await hass.async_block_till_done()
mock.assert_called_once()