mirror of
https://github.com/home-assistant/core.git
synced 2025-07-24 21:57:51 +00:00
Add secret_token support to telegram_bot component (#100869)
* Support secret_token for setWebHook api * Revert configuration YAML changes; generate and store secret token instead * Reformat codes * Revert storage of secret token; use ephemeral secret token instead * Reformat * Update homeassistant/components/telegram_bot/webhooks.py * Fix when header is not present * Check for non-empty token * Fix tests to support secret token * Add tests for invalid secret token * Minor: remove comment * Revert back to 401 * ... and for tests * Change patching method for the generation of secret tokens --------- Co-authored-by: Erik Montnemery <erik@montnemery.com>
This commit is contained in:
parent
1b43d79717
commit
41cb8526d1
@ -3,6 +3,8 @@ import datetime as dt
|
|||||||
from http import HTTPStatus
|
from http import HTTPStatus
|
||||||
from ipaddress import ip_address
|
from ipaddress import ip_address
|
||||||
import logging
|
import logging
|
||||||
|
import secrets
|
||||||
|
import string
|
||||||
|
|
||||||
from telegram import Update
|
from telegram import Update
|
||||||
from telegram.error import TimedOut
|
from telegram.error import TimedOut
|
||||||
@ -18,11 +20,17 @@ _LOGGER = logging.getLogger(__name__)
|
|||||||
|
|
||||||
TELEGRAM_WEBHOOK_URL = "/api/telegram_webhooks"
|
TELEGRAM_WEBHOOK_URL = "/api/telegram_webhooks"
|
||||||
REMOVE_WEBHOOK_URL = ""
|
REMOVE_WEBHOOK_URL = ""
|
||||||
|
SECRET_TOKEN_LENGTH = 32
|
||||||
|
|
||||||
|
|
||||||
async def async_setup_platform(hass, bot, config):
|
async def async_setup_platform(hass, bot, config):
|
||||||
"""Set up the Telegram webhooks platform."""
|
"""Set up the Telegram webhooks platform."""
|
||||||
pushbot = PushBot(hass, bot, config)
|
|
||||||
|
# Generate an ephemeral secret token
|
||||||
|
alphabet = string.ascii_letters + string.digits + "-_"
|
||||||
|
secret_token = "".join(secrets.choice(alphabet) for _ in range(SECRET_TOKEN_LENGTH))
|
||||||
|
|
||||||
|
pushbot = PushBot(hass, bot, config, secret_token)
|
||||||
|
|
||||||
if not pushbot.webhook_url.startswith("https"):
|
if not pushbot.webhook_url.startswith("https"):
|
||||||
_LOGGER.error("Invalid telegram webhook %s must be https", pushbot.webhook_url)
|
_LOGGER.error("Invalid telegram webhook %s must be https", pushbot.webhook_url)
|
||||||
@ -34,7 +42,13 @@ async def async_setup_platform(hass, bot, config):
|
|||||||
|
|
||||||
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, pushbot.deregister_webhook)
|
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, pushbot.deregister_webhook)
|
||||||
hass.http.register_view(
|
hass.http.register_view(
|
||||||
PushBotView(hass, bot, pushbot.dispatcher, config[CONF_TRUSTED_NETWORKS])
|
PushBotView(
|
||||||
|
hass,
|
||||||
|
bot,
|
||||||
|
pushbot.dispatcher,
|
||||||
|
config[CONF_TRUSTED_NETWORKS],
|
||||||
|
secret_token,
|
||||||
|
)
|
||||||
)
|
)
|
||||||
return True
|
return True
|
||||||
|
|
||||||
@ -42,10 +56,11 @@ async def async_setup_platform(hass, bot, config):
|
|||||||
class PushBot(BaseTelegramBotEntity):
|
class PushBot(BaseTelegramBotEntity):
|
||||||
"""Handles all the push/webhook logic and passes telegram updates to `self.handle_update`."""
|
"""Handles all the push/webhook logic and passes telegram updates to `self.handle_update`."""
|
||||||
|
|
||||||
def __init__(self, hass, bot, config):
|
def __init__(self, hass, bot, config, secret_token):
|
||||||
"""Create Dispatcher before calling super()."""
|
"""Create Dispatcher before calling super()."""
|
||||||
self.bot = bot
|
self.bot = bot
|
||||||
self.trusted_networks = config[CONF_TRUSTED_NETWORKS]
|
self.trusted_networks = config[CONF_TRUSTED_NETWORKS]
|
||||||
|
self.secret_token = secret_token
|
||||||
# Dumb dispatcher that just gets our updates to our handler callback (self.handle_update)
|
# Dumb dispatcher that just gets our updates to our handler callback (self.handle_update)
|
||||||
self.dispatcher = Dispatcher(bot, None)
|
self.dispatcher = Dispatcher(bot, None)
|
||||||
self.dispatcher.add_handler(TypeHandler(Update, self.handle_update))
|
self.dispatcher.add_handler(TypeHandler(Update, self.handle_update))
|
||||||
@ -61,7 +76,11 @@ class PushBot(BaseTelegramBotEntity):
|
|||||||
retry_num = 0
|
retry_num = 0
|
||||||
while retry_num < 3:
|
while retry_num < 3:
|
||||||
try:
|
try:
|
||||||
return self.bot.set_webhook(self.webhook_url, timeout=5)
|
return self.bot.set_webhook(
|
||||||
|
self.webhook_url,
|
||||||
|
api_kwargs={"secret_token": self.secret_token},
|
||||||
|
timeout=5,
|
||||||
|
)
|
||||||
except TimedOut:
|
except TimedOut:
|
||||||
retry_num += 1
|
retry_num += 1
|
||||||
_LOGGER.warning("Timeout trying to set webhook (retry #%d)", retry_num)
|
_LOGGER.warning("Timeout trying to set webhook (retry #%d)", retry_num)
|
||||||
@ -108,12 +127,13 @@ class PushBotView(HomeAssistantView):
|
|||||||
url = TELEGRAM_WEBHOOK_URL
|
url = TELEGRAM_WEBHOOK_URL
|
||||||
name = "telegram_webhooks"
|
name = "telegram_webhooks"
|
||||||
|
|
||||||
def __init__(self, hass, bot, dispatcher, trusted_networks):
|
def __init__(self, hass, bot, dispatcher, trusted_networks, secret_token):
|
||||||
"""Initialize by storing stuff needed for setting up our webhook endpoint."""
|
"""Initialize by storing stuff needed for setting up our webhook endpoint."""
|
||||||
self.hass = hass
|
self.hass = hass
|
||||||
self.bot = bot
|
self.bot = bot
|
||||||
self.dispatcher = dispatcher
|
self.dispatcher = dispatcher
|
||||||
self.trusted_networks = trusted_networks
|
self.trusted_networks = trusted_networks
|
||||||
|
self.secret_token = secret_token
|
||||||
|
|
||||||
async def post(self, request):
|
async def post(self, request):
|
||||||
"""Accept the POST from telegram."""
|
"""Accept the POST from telegram."""
|
||||||
@ -121,6 +141,10 @@ class PushBotView(HomeAssistantView):
|
|||||||
if not any(real_ip in net for net in self.trusted_networks):
|
if not any(real_ip in net for net in self.trusted_networks):
|
||||||
_LOGGER.warning("Access denied from %s", real_ip)
|
_LOGGER.warning("Access denied from %s", real_ip)
|
||||||
return self.json_message("Access denied", HTTPStatus.UNAUTHORIZED)
|
return self.json_message("Access denied", HTTPStatus.UNAUTHORIZED)
|
||||||
|
secret_token_header = request.headers.get("X-Telegram-Bot-Api-Secret-Token")
|
||||||
|
if secret_token_header is None or self.secret_token != secret_token_header:
|
||||||
|
_LOGGER.warning("Invalid secret token from %s", real_ip)
|
||||||
|
return self.json_message("Access denied", HTTPStatus.UNAUTHORIZED)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
update_data = await request.json()
|
update_data = await request.json()
|
||||||
|
@ -65,6 +65,23 @@ def mock_register_webhook():
|
|||||||
yield
|
yield
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def mock_generate_secret_token():
|
||||||
|
"""Mock secret token generated for webhook."""
|
||||||
|
mock_secret_token = "DEADBEEF12345678DEADBEEF87654321"
|
||||||
|
with patch(
|
||||||
|
"homeassistant.components.telegram_bot.webhooks.secrets.choice",
|
||||||
|
side_effect=mock_secret_token,
|
||||||
|
):
|
||||||
|
yield mock_secret_token
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def incorrect_secret_token():
|
||||||
|
"""Mock incorrect secret token."""
|
||||||
|
return "AAAABBBBCCCCDDDDEEEEFFFF00009999"
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def update_message_command():
|
def update_message_command():
|
||||||
"""Fixture for mocking an incoming update of type message/command."""
|
"""Fixture for mocking an incoming update of type message/command."""
|
||||||
@ -156,7 +173,9 @@ def update_callback_query():
|
|||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
async def webhook_platform(hass, config_webhooks, mock_register_webhook):
|
async def webhook_platform(
|
||||||
|
hass, config_webhooks, mock_register_webhook, mock_generate_secret_token
|
||||||
|
):
|
||||||
"""Fixture for setting up the webhooks platform using appropriate config and mocks."""
|
"""Fixture for setting up the webhooks platform using appropriate config and mocks."""
|
||||||
await async_setup_component(
|
await async_setup_component(
|
||||||
hass,
|
hass,
|
||||||
|
@ -35,12 +35,17 @@ async def test_webhook_endpoint_generates_telegram_text_event(
|
|||||||
webhook_platform,
|
webhook_platform,
|
||||||
hass_client: ClientSessionGenerator,
|
hass_client: ClientSessionGenerator,
|
||||||
update_message_text,
|
update_message_text,
|
||||||
|
mock_generate_secret_token,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""POST to the configured webhook endpoint and assert fired `telegram_text` event."""
|
"""POST to the configured webhook endpoint and assert fired `telegram_text` event."""
|
||||||
client = await hass_client()
|
client = await hass_client()
|
||||||
events = async_capture_events(hass, "telegram_text")
|
events = async_capture_events(hass, "telegram_text")
|
||||||
|
|
||||||
response = await client.post(TELEGRAM_WEBHOOK_URL, json=update_message_text)
|
response = await client.post(
|
||||||
|
TELEGRAM_WEBHOOK_URL,
|
||||||
|
json=update_message_text,
|
||||||
|
headers={"X-Telegram-Bot-Api-Secret-Token": mock_generate_secret_token},
|
||||||
|
)
|
||||||
assert response.status == 200
|
assert response.status == 200
|
||||||
assert (await response.read()).decode("utf-8") == ""
|
assert (await response.read()).decode("utf-8") == ""
|
||||||
|
|
||||||
@ -56,12 +61,17 @@ async def test_webhook_endpoint_generates_telegram_command_event(
|
|||||||
webhook_platform,
|
webhook_platform,
|
||||||
hass_client: ClientSessionGenerator,
|
hass_client: ClientSessionGenerator,
|
||||||
update_message_command,
|
update_message_command,
|
||||||
|
mock_generate_secret_token,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""POST to the configured webhook endpoint and assert fired `telegram_command` event."""
|
"""POST to the configured webhook endpoint and assert fired `telegram_command` event."""
|
||||||
client = await hass_client()
|
client = await hass_client()
|
||||||
events = async_capture_events(hass, "telegram_command")
|
events = async_capture_events(hass, "telegram_command")
|
||||||
|
|
||||||
response = await client.post(TELEGRAM_WEBHOOK_URL, json=update_message_command)
|
response = await client.post(
|
||||||
|
TELEGRAM_WEBHOOK_URL,
|
||||||
|
json=update_message_command,
|
||||||
|
headers={"X-Telegram-Bot-Api-Secret-Token": mock_generate_secret_token},
|
||||||
|
)
|
||||||
assert response.status == 200
|
assert response.status == 200
|
||||||
assert (await response.read()).decode("utf-8") == ""
|
assert (await response.read()).decode("utf-8") == ""
|
||||||
|
|
||||||
@ -77,12 +87,17 @@ async def test_webhook_endpoint_generates_telegram_callback_event(
|
|||||||
webhook_platform,
|
webhook_platform,
|
||||||
hass_client: ClientSessionGenerator,
|
hass_client: ClientSessionGenerator,
|
||||||
update_callback_query,
|
update_callback_query,
|
||||||
|
mock_generate_secret_token,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""POST to the configured webhook endpoint and assert fired `telegram_callback` event."""
|
"""POST to the configured webhook endpoint and assert fired `telegram_callback` event."""
|
||||||
client = await hass_client()
|
client = await hass_client()
|
||||||
events = async_capture_events(hass, "telegram_callback")
|
events = async_capture_events(hass, "telegram_callback")
|
||||||
|
|
||||||
response = await client.post(TELEGRAM_WEBHOOK_URL, json=update_callback_query)
|
response = await client.post(
|
||||||
|
TELEGRAM_WEBHOOK_URL,
|
||||||
|
json=update_callback_query,
|
||||||
|
headers={"X-Telegram-Bot-Api-Secret-Token": mock_generate_secret_token},
|
||||||
|
)
|
||||||
assert response.status == 200
|
assert response.status == 200
|
||||||
assert (await response.read()).decode("utf-8") == ""
|
assert (await response.read()).decode("utf-8") == ""
|
||||||
|
|
||||||
@ -119,13 +134,16 @@ async def test_webhook_endpoint_unauthorized_update_doesnt_generate_telegram_tex
|
|||||||
webhook_platform,
|
webhook_platform,
|
||||||
hass_client: ClientSessionGenerator,
|
hass_client: ClientSessionGenerator,
|
||||||
unauthorized_update_message_text,
|
unauthorized_update_message_text,
|
||||||
|
mock_generate_secret_token,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Update with unauthorized user/chat should not trigger event."""
|
"""Update with unauthorized user/chat should not trigger event."""
|
||||||
client = await hass_client()
|
client = await hass_client()
|
||||||
events = async_capture_events(hass, "telegram_text")
|
events = async_capture_events(hass, "telegram_text")
|
||||||
|
|
||||||
response = await client.post(
|
response = await client.post(
|
||||||
TELEGRAM_WEBHOOK_URL, json=unauthorized_update_message_text
|
TELEGRAM_WEBHOOK_URL,
|
||||||
|
json=unauthorized_update_message_text,
|
||||||
|
headers={"X-Telegram-Bot-Api-Secret-Token": mock_generate_secret_token},
|
||||||
)
|
)
|
||||||
assert response.status == 200
|
assert response.status == 200
|
||||||
assert (await response.read()).decode("utf-8") == ""
|
assert (await response.read()).decode("utf-8") == ""
|
||||||
@ -134,3 +152,39 @@ async def test_webhook_endpoint_unauthorized_update_doesnt_generate_telegram_tex
|
|||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done()
|
||||||
|
|
||||||
assert len(events) == 0
|
assert len(events) == 0
|
||||||
|
|
||||||
|
|
||||||
|
async def test_webhook_endpoint_without_secret_token_is_denied(
|
||||||
|
hass: HomeAssistant,
|
||||||
|
webhook_platform,
|
||||||
|
hass_client: ClientSessionGenerator,
|
||||||
|
update_message_text,
|
||||||
|
) -> None:
|
||||||
|
"""Request without a secret token header should be denied."""
|
||||||
|
client = await hass_client()
|
||||||
|
async_capture_events(hass, "telegram_text")
|
||||||
|
|
||||||
|
response = await client.post(
|
||||||
|
TELEGRAM_WEBHOOK_URL,
|
||||||
|
json=update_message_text,
|
||||||
|
)
|
||||||
|
assert response.status == 401
|
||||||
|
|
||||||
|
|
||||||
|
async def test_webhook_endpoint_invalid_secret_token_is_denied(
|
||||||
|
hass: HomeAssistant,
|
||||||
|
webhook_platform,
|
||||||
|
hass_client: ClientSessionGenerator,
|
||||||
|
update_message_text,
|
||||||
|
incorrect_secret_token,
|
||||||
|
) -> None:
|
||||||
|
"""Request with an invalid secret token header should be denied."""
|
||||||
|
client = await hass_client()
|
||||||
|
async_capture_events(hass, "telegram_text")
|
||||||
|
|
||||||
|
response = await client.post(
|
||||||
|
TELEGRAM_WEBHOOK_URL,
|
||||||
|
json=update_message_text,
|
||||||
|
headers={"X-Telegram-Bot-Api-Secret-Token": incorrect_secret_token},
|
||||||
|
)
|
||||||
|
assert response.status == 401
|
||||||
|
Loading…
x
Reference in New Issue
Block a user