Hassio auth (#17274)

* Create auth.py

* Update auth.py

* Update auth.py

* Update __init__.py

* Update auth.py

* Update auth.py

* Update auth.py

* Update auth.py

* Update auth.py

* Update auth.py

* Update auth.py

* Update auth.py

* Update auth.py

* Add tests

* Update test_auth.py

* Update auth.py

* Update test_auth.py

* Update auth.py
This commit is contained in:
Pascal Vizeli 2018-10-10 14:07:51 +02:00 committed by Paulus Schoutsen
parent ad4d5666fe
commit 40e0966d7f
3 changed files with 174 additions and 0 deletions

View File

@ -19,6 +19,7 @@ import homeassistant.helpers.config_validation as cv
from homeassistant.loader import bind_hass
from homeassistant.util.dt import utcnow
from .auth import async_setup_auth
from .handler import HassIO, HassioAPIError
from .discovery import async_setup_discovery
from .http import HassIOView
@ -280,4 +281,7 @@ async def async_setup(hass, config):
# Init discovery Hass.io feature
async_setup_discovery(hass, hassio, config)
# Init auth Hass.io feature
async_setup_auth(hass)
return True

View File

@ -0,0 +1,75 @@
"""Implement the auth feature from Hass.io for Add-ons."""
import logging
from ipaddress import ip_address
import os
from aiohttp import web
from aiohttp.web_exceptions import (
HTTPForbidden, HTTPNotFound, HTTPUnauthorized)
import voluptuous as vol
from homeassistant.core import callback
import homeassistant.helpers.config_validation as cv
from homeassistant.exceptions import HomeAssistantError
from homeassistant.components.http import HomeAssistantView
from homeassistant.components.http.const import KEY_REAL_IP
from homeassistant.components.http.data_validator import RequestDataValidator
_LOGGER = logging.getLogger(__name__)
ATTR_USERNAME = 'username'
ATTR_PASSWORD = 'password'
SCHEMA_API_AUTH = vol.Schema({
vol.Required(ATTR_USERNAME): cv.string,
vol.Required(ATTR_PASSWORD): cv.string,
})
@callback
def async_setup_auth(hass):
"""Auth setup."""
hassio_auth = HassIOAuth(hass)
hass.http.register_view(hassio_auth)
class HassIOAuth(HomeAssistantView):
"""Hass.io view to handle base part."""
name = "api:hassio_auth"
url = "/api/hassio_auth"
def __init__(self, hass):
"""Initialize WebView."""
self.hass = hass
@RequestDataValidator(SCHEMA_API_AUTH)
async def post(self, request, data):
"""Handle new discovery requests."""
hassio_ip = os.environ['HASSIO'].split(':')[0]
if request[KEY_REAL_IP] != ip_address(hassio_ip):
_LOGGER.error(
"Invalid auth request from %s", request[KEY_REAL_IP])
raise HTTPForbidden()
await self._check_login(data[ATTR_USERNAME], data[ATTR_PASSWORD])
return web.Response(status=200)
def _get_provider(self):
"""Return Homeassistant auth provider."""
for prv in self.hass.auth.auth_providers:
if prv.type == 'homeassistant':
return prv
_LOGGER.error("Can't find Home Assistant auth.")
raise HTTPNotFound()
async def _check_login(self, username, password):
"""Check User credentials."""
provider = self._get_provider()
try:
await provider.async_validate_login(username, password)
except HomeAssistantError:
raise HTTPUnauthorized() from None

View File

@ -0,0 +1,95 @@
"""The tests for the hassio component."""
from unittest.mock import patch, Mock
from homeassistant.const import HTTP_HEADER_HA_AUTH
from homeassistant.exceptions import HomeAssistantError
from tests.common import mock_coro, register_auth_provider
from . import API_PASSWORD
async def test_login_success(hass, hassio_client):
"""Test no auth needed for ."""
await register_auth_provider(hass, {'type': 'homeassistant'})
with patch('homeassistant.auth.providers.homeassistant.'
'HassAuthProvider.async_validate_login',
Mock(return_value=mock_coro())) as mock_login:
resp = await hassio_client.post(
'/api/hassio_auth',
json={
"username": "test",
"password": "123456"
},
headers={
HTTP_HEADER_HA_AUTH: API_PASSWORD
}
)
# Check we got right response
assert resp.status == 200
mock_login.assert_called_with("test", "123456")
async def test_login_error(hass, hassio_client):
"""Test no auth needed for error."""
await register_auth_provider(hass, {'type': 'homeassistant'})
with patch('homeassistant.auth.providers.homeassistant.'
'HassAuthProvider.async_validate_login',
Mock(side_effect=HomeAssistantError())) as mock_login:
resp = await hassio_client.post(
'/api/hassio_auth',
json={
"username": "test",
"password": "123456"
},
headers={
HTTP_HEADER_HA_AUTH: API_PASSWORD
}
)
# Check we got right response
assert resp.status == 401
mock_login.assert_called_with("test", "123456")
async def test_login_no_data(hass, hassio_client):
"""Test auth with no data -> error."""
await register_auth_provider(hass, {'type': 'homeassistant'})
with patch('homeassistant.auth.providers.homeassistant.'
'HassAuthProvider.async_validate_login',
Mock(side_effect=HomeAssistantError())) as mock_login:
resp = await hassio_client.post(
'/api/hassio_auth',
headers={
HTTP_HEADER_HA_AUTH: API_PASSWORD
}
)
# Check we got right response
assert resp.status == 400
assert not mock_login.called
async def test_login_no_username(hass, hassio_client):
"""Test auth with no username in data -> error."""
await register_auth_provider(hass, {'type': 'homeassistant'})
with patch('homeassistant.auth.providers.homeassistant.'
'HassAuthProvider.async_validate_login',
Mock(side_effect=HomeAssistantError())) as mock_login:
resp = await hassio_client.post(
'/api/hassio_auth',
json={
"password": "123456"
},
headers={
HTTP_HEADER_HA_AUTH: API_PASSWORD
}
)
# Check we got right response
assert resp.status == 400
assert not mock_login.called