mirror of
https://github.com/home-assistant/core.git
synced 2025-08-11 06:20:01 +00:00
.github
docs
homeassistant
script
tests
auth
components
air_quality
alarm_control_panel
alert
alexa
ambient_station
api
arlo
auth
automation
binary_sensor
calendar
camera
canary
cast
climate
cloud
config
configurator
conversation
counter
cover
daikin
datadog
deconz
default_config
demo
device_sun_light_trigger
device_tracker
dialogflow
discovery
duckdns
dyson
ecobee
emulated_hue
emulated_roku
esphome
fan
feedreader
ffmpeg
folder_watcher
freedns
fritzbox
frontend
geo_location
geofency
google
google_assistant
google_domains
google_pubsub
gpslogger
graphite
group
hangouts
hassio
history
history_graph
homekit
homekit_controller
homematicip_cloud
http
huawei_lte
hue
ifttt
image_processing
influxdb
init
input_boolean
input_datetime
input_number
input_select
input_text
intent_script
introduction
ios
ipma
kira
light
litejet
locative
lock
logbook
logentries
logger
lovelace
luftdaten
mailbox
mailgun
__init__.py
test_init.py
media_player
melissa
microsoft_face
mochad
mqtt
mqtt_eventstream
mqtt_statestream
mythicbeastsdns
namecheapdns
ness_alarm
nest
no_ip
notify
nuheat
onboarding
openuv
owntracks
panel_custom
panel_iframe
persistent_notification
person
pilight
plant
point
prometheus
proximity
ps4
python_script
qwikswitch
rainmachine
recorder
remember_the_milk
remote
rest_command
rflink
rfxtrx
ring
rss_feed_template
scene
script
sensor
shell_command
shopping_list
simplisafe
sleepiq
smartthings
smhi
snips
sonos
spaceapi
spc
splunk
statsd
sun
switch
system_health
system_log
tellduslive
timer
toon
tplink
tradfri
tts
twilio
unifi
updater
upnp
utility_meter
vacuum
verisure
vultr
wake_on_lan
water_heater
weather
webhook
weblink
webostv
websocket_api
xiaomi_miio
zha
zone
zwave
__init__.py
conftest.py
fixtures
helpers
mock
resources
scripts
test_util
testing_config
util
__init__.py
common.py
conftest.py
test_bootstrap.py
test_config.py
test_config_entries.py
test_core.py
test_data_entry_flow.py
test_loader.py
test_main.py
test_requirements.py
test_setup.py
virtualization
.coveragerc
.dockerignore
.gitattributes
.gitignore
.hound.yml
.ignore
.readthedocs.yml
.travis.yml
CLA.md
CODEOWNERS
CODE_OF_CONDUCT.md
CONTRIBUTING.md
Dockerfile
LICENSE.md
MANIFEST.in
README.rst
mypy.ini
pylintrc
requirements_all.txt
requirements_docs.txt
requirements_test.txt
requirements_test_all.txt
setup.cfg
setup.py
tox.ini

* Switch mailgun webhooks to the webhook api * Change mailgun strings to indicate application/json is in use * Lint * Revert Changes to .translations. * Don't fail if the API key isn't set
232 lines
6.8 KiB
Python
232 lines
6.8 KiB
Python
"""Test the init file of Mailgun."""
|
|
import hashlib
|
|
import hmac
|
|
from unittest.mock import Mock
|
|
|
|
import pytest
|
|
|
|
from homeassistant import data_entry_flow
|
|
from homeassistant.components import mailgun, webhook
|
|
from homeassistant.const import CONF_API_KEY, CONF_DOMAIN
|
|
from homeassistant.core import callback
|
|
from homeassistant.setup import async_setup_component
|
|
|
|
API_KEY = 'abc123'
|
|
|
|
|
|
@pytest.fixture
|
|
async def http_client(hass, aiohttp_client):
|
|
"""Initialize a Home Assistant Server for testing this module."""
|
|
await async_setup_component(hass, webhook.DOMAIN, {})
|
|
return await aiohttp_client(hass.http.app)
|
|
|
|
|
|
@pytest.fixture
|
|
async def webhook_id_with_api_key(hass):
|
|
"""Initialize the Mailgun component and get the webhook_id."""
|
|
await async_setup_component(hass, mailgun.DOMAIN, {
|
|
mailgun.DOMAIN: {
|
|
CONF_API_KEY: API_KEY,
|
|
CONF_DOMAIN: 'example.com'
|
|
},
|
|
})
|
|
|
|
hass.config.api = Mock(base_url='http://example.com')
|
|
result = await hass.config_entries.flow.async_init('mailgun', context={
|
|
'source': 'user'
|
|
})
|
|
assert result['type'] == data_entry_flow.RESULT_TYPE_FORM, result
|
|
|
|
result = await hass.config_entries.flow.async_configure(
|
|
result['flow_id'], {})
|
|
assert result['type'] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
|
|
|
|
return result['result'].data['webhook_id']
|
|
|
|
|
|
@pytest.fixture
|
|
async def webhook_id_without_api_key(hass):
|
|
"""Initialize the Mailgun component and get the webhook_id w/o API key."""
|
|
await async_setup_component(hass, mailgun.DOMAIN, {})
|
|
|
|
hass.config.api = Mock(base_url='http://example.com')
|
|
result = await hass.config_entries.flow.async_init('mailgun', context={
|
|
'source': 'user'
|
|
})
|
|
assert result['type'] == data_entry_flow.RESULT_TYPE_FORM, result
|
|
|
|
result = await hass.config_entries.flow.async_configure(
|
|
result['flow_id'], {})
|
|
assert result['type'] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
|
|
|
|
return result['result'].data['webhook_id']
|
|
|
|
|
|
@pytest.fixture
|
|
async def mailgun_events(hass):
|
|
"""Return a list of mailgun_events triggered."""
|
|
events = []
|
|
|
|
@callback
|
|
def handle_event(event):
|
|
"""Handle Mailgun event."""
|
|
events.append(event)
|
|
|
|
hass.bus.async_listen(mailgun.MESSAGE_RECEIVED, handle_event)
|
|
|
|
return events
|
|
|
|
|
|
async def test_mailgun_webhook_with_missing_signature(
|
|
http_client,
|
|
webhook_id_with_api_key,
|
|
mailgun_events
|
|
):
|
|
"""Test that webhook doesn't trigger an event without a signature."""
|
|
event_count = len(mailgun_events)
|
|
|
|
await http_client.post(
|
|
'/api/webhook/{}'.format(webhook_id_with_api_key),
|
|
json={
|
|
'hello': 'mailgun',
|
|
'signature': {}
|
|
}
|
|
)
|
|
|
|
assert len(mailgun_events) == event_count
|
|
|
|
await http_client.post(
|
|
'/api/webhook/{}'.format(webhook_id_with_api_key),
|
|
json={
|
|
'hello': 'mailgun',
|
|
}
|
|
)
|
|
|
|
assert len(mailgun_events) == event_count
|
|
|
|
|
|
async def test_mailgun_webhook_with_different_api_key(
|
|
http_client,
|
|
webhook_id_with_api_key,
|
|
mailgun_events
|
|
):
|
|
"""Test that webhook doesn't trigger an event with a wrong signature."""
|
|
timestamp = '1529006854'
|
|
token = 'a8ce0edb2dd8301dee6c2405235584e45aa91d1e9f979f3de0'
|
|
|
|
event_count = len(mailgun_events)
|
|
|
|
await http_client.post(
|
|
'/api/webhook/{}'.format(webhook_id_with_api_key),
|
|
json={
|
|
'hello': 'mailgun',
|
|
'signature': {
|
|
'signature': hmac.new(
|
|
key=b'random_api_key',
|
|
msg=bytes('{}{}'.format(timestamp, token), 'utf-8'),
|
|
digestmod=hashlib.sha256
|
|
).hexdigest(),
|
|
'timestamp': timestamp,
|
|
'token': token
|
|
}
|
|
}
|
|
)
|
|
|
|
assert len(mailgun_events) == event_count
|
|
|
|
|
|
async def test_mailgun_webhook_event_with_correct_api_key(
|
|
http_client,
|
|
webhook_id_with_api_key,
|
|
mailgun_events
|
|
):
|
|
"""Test that webhook triggers an event after validating a signature."""
|
|
timestamp = '1529006854'
|
|
token = 'a8ce0edb2dd8301dee6c2405235584e45aa91d1e9f979f3de0'
|
|
|
|
event_count = len(mailgun_events)
|
|
|
|
await http_client.post(
|
|
'/api/webhook/{}'.format(webhook_id_with_api_key),
|
|
json={
|
|
'hello': 'mailgun',
|
|
'signature': {
|
|
'signature': hmac.new(
|
|
key=bytes(API_KEY, 'utf-8'),
|
|
msg=bytes('{}{}'.format(timestamp, token), 'utf-8'),
|
|
digestmod=hashlib.sha256
|
|
).hexdigest(),
|
|
'timestamp': timestamp,
|
|
'token': token
|
|
}
|
|
}
|
|
)
|
|
|
|
assert len(mailgun_events) == event_count + 1
|
|
assert mailgun_events[-1].data['webhook_id'] == webhook_id_with_api_key
|
|
assert mailgun_events[-1].data['hello'] == 'mailgun'
|
|
|
|
|
|
async def test_mailgun_webhook_with_missing_signature_without_api_key(
|
|
http_client,
|
|
webhook_id_without_api_key,
|
|
mailgun_events
|
|
):
|
|
"""Test that webhook triggers an event without a signature w/o API key."""
|
|
event_count = len(mailgun_events)
|
|
|
|
await http_client.post(
|
|
'/api/webhook/{}'.format(webhook_id_without_api_key),
|
|
json={
|
|
'hello': 'mailgun',
|
|
'signature': {}
|
|
}
|
|
)
|
|
|
|
assert len(mailgun_events) == event_count + 1
|
|
assert mailgun_events[-1].data['webhook_id'] == webhook_id_without_api_key
|
|
assert mailgun_events[-1].data['hello'] == 'mailgun'
|
|
|
|
await http_client.post(
|
|
'/api/webhook/{}'.format(webhook_id_without_api_key),
|
|
json={
|
|
'hello': 'mailgun',
|
|
}
|
|
)
|
|
|
|
assert len(mailgun_events) == event_count + 1
|
|
assert mailgun_events[-1].data['webhook_id'] == webhook_id_without_api_key
|
|
assert mailgun_events[-1].data['hello'] == 'mailgun'
|
|
|
|
|
|
async def test_mailgun_webhook_event_without_an_api_key(
|
|
http_client,
|
|
webhook_id_without_api_key,
|
|
mailgun_events
|
|
):
|
|
"""Test that webhook triggers an event if there is no api key."""
|
|
timestamp = '1529006854'
|
|
token = 'a8ce0edb2dd8301dee6c2405235584e45aa91d1e9f979f3de0'
|
|
|
|
event_count = len(mailgun_events)
|
|
|
|
await http_client.post(
|
|
'/api/webhook/{}'.format(webhook_id_without_api_key),
|
|
json={
|
|
'hello': 'mailgun',
|
|
'signature': {
|
|
'signature': hmac.new(
|
|
key=bytes(API_KEY, 'utf-8'),
|
|
msg=bytes('{}{}'.format(timestamp, token), 'utf-8'),
|
|
digestmod=hashlib.sha256
|
|
).hexdigest(),
|
|
'timestamp': timestamp,
|
|
'token': token
|
|
}
|
|
}
|
|
)
|
|
|
|
assert len(mailgun_events) == event_count + 1
|
|
assert mailgun_events[-1].data['webhook_id'] == webhook_id_without_api_key
|
|
assert mailgun_events[-1].data['hello'] == 'mailgun'
|