mirror of
https://github.com/home-assistant/core.git
synced 2025-07-23 05:07:41 +00:00
Improve mobile_app key handling (#67429)
This commit is contained in:
parent
0974abf9e2
commit
5b8cf379a3
@ -28,6 +28,7 @@ ATTR_CONFIG_ENTRY_ID = "entry_id"
|
|||||||
ATTR_DEVICE_NAME = "device_name"
|
ATTR_DEVICE_NAME = "device_name"
|
||||||
ATTR_MANUFACTURER = "manufacturer"
|
ATTR_MANUFACTURER = "manufacturer"
|
||||||
ATTR_MODEL = "model"
|
ATTR_MODEL = "model"
|
||||||
|
ATTR_NO_LEGACY_ENCRYPTION = "no_legacy_encryption"
|
||||||
ATTR_OS_NAME = "os_name"
|
ATTR_OS_NAME = "os_name"
|
||||||
ATTR_OS_VERSION = "os_version"
|
ATTR_OS_VERSION = "os_version"
|
||||||
ATTR_PUSH_WEBSOCKET_CHANNEL = "push_websocket_channel"
|
ATTR_PUSH_WEBSOCKET_CHANNEL = "push_websocket_channel"
|
||||||
|
@ -7,7 +7,7 @@ import json
|
|||||||
import logging
|
import logging
|
||||||
|
|
||||||
from aiohttp.web import Response, json_response
|
from aiohttp.web import Response, json_response
|
||||||
from nacl.encoding import Base64Encoder
|
from nacl.encoding import Base64Encoder, HexEncoder, RawEncoder
|
||||||
from nacl.secret import SecretBox
|
from nacl.secret import SecretBox
|
||||||
|
|
||||||
from homeassistant.const import ATTR_DEVICE_ID, CONTENT_TYPE_JSON
|
from homeassistant.const import ATTR_DEVICE_ID, CONTENT_TYPE_JSON
|
||||||
@ -23,6 +23,7 @@ from .const import (
|
|||||||
ATTR_DEVICE_NAME,
|
ATTR_DEVICE_NAME,
|
||||||
ATTR_MANUFACTURER,
|
ATTR_MANUFACTURER,
|
||||||
ATTR_MODEL,
|
ATTR_MODEL,
|
||||||
|
ATTR_NO_LEGACY_ENCRYPTION,
|
||||||
ATTR_OS_VERSION,
|
ATTR_OS_VERSION,
|
||||||
ATTR_SUPPORTS_ENCRYPTION,
|
ATTR_SUPPORTS_ENCRYPTION,
|
||||||
CONF_SECRET,
|
CONF_SECRET,
|
||||||
@ -34,7 +35,7 @@ from .const import (
|
|||||||
_LOGGER = logging.getLogger(__name__)
|
_LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
def setup_decrypt() -> tuple[int, Callable]:
|
def setup_decrypt(key_encoder) -> tuple[int, Callable]:
|
||||||
"""Return decryption function and length of key.
|
"""Return decryption function and length of key.
|
||||||
|
|
||||||
Async friendly.
|
Async friendly.
|
||||||
@ -42,12 +43,14 @@ def setup_decrypt() -> tuple[int, Callable]:
|
|||||||
|
|
||||||
def decrypt(ciphertext, key):
|
def decrypt(ciphertext, key):
|
||||||
"""Decrypt ciphertext using key."""
|
"""Decrypt ciphertext using key."""
|
||||||
return SecretBox(key).decrypt(ciphertext, encoder=Base64Encoder)
|
return SecretBox(key, encoder=key_encoder).decrypt(
|
||||||
|
ciphertext, encoder=Base64Encoder
|
||||||
|
)
|
||||||
|
|
||||||
return (SecretBox.KEY_SIZE, decrypt)
|
return (SecretBox.KEY_SIZE, decrypt)
|
||||||
|
|
||||||
|
|
||||||
def setup_encrypt() -> tuple[int, Callable]:
|
def setup_encrypt(key_encoder) -> tuple[int, Callable]:
|
||||||
"""Return encryption function and length of key.
|
"""Return encryption function and length of key.
|
||||||
|
|
||||||
Async friendly.
|
Async friendly.
|
||||||
@ -55,15 +58,22 @@ def setup_encrypt() -> tuple[int, Callable]:
|
|||||||
|
|
||||||
def encrypt(ciphertext, key):
|
def encrypt(ciphertext, key):
|
||||||
"""Encrypt ciphertext using key."""
|
"""Encrypt ciphertext using key."""
|
||||||
return SecretBox(key).encrypt(ciphertext, encoder=Base64Encoder)
|
return SecretBox(key, encoder=key_encoder).encrypt(
|
||||||
|
ciphertext, encoder=Base64Encoder
|
||||||
|
)
|
||||||
|
|
||||||
return (SecretBox.KEY_SIZE, encrypt)
|
return (SecretBox.KEY_SIZE, encrypt)
|
||||||
|
|
||||||
|
|
||||||
def _decrypt_payload(key: str | None, ciphertext: str) -> dict[str, str] | None:
|
def _decrypt_payload_helper(
|
||||||
|
key: str | None,
|
||||||
|
ciphertext: str,
|
||||||
|
get_key_bytes: Callable[[str, int], str | bytes],
|
||||||
|
key_encoder,
|
||||||
|
) -> dict[str, str] | None:
|
||||||
"""Decrypt encrypted payload."""
|
"""Decrypt encrypted payload."""
|
||||||
try:
|
try:
|
||||||
keylen, decrypt = setup_decrypt()
|
keylen, decrypt = setup_decrypt(key_encoder)
|
||||||
except OSError:
|
except OSError:
|
||||||
_LOGGER.warning("Ignoring encrypted payload because libsodium not installed")
|
_LOGGER.warning("Ignoring encrypted payload because libsodium not installed")
|
||||||
return None
|
return None
|
||||||
@ -72,18 +82,33 @@ def _decrypt_payload(key: str | None, ciphertext: str) -> dict[str, str] | None:
|
|||||||
_LOGGER.warning("Ignoring encrypted payload because no decryption key known")
|
_LOGGER.warning("Ignoring encrypted payload because no decryption key known")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
key_bytes = key.encode("utf-8")
|
key_bytes = get_key_bytes(key, keylen)
|
||||||
key_bytes = key_bytes[:keylen]
|
|
||||||
key_bytes = key_bytes.ljust(keylen, b"\0")
|
|
||||||
|
|
||||||
try:
|
msg_bytes = decrypt(ciphertext, key_bytes)
|
||||||
msg_bytes = decrypt(ciphertext, key_bytes)
|
message = json.loads(msg_bytes.decode("utf-8"))
|
||||||
message = json.loads(msg_bytes.decode("utf-8"))
|
_LOGGER.debug("Successfully decrypted mobile_app payload")
|
||||||
_LOGGER.debug("Successfully decrypted mobile_app payload")
|
return message
|
||||||
return message
|
|
||||||
except ValueError:
|
|
||||||
_LOGGER.warning("Ignoring encrypted payload because unable to decrypt")
|
def _decrypt_payload(key: str | None, ciphertext: str) -> dict[str, str] | None:
|
||||||
return None
|
"""Decrypt encrypted payload."""
|
||||||
|
|
||||||
|
def get_key_bytes(key: str, keylen: int) -> str:
|
||||||
|
return key
|
||||||
|
|
||||||
|
return _decrypt_payload_helper(key, ciphertext, get_key_bytes, HexEncoder)
|
||||||
|
|
||||||
|
|
||||||
|
def _decrypt_payload_legacy(key: str | None, ciphertext: str) -> dict[str, str] | None:
|
||||||
|
"""Decrypt encrypted payload."""
|
||||||
|
|
||||||
|
def get_key_bytes(key: str, keylen: int) -> bytes:
|
||||||
|
key_bytes = key.encode("utf-8")
|
||||||
|
key_bytes = key_bytes[:keylen]
|
||||||
|
key_bytes = key_bytes.ljust(keylen, b"\0")
|
||||||
|
return key_bytes
|
||||||
|
|
||||||
|
return _decrypt_payload_helper(key, ciphertext, get_key_bytes, RawEncoder)
|
||||||
|
|
||||||
|
|
||||||
def registration_context(registration: dict) -> Context:
|
def registration_context(registration: dict) -> Context:
|
||||||
@ -158,11 +183,16 @@ def webhook_response(
|
|||||||
data = json.dumps(data, cls=JSONEncoder)
|
data = json.dumps(data, cls=JSONEncoder)
|
||||||
|
|
||||||
if registration[ATTR_SUPPORTS_ENCRYPTION]:
|
if registration[ATTR_SUPPORTS_ENCRYPTION]:
|
||||||
keylen, encrypt = setup_encrypt()
|
keylen, encrypt = setup_encrypt(
|
||||||
|
HexEncoder if ATTR_NO_LEGACY_ENCRYPTION in registration else RawEncoder
|
||||||
|
)
|
||||||
|
|
||||||
key = registration[CONF_SECRET].encode("utf-8")
|
if ATTR_NO_LEGACY_ENCRYPTION in registration:
|
||||||
key = key[:keylen]
|
key = registration[CONF_SECRET]
|
||||||
key = key.ljust(keylen, b"\0")
|
else:
|
||||||
|
key = registration[CONF_SECRET].encode("utf-8")
|
||||||
|
key = key[:keylen]
|
||||||
|
key = key.ljust(keylen, b"\0")
|
||||||
|
|
||||||
enc_data = encrypt(data.encode("utf-8"), key).decode("utf-8")
|
enc_data = encrypt(data.encode("utf-8"), key).decode("utf-8")
|
||||||
data = json.dumps({"encrypted": True, "encrypted_data": enc_data})
|
data = json.dumps({"encrypted": True, "encrypted_data": enc_data})
|
||||||
|
@ -7,6 +7,7 @@ import logging
|
|||||||
import secrets
|
import secrets
|
||||||
|
|
||||||
from aiohttp.web import HTTPBadRequest, Request, Response, json_response
|
from aiohttp.web import HTTPBadRequest, Request, Response, json_response
|
||||||
|
from nacl.exceptions import CryptoError
|
||||||
from nacl.secret import SecretBox
|
from nacl.secret import SecretBox
|
||||||
import voluptuous as vol
|
import voluptuous as vol
|
||||||
|
|
||||||
@ -58,6 +59,7 @@ from .const import (
|
|||||||
ATTR_EVENT_TYPE,
|
ATTR_EVENT_TYPE,
|
||||||
ATTR_MANUFACTURER,
|
ATTR_MANUFACTURER,
|
||||||
ATTR_MODEL,
|
ATTR_MODEL,
|
||||||
|
ATTR_NO_LEGACY_ENCRYPTION,
|
||||||
ATTR_OS_VERSION,
|
ATTR_OS_VERSION,
|
||||||
ATTR_SENSOR_ATTRIBUTES,
|
ATTR_SENSOR_ATTRIBUTES,
|
||||||
ATTR_SENSOR_DEVICE_CLASS,
|
ATTR_SENSOR_DEVICE_CLASS,
|
||||||
@ -97,6 +99,7 @@ from .const import (
|
|||||||
)
|
)
|
||||||
from .helpers import (
|
from .helpers import (
|
||||||
_decrypt_payload,
|
_decrypt_payload,
|
||||||
|
_decrypt_payload_legacy,
|
||||||
empty_okay_response,
|
empty_okay_response,
|
||||||
error_response,
|
error_response,
|
||||||
registration_context,
|
registration_context,
|
||||||
@ -191,7 +194,27 @@ async def handle_webhook(
|
|||||||
|
|
||||||
if req_data[ATTR_WEBHOOK_ENCRYPTED]:
|
if req_data[ATTR_WEBHOOK_ENCRYPTED]:
|
||||||
enc_data = req_data[ATTR_WEBHOOK_ENCRYPTED_DATA]
|
enc_data = req_data[ATTR_WEBHOOK_ENCRYPTED_DATA]
|
||||||
webhook_payload = _decrypt_payload(config_entry.data[CONF_SECRET], enc_data)
|
try:
|
||||||
|
webhook_payload = _decrypt_payload(config_entry.data[CONF_SECRET], enc_data)
|
||||||
|
if ATTR_NO_LEGACY_ENCRYPTION not in config_entry.data:
|
||||||
|
data = {**config_entry.data, ATTR_NO_LEGACY_ENCRYPTION: True}
|
||||||
|
hass.config_entries.async_update_entry(config_entry, data=data)
|
||||||
|
except CryptoError:
|
||||||
|
if ATTR_NO_LEGACY_ENCRYPTION not in config_entry.data:
|
||||||
|
try:
|
||||||
|
webhook_payload = _decrypt_payload_legacy(
|
||||||
|
config_entry.data[CONF_SECRET], enc_data
|
||||||
|
)
|
||||||
|
except CryptoError:
|
||||||
|
_LOGGER.warning(
|
||||||
|
"Ignoring encrypted payload because unable to decrypt"
|
||||||
|
)
|
||||||
|
except ValueError:
|
||||||
|
_LOGGER.warning("Ignoring invalid encrypted payload")
|
||||||
|
else:
|
||||||
|
_LOGGER.warning("Ignoring encrypted payload because unable to decrypt")
|
||||||
|
except ValueError:
|
||||||
|
_LOGGER.warning("Ignoring invalid encrypted payload")
|
||||||
|
|
||||||
if webhook_type not in WEBHOOK_COMMANDS:
|
if webhook_type not in WEBHOOK_COMMANDS:
|
||||||
_LOGGER.error(
|
_LOGGER.error(
|
||||||
|
@ -1,4 +1,5 @@
|
|||||||
"""Tests for the mobile_app HTTP API."""
|
"""Tests for the mobile_app HTTP API."""
|
||||||
|
from binascii import unhexlify
|
||||||
from http import HTTPStatus
|
from http import HTTPStatus
|
||||||
import json
|
import json
|
||||||
from unittest.mock import patch
|
from unittest.mock import patch
|
||||||
@ -75,6 +76,49 @@ async def test_registration_encryption(hass, hass_client):
|
|||||||
assert resp.status == HTTPStatus.CREATED
|
assert resp.status == HTTPStatus.CREATED
|
||||||
register_json = await resp.json()
|
register_json = await resp.json()
|
||||||
|
|
||||||
|
key = unhexlify(register_json[CONF_SECRET])
|
||||||
|
|
||||||
|
payload = json.dumps(RENDER_TEMPLATE["data"]).encode("utf-8")
|
||||||
|
|
||||||
|
data = SecretBox(key).encrypt(payload, encoder=Base64Encoder).decode("utf-8")
|
||||||
|
|
||||||
|
container = {"type": "render_template", "encrypted": True, "encrypted_data": data}
|
||||||
|
|
||||||
|
resp = await api_client.post(
|
||||||
|
f"/api/webhook/{register_json[CONF_WEBHOOK_ID]}", json=container
|
||||||
|
)
|
||||||
|
|
||||||
|
assert resp.status == HTTPStatus.OK
|
||||||
|
|
||||||
|
webhook_json = await resp.json()
|
||||||
|
assert "encrypted_data" in webhook_json
|
||||||
|
|
||||||
|
decrypted_data = SecretBox(key).decrypt(
|
||||||
|
webhook_json["encrypted_data"], encoder=Base64Encoder
|
||||||
|
)
|
||||||
|
decrypted_data = decrypted_data.decode("utf-8")
|
||||||
|
|
||||||
|
assert json.loads(decrypted_data) == {"one": "Hello world"}
|
||||||
|
|
||||||
|
|
||||||
|
async def test_registration_encryption_legacy(hass, hass_client):
|
||||||
|
"""Test that registrations happen."""
|
||||||
|
try:
|
||||||
|
from nacl.encoding import Base64Encoder
|
||||||
|
from nacl.secret import SecretBox
|
||||||
|
except (ImportError, OSError):
|
||||||
|
pytest.skip("libnacl/libsodium is not installed")
|
||||||
|
return
|
||||||
|
|
||||||
|
await async_setup_component(hass, DOMAIN, {DOMAIN: {}})
|
||||||
|
|
||||||
|
api_client = await hass_client()
|
||||||
|
|
||||||
|
resp = await api_client.post("/api/mobile_app/registrations", json=REGISTER)
|
||||||
|
|
||||||
|
assert resp.status == HTTPStatus.CREATED
|
||||||
|
register_json = await resp.json()
|
||||||
|
|
||||||
keylen = SecretBox.KEY_SIZE
|
keylen = SecretBox.KEY_SIZE
|
||||||
key = register_json[CONF_SECRET].encode("utf-8")
|
key = register_json[CONF_SECRET].encode("utf-8")
|
||||||
key = key[:keylen]
|
key = key[:keylen]
|
||||||
|
@ -1,4 +1,5 @@
|
|||||||
"""Webhook tests for mobile_app."""
|
"""Webhook tests for mobile_app."""
|
||||||
|
from binascii import unhexlify
|
||||||
from http import HTTPStatus
|
from http import HTTPStatus
|
||||||
from unittest.mock import patch
|
from unittest.mock import patch
|
||||||
|
|
||||||
@ -22,7 +23,29 @@ from .const import CALL_SERVICE, FIRE_EVENT, REGISTER_CLEARTEXT, RENDER_TEMPLATE
|
|||||||
from tests.common import async_mock_service
|
from tests.common import async_mock_service
|
||||||
|
|
||||||
|
|
||||||
def encrypt_payload(secret_key, payload):
|
def encrypt_payload(secret_key, payload, encode_json=True):
|
||||||
|
"""Return a encrypted payload given a key and dictionary of data."""
|
||||||
|
try:
|
||||||
|
from nacl.encoding import Base64Encoder
|
||||||
|
from nacl.secret import SecretBox
|
||||||
|
except (ImportError, OSError):
|
||||||
|
pytest.skip("libnacl/libsodium is not installed")
|
||||||
|
return
|
||||||
|
|
||||||
|
import json
|
||||||
|
|
||||||
|
prepped_key = unhexlify(secret_key)
|
||||||
|
|
||||||
|
if encode_json:
|
||||||
|
payload = json.dumps(payload)
|
||||||
|
payload = payload.encode("utf-8")
|
||||||
|
|
||||||
|
return (
|
||||||
|
SecretBox(prepped_key).encrypt(payload, encoder=Base64Encoder).decode("utf-8")
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def encrypt_payload_legacy(secret_key, payload, encode_json=True):
|
||||||
"""Return a encrypted payload given a key and dictionary of data."""
|
"""Return a encrypted payload given a key and dictionary of data."""
|
||||||
try:
|
try:
|
||||||
from nacl.encoding import Base64Encoder
|
from nacl.encoding import Base64Encoder
|
||||||
@ -38,7 +61,9 @@ def encrypt_payload(secret_key, payload):
|
|||||||
prepped_key = prepped_key[:keylen]
|
prepped_key = prepped_key[:keylen]
|
||||||
prepped_key = prepped_key.ljust(keylen, b"\0")
|
prepped_key = prepped_key.ljust(keylen, b"\0")
|
||||||
|
|
||||||
payload = json.dumps(payload).encode("utf-8")
|
if encode_json:
|
||||||
|
payload = json.dumps(payload)
|
||||||
|
payload = payload.encode("utf-8")
|
||||||
|
|
||||||
return (
|
return (
|
||||||
SecretBox(prepped_key).encrypt(payload, encoder=Base64Encoder).decode("utf-8")
|
SecretBox(prepped_key).encrypt(payload, encoder=Base64Encoder).decode("utf-8")
|
||||||
@ -56,6 +81,27 @@ def decrypt_payload(secret_key, encrypted_data):
|
|||||||
|
|
||||||
import json
|
import json
|
||||||
|
|
||||||
|
prepped_key = unhexlify(secret_key)
|
||||||
|
|
||||||
|
decrypted_data = SecretBox(prepped_key).decrypt(
|
||||||
|
encrypted_data, encoder=Base64Encoder
|
||||||
|
)
|
||||||
|
decrypted_data = decrypted_data.decode("utf-8")
|
||||||
|
|
||||||
|
return json.loads(decrypted_data)
|
||||||
|
|
||||||
|
|
||||||
|
def decrypt_payload_legacy(secret_key, encrypted_data):
|
||||||
|
"""Return a decrypted payload given a key and a string of encrypted data."""
|
||||||
|
try:
|
||||||
|
from nacl.encoding import Base64Encoder
|
||||||
|
from nacl.secret import SecretBox
|
||||||
|
except (ImportError, OSError):
|
||||||
|
pytest.skip("libnacl/libsodium is not installed")
|
||||||
|
return
|
||||||
|
|
||||||
|
import json
|
||||||
|
|
||||||
keylen = SecretBox.KEY_SIZE
|
keylen = SecretBox.KEY_SIZE
|
||||||
prepped_key = secret_key.encode("utf-8")
|
prepped_key = secret_key.encode("utf-8")
|
||||||
prepped_key = prepped_key[:keylen]
|
prepped_key = prepped_key[:keylen]
|
||||||
@ -273,6 +319,181 @@ async def test_webhook_handle_decryption(webhook_client, create_registrations):
|
|||||||
assert decrypted_data == {"one": "Hello world"}
|
assert decrypted_data == {"one": "Hello world"}
|
||||||
|
|
||||||
|
|
||||||
|
async def test_webhook_handle_decryption_legacy(webhook_client, create_registrations):
|
||||||
|
"""Test that we can encrypt/decrypt properly."""
|
||||||
|
key = create_registrations[0]["secret"]
|
||||||
|
data = encrypt_payload_legacy(key, RENDER_TEMPLATE["data"])
|
||||||
|
|
||||||
|
container = {"type": "render_template", "encrypted": True, "encrypted_data": data}
|
||||||
|
|
||||||
|
resp = await webhook_client.post(
|
||||||
|
"/api/webhook/{}".format(create_registrations[0]["webhook_id"]), json=container
|
||||||
|
)
|
||||||
|
|
||||||
|
assert resp.status == HTTPStatus.OK
|
||||||
|
|
||||||
|
webhook_json = await resp.json()
|
||||||
|
assert "encrypted_data" in webhook_json
|
||||||
|
|
||||||
|
decrypted_data = decrypt_payload_legacy(key, webhook_json["encrypted_data"])
|
||||||
|
|
||||||
|
assert decrypted_data == {"one": "Hello world"}
|
||||||
|
|
||||||
|
|
||||||
|
async def test_webhook_handle_decryption_fail(
|
||||||
|
webhook_client, create_registrations, caplog
|
||||||
|
):
|
||||||
|
"""Test that we can encrypt/decrypt properly."""
|
||||||
|
key = create_registrations[0]["secret"]
|
||||||
|
|
||||||
|
# Send valid data
|
||||||
|
data = encrypt_payload(key, RENDER_TEMPLATE["data"])
|
||||||
|
container = {"type": "render_template", "encrypted": True, "encrypted_data": data}
|
||||||
|
resp = await webhook_client.post(
|
||||||
|
"/api/webhook/{}".format(create_registrations[0]["webhook_id"]), json=container
|
||||||
|
)
|
||||||
|
|
||||||
|
assert resp.status == HTTPStatus.OK
|
||||||
|
webhook_json = await resp.json()
|
||||||
|
decrypted_data = decrypt_payload(key, webhook_json["encrypted_data"])
|
||||||
|
assert decrypted_data == {"one": "Hello world"}
|
||||||
|
caplog.clear()
|
||||||
|
|
||||||
|
# Send invalid JSON data
|
||||||
|
data = encrypt_payload(key, "{not_valid", encode_json=False)
|
||||||
|
container = {"type": "render_template", "encrypted": True, "encrypted_data": data}
|
||||||
|
resp = await webhook_client.post(
|
||||||
|
"/api/webhook/{}".format(create_registrations[0]["webhook_id"]), json=container
|
||||||
|
)
|
||||||
|
|
||||||
|
assert resp.status == HTTPStatus.OK
|
||||||
|
webhook_json = await resp.json()
|
||||||
|
assert decrypt_payload(key, webhook_json["encrypted_data"]) == {}
|
||||||
|
assert "Ignoring invalid encrypted payload" in caplog.text
|
||||||
|
caplog.clear()
|
||||||
|
|
||||||
|
# Break the key, and send JSON data
|
||||||
|
data = encrypt_payload(key[::-1], RENDER_TEMPLATE["data"])
|
||||||
|
container = {"type": "render_template", "encrypted": True, "encrypted_data": data}
|
||||||
|
resp = await webhook_client.post(
|
||||||
|
"/api/webhook/{}".format(create_registrations[0]["webhook_id"]), json=container
|
||||||
|
)
|
||||||
|
|
||||||
|
assert resp.status == HTTPStatus.OK
|
||||||
|
webhook_json = await resp.json()
|
||||||
|
assert decrypt_payload(key, webhook_json["encrypted_data"]) == {}
|
||||||
|
assert "Ignoring encrypted payload because unable to decrypt" in caplog.text
|
||||||
|
|
||||||
|
|
||||||
|
async def test_webhook_handle_decryption_legacy_fail(
|
||||||
|
webhook_client, create_registrations, caplog
|
||||||
|
):
|
||||||
|
"""Test that we can encrypt/decrypt properly."""
|
||||||
|
key = create_registrations[0]["secret"]
|
||||||
|
|
||||||
|
# Send valid data using legacy method
|
||||||
|
data = encrypt_payload_legacy(key, RENDER_TEMPLATE["data"])
|
||||||
|
container = {"type": "render_template", "encrypted": True, "encrypted_data": data}
|
||||||
|
resp = await webhook_client.post(
|
||||||
|
"/api/webhook/{}".format(create_registrations[0]["webhook_id"]), json=container
|
||||||
|
)
|
||||||
|
|
||||||
|
assert resp.status == HTTPStatus.OK
|
||||||
|
webhook_json = await resp.json()
|
||||||
|
decrypted_data = decrypt_payload_legacy(key, webhook_json["encrypted_data"])
|
||||||
|
assert decrypted_data == {"one": "Hello world"}
|
||||||
|
caplog.clear()
|
||||||
|
|
||||||
|
# Send invalid JSON data
|
||||||
|
data = encrypt_payload_legacy(key, "{not_valid", encode_json=False)
|
||||||
|
container = {"type": "render_template", "encrypted": True, "encrypted_data": data}
|
||||||
|
resp = await webhook_client.post(
|
||||||
|
"/api/webhook/{}".format(create_registrations[0]["webhook_id"]), json=container
|
||||||
|
)
|
||||||
|
|
||||||
|
assert resp.status == HTTPStatus.OK
|
||||||
|
webhook_json = await resp.json()
|
||||||
|
assert decrypt_payload_legacy(key, webhook_json["encrypted_data"]) == {}
|
||||||
|
assert "Ignoring invalid encrypted payload" in caplog.text
|
||||||
|
caplog.clear()
|
||||||
|
|
||||||
|
# Break the key, and send JSON data
|
||||||
|
data = encrypt_payload_legacy(key[::-1], RENDER_TEMPLATE["data"])
|
||||||
|
container = {"type": "render_template", "encrypted": True, "encrypted_data": data}
|
||||||
|
resp = await webhook_client.post(
|
||||||
|
"/api/webhook/{}".format(create_registrations[0]["webhook_id"]), json=container
|
||||||
|
)
|
||||||
|
|
||||||
|
assert resp.status == HTTPStatus.OK
|
||||||
|
webhook_json = await resp.json()
|
||||||
|
assert decrypt_payload_legacy(key, webhook_json["encrypted_data"]) == {}
|
||||||
|
assert "Ignoring encrypted payload because unable to decrypt" in caplog.text
|
||||||
|
|
||||||
|
|
||||||
|
async def test_webhook_handle_decryption_legacy_upgrade(
|
||||||
|
webhook_client, create_registrations
|
||||||
|
):
|
||||||
|
"""Test that we can encrypt/decrypt properly."""
|
||||||
|
key = create_registrations[0]["secret"]
|
||||||
|
|
||||||
|
# Send using legacy method
|
||||||
|
data = encrypt_payload_legacy(key, RENDER_TEMPLATE["data"])
|
||||||
|
|
||||||
|
container = {"type": "render_template", "encrypted": True, "encrypted_data": data}
|
||||||
|
|
||||||
|
resp = await webhook_client.post(
|
||||||
|
"/api/webhook/{}".format(create_registrations[0]["webhook_id"]), json=container
|
||||||
|
)
|
||||||
|
|
||||||
|
assert resp.status == HTTPStatus.OK
|
||||||
|
|
||||||
|
webhook_json = await resp.json()
|
||||||
|
assert "encrypted_data" in webhook_json
|
||||||
|
|
||||||
|
decrypted_data = decrypt_payload_legacy(key, webhook_json["encrypted_data"])
|
||||||
|
|
||||||
|
assert decrypted_data == {"one": "Hello world"}
|
||||||
|
|
||||||
|
# Send using new method
|
||||||
|
data = encrypt_payload(key, RENDER_TEMPLATE["data"])
|
||||||
|
|
||||||
|
container = {"type": "render_template", "encrypted": True, "encrypted_data": data}
|
||||||
|
|
||||||
|
resp = await webhook_client.post(
|
||||||
|
"/api/webhook/{}".format(create_registrations[0]["webhook_id"]), json=container
|
||||||
|
)
|
||||||
|
|
||||||
|
assert resp.status == HTTPStatus.OK
|
||||||
|
|
||||||
|
webhook_json = await resp.json()
|
||||||
|
assert "encrypted_data" in webhook_json
|
||||||
|
|
||||||
|
decrypted_data = decrypt_payload(key, webhook_json["encrypted_data"])
|
||||||
|
|
||||||
|
assert decrypted_data == {"one": "Hello world"}
|
||||||
|
|
||||||
|
# Send using legacy method - no longer possible
|
||||||
|
data = encrypt_payload_legacy(key, RENDER_TEMPLATE["data"])
|
||||||
|
|
||||||
|
container = {"type": "render_template", "encrypted": True, "encrypted_data": data}
|
||||||
|
|
||||||
|
resp = await webhook_client.post(
|
||||||
|
"/api/webhook/{}".format(create_registrations[0]["webhook_id"]), json=container
|
||||||
|
)
|
||||||
|
|
||||||
|
assert resp.status == HTTPStatus.OK
|
||||||
|
|
||||||
|
webhook_json = await resp.json()
|
||||||
|
assert "encrypted_data" in webhook_json
|
||||||
|
|
||||||
|
# The response should be empty, encrypted with the new method
|
||||||
|
with pytest.raises(Exception):
|
||||||
|
decrypt_payload_legacy(key, webhook_json["encrypted_data"])
|
||||||
|
decrypted_data = decrypt_payload(key, webhook_json["encrypted_data"])
|
||||||
|
|
||||||
|
assert decrypted_data == {}
|
||||||
|
|
||||||
|
|
||||||
async def test_webhook_requires_encryption(webhook_client, create_registrations):
|
async def test_webhook_requires_encryption(webhook_client, create_registrations):
|
||||||
"""Test that encrypted registrations only accept encrypted data."""
|
"""Test that encrypted registrations only accept encrypted data."""
|
||||||
resp = await webhook_client.post(
|
resp = await webhook_client.post(
|
||||||
|
Loading…
x
Reference in New Issue
Block a user