diff --git a/homeassistant/components/html5/notify.py b/homeassistant/components/html5/notify.py
index 2fcaa266b85..5ab2b39baed 100644
--- a/homeassistant/components/html5/notify.py
+++ b/homeassistant/components/html5/notify.py
@@ -1,5 +1,6 @@
"""HTML5 Push Messaging notification service."""
-import datetime
+from datetime import datetime, timedelta
+
from functools import partial
import json
import logging
@@ -39,14 +40,12 @@ ATTR_VAPID_EMAIL = 'vapid_email'
def gcm_api_deprecated(value):
"""Warn user that GCM API config is deprecated."""
- if not value:
- return value
-
- _LOGGER.warning(
- "Configuring html5_push_notifications via the GCM api"
- " has been deprecated and will stop working after April 11,"
- " 2019. Use the VAPID configuration instead. For instructions,"
- " see https://www.home-assistant.io/components/notify.html5/")
+ if value:
+ _LOGGER.warning(
+ "Configuring html5_push_notifications via the GCM api"
+ " has been deprecated and will stop working after April 11,"
+ " 2019. Use the VAPID configuration instead. For instructions,"
+ " see https://www.home-assistant.io/components/notify.html5/")
return value
@@ -75,6 +74,10 @@ ATTR_ACTIONS = 'actions'
ATTR_TYPE = 'type'
ATTR_URL = 'url'
ATTR_DISMISS = 'dismiss'
+ATTR_PRIORITY = 'priority'
+DEFAULT_PRIORITY = 'normal'
+ATTR_TTL = 'ttl'
+DEFAULT_TTL = 86400
ATTR_JWT = 'jwt'
@@ -193,7 +196,6 @@ class HTML5PushRegistrationView(HomeAssistantView):
data = await request.json()
except ValueError:
return self.json_message('Invalid JSON', HTTP_BAD_REQUEST)
-
try:
data = REGISTER_SCHEMA(data)
except vol.Invalid as ex:
@@ -373,7 +375,7 @@ class HTML5NotificationService(BaseNotificationService):
"""Initialize the service."""
self._gcm_key = gcm_key
self._vapid_prv = vapid_prv
- self._vapid_claims = {"sub": "mailto:{}".format(vapid_email)}
+ self._vapid_email = vapid_email
self.registrations = registrations
self.registrations_json_path = json_path
@@ -425,7 +427,6 @@ class HTML5NotificationService(BaseNotificationService):
def send_message(self, message="", **kwargs):
"""Send a message to a user."""
tag = str(uuid.uuid4())
-
payload = {
'badge': '/static/images/notification-badge.png',
'body': message,
@@ -459,13 +460,14 @@ class HTML5NotificationService(BaseNotificationService):
def _push_message(self, payload, **kwargs):
"""Send the message."""
- import jwt
- from pywebpush import WebPusher, webpush
+ from pywebpush import WebPusher
timestamp = int(time.time())
-
+ ttl = int(kwargs.get(ATTR_TTL, DEFAULT_TTL))
+ priority = kwargs.get(ATTR_PRIORITY, DEFAULT_PRIORITY)
+ if priority not in ['normal', 'high']:
+ priority = DEFAULT_PRIORITY
payload['timestamp'] = (timestamp*1000) # Javascript ms since epoch
-
targets = kwargs.get(ATTR_TARGET)
if not targets:
@@ -473,26 +475,37 @@ class HTML5NotificationService(BaseNotificationService):
for target in list(targets):
info = self.registrations.get(target)
- if info is None:
+ try:
+ info = REGISTER_SCHEMA(info)
+ except vol.Invalid:
_LOGGER.error("%s is not a valid HTML5 push notification"
" target", target)
continue
-
- jwt_exp = (datetime.datetime.fromtimestamp(timestamp) +
- datetime.timedelta(days=JWT_VALID_DAYS))
+ payload[ATTR_DATA][ATTR_JWT] = add_jwt(
+ timestamp, target, payload[ATTR_TAG],
+ info[ATTR_SUBSCRIPTION][ATTR_KEYS][ATTR_AUTH])
+ import jwt
jwt_secret = info[ATTR_SUBSCRIPTION][ATTR_KEYS][ATTR_AUTH]
+ jwt_exp = (datetime.fromtimestamp(timestamp) +
+ timedelta(days=JWT_VALID_DAYS))
jwt_claims = {'exp': jwt_exp, 'nbf': timestamp,
'iat': timestamp, ATTR_TARGET: target,
ATTR_TAG: payload[ATTR_TAG]}
jwt_token = jwt.encode(jwt_claims, jwt_secret).decode('utf-8')
payload[ATTR_DATA][ATTR_JWT] = jwt_token
-
- if self._vapid_prv and self._vapid_claims:
- response = webpush(
- info[ATTR_SUBSCRIPTION],
- json.dumps(payload),
- vapid_private_key=self._vapid_prv,
- vapid_claims=self._vapid_claims
+ webpusher = WebPusher(info[ATTR_SUBSCRIPTION])
+ if self._vapid_prv and self._vapid_email:
+ vapid_headers = create_vapid_headers(
+ self._vapid_email, info[ATTR_SUBSCRIPTION],
+ self._vapid_prv)
+ vapid_headers.update({
+ 'urgency': priority,
+ 'priority': priority
+ })
+ response = webpusher.send(
+ data=json.dumps(payload),
+ headers=vapid_headers,
+ ttl=ttl
)
else:
# Only pass the gcm key if we're actually using GCM
@@ -501,8 +514,8 @@ class HTML5NotificationService(BaseNotificationService):
if 'googleapis.com' \
in info[ATTR_SUBSCRIPTION][ATTR_ENDPOINT] \
else None
- response = WebPusher(info[ATTR_SUBSCRIPTION]).send(
- json.dumps(payload), gcm_key=gcm_key, ttl='86400'
+ response = webpusher.send(
+ json.dumps(payload), gcm_key=gcm_key, ttl=ttl
)
if response.status_code == 410:
@@ -514,3 +527,33 @@ class HTML5NotificationService(BaseNotificationService):
_LOGGER.error("Error saving registration")
else:
_LOGGER.info("Configuration saved")
+
+
+def add_jwt(timestamp, target, tag, jwt_secret):
+ """Create JWT json to put into payload."""
+ import jwt
+ jwt_exp = (datetime.fromtimestamp(timestamp) +
+ timedelta(days=JWT_VALID_DAYS))
+ jwt_claims = {'exp': jwt_exp, 'nbf': timestamp,
+ 'iat': timestamp, ATTR_TARGET: target,
+ ATTR_TAG: tag}
+ return jwt.encode(jwt_claims, jwt_secret).decode('utf-8')
+
+
+def create_vapid_headers(vapid_email, subscription_info, vapid_private_key):
+ """Create encrypted headers to send to WebPusher."""
+ from py_vapid import Vapid
+ try:
+ from urllib.parse import urlparse
+ except ImportError: # pragma: no cover
+ from urlparse import urlparse
+ if (vapid_email and vapid_private_key and
+ ATTR_ENDPOINT in subscription_info):
+ url = urlparse(subscription_info.get(ATTR_ENDPOINT))
+ vapid_claims = {
+ 'sub': 'mailto:{}'.format(vapid_email),
+ 'aud': "{}://{}".format(url.scheme, url.netloc)
+ }
+ vapid = Vapid.from_string(private_key=vapid_private_key)
+ return vapid.sign(vapid_claims)
+ return None
diff --git a/tests/components/html5/test_notify.py b/tests/components/html5/test_notify.py
index cae4db6434a..82fd2c546cc 100644
--- a/tests/components/html5/test_notify.py
+++ b/tests/components/html5/test_notify.py
@@ -9,6 +9,14 @@ import homeassistant.components.html5.notify as html5
CONFIG_FILE = 'file.conf'
+VAPID_CONF = {
+ 'vapid_pub_key': 'BJMA2gDZEkHaXRhf1fhY_' +
+ 'QbKbhVIHlSJXI0bFyo0eJXnUPOjdgycCAbj-2bMKMKNKs' +
+ '_rM8JoSnyKGCXAY2dbONI',
+ 'vapid_prv_key': 'ZwPgwKpESGuGLMZYU39vKgrekrWzCijo-LsBM3CZ9-c',
+ 'vapid_email': 'someone@example.com'
+}
+
SUBSCRIPTION_1 = {
'browser': 'chrome',
'subscription': {
@@ -45,6 +53,15 @@ SUBSCRIPTION_4 = {
},
}
+SUBSCRIPTION_5 = {
+ 'browser': 'chrome',
+ 'subscription': {
+ 'endpoint': 'https://fcm.googleapis.com/fcm/send/LONG-RANDOM-KEY',
+ 'expirationTime': None,
+ 'keys': {'auth': 'auth', 'p256dh': 'p256dh'}
+ },
+}
+
REGISTER_URL = '/api/notify.html5'
PUBLISH_URL = '/api/notify.html5/callback'
@@ -185,6 +202,122 @@ class TestHtml5Notify:
assert mock_wp.mock_calls[1][2]['gcm_key'] is not None
assert mock_wp.mock_calls[4][2]['gcm_key'] is None
+ @patch('pywebpush.WebPusher')
+ def test_fcm_key_include(self, mock_wp):
+ """Test if the FCM header is included."""
+ hass = MagicMock()
+
+ data = {
+ 'chrome': SUBSCRIPTION_5
+ }
+
+ m = mock_open(read_data=json.dumps(data))
+ with patch('homeassistant.util.json.open', m, create=True):
+ service = html5.get_service(hass, VAPID_CONF)
+
+ assert service is not None
+
+ service.send_message('Hello', target=['chrome'])
+
+ assert len(mock_wp.mock_calls) == 3
+ # WebPusher constructor
+ assert mock_wp.mock_calls[0][1][0] == SUBSCRIPTION_5['subscription']
+
+ # Third mock_call checks the status_code of the response.
+ assert mock_wp.mock_calls[2][0] == '().send().status_code.__eq__'
+
+ # Get the keys passed to the WebPusher's send method
+ assert mock_wp.mock_calls[1][2]['headers']['Authorization'] is not None
+
+ @patch('pywebpush.WebPusher')
+ def test_fcm_send_with_unknown_priority(self, mock_wp):
+ """Test if the gcm_key is only included for GCM endpoints."""
+ hass = MagicMock()
+
+ data = {
+ 'chrome': SUBSCRIPTION_5
+ }
+
+ m = mock_open(read_data=json.dumps(data))
+ with patch('homeassistant.util.json.open', m, create=True):
+ service = html5.get_service(hass, VAPID_CONF)
+
+ assert service is not None
+
+ service.send_message('Hello', target=['chrome'], priority='undefined')
+
+ assert len(mock_wp.mock_calls) == 3
+ # WebPusher constructor
+ assert mock_wp.mock_calls[0][1][0] == SUBSCRIPTION_5['subscription']
+
+ # Third mock_call checks the status_code of the response.
+ assert mock_wp.mock_calls[2][0] == '().send().status_code.__eq__'
+
+ # Get the keys passed to the WebPusher's send method
+ assert mock_wp.mock_calls[1][2]['headers']['priority'] == 'normal'
+
+ @patch('pywebpush.WebPusher')
+ def test_fcm_no_targets(self, mock_wp):
+ """Test if the gcm_key is only included for GCM endpoints."""
+ hass = MagicMock()
+
+ data = {
+ 'chrome': SUBSCRIPTION_5
+ }
+
+ m = mock_open(read_data=json.dumps(data))
+ with patch('homeassistant.util.json.open', m, create=True):
+ service = html5.get_service(hass, VAPID_CONF)
+
+ assert service is not None
+
+ service.send_message('Hello', )
+
+ assert len(mock_wp.mock_calls) == 3
+ # WebPusher constructor
+ assert mock_wp.mock_calls[0][1][0] == SUBSCRIPTION_5['subscription']
+
+ # Third mock_call checks the status_code of the response.
+ assert mock_wp.mock_calls[2][0] == '().send().status_code.__eq__'
+
+ # Get the keys passed to the WebPusher's send method
+ assert mock_wp.mock_calls[1][2]['headers']['priority'] == 'normal'
+
+ @patch('pywebpush.WebPusher')
+ def test_fcm_additional_data(self, mock_wp):
+ """Test if the gcm_key is only included for GCM endpoints."""
+ hass = MagicMock()
+
+ data = {
+ 'chrome': SUBSCRIPTION_5
+ }
+
+ m = mock_open(read_data=json.dumps(data))
+ with patch('homeassistant.util.json.open', m, create=True):
+ service = html5.get_service(hass, VAPID_CONF)
+
+ assert service is not None
+
+ service.send_message('Hello', data={'mykey': 'myvalue'})
+
+ assert len(mock_wp.mock_calls) == 3
+ # WebPusher constructor
+ assert mock_wp.mock_calls[0][1][0] == SUBSCRIPTION_5['subscription']
+
+ # Third mock_call checks the status_code of the response.
+ assert mock_wp.mock_calls[2][0] == '().send().status_code.__eq__'
+
+ # Get the keys passed to the WebPusher's send method
+ assert mock_wp.mock_calls[1][2]['headers']['priority'] == 'normal'
+
+
+def test_create_vapid_withoutvapid():
+ """Test creating empty vapid."""
+ resp = html5.create_vapid_headers(vapid_email=None,
+ vapid_private_key=None,
+ subscription_info=None)
+ assert resp is None
+
async def test_registering_new_device_view(hass, hass_client):
"""Test that the HTML view works."""
@@ -428,3 +561,25 @@ async def test_callback_view_with_jwt(hass, hass_client):
assert resp.status == 200
body = await resp.json()
assert body == {"event": "push", "status": "ok"}
+
+
+async def test_send_fcm_without_targets(hass, hass_client):
+ """Test that the notification is send with FCM without targets."""
+ registrations = {
+ 'device': SUBSCRIPTION_5
+ }
+ await mock_client(hass, hass_client, registrations)
+ with patch('pywebpush.WebPusher') as mock_wp:
+ await hass.services.async_call('notify', 'notify', {
+ 'message': 'Hello',
+ 'target': ['device'],
+ 'data': {'icon': 'beer.png'}
+ }, blocking=True)
+
+ assert len(mock_wp.mock_calls) == 3
+
+ # WebPusher constructor
+ assert mock_wp.mock_calls[0][1][0] == \
+ SUBSCRIPTION_5['subscription']
+ # Third mock_call checks the status_code of the response.
+ assert mock_wp.mock_calls[2][0] == '().send().status_code.__eq__'