diff --git a/homeassistant/components/http/auth.py b/homeassistant/components/http/auth.py index 5213cd1b072..d10bd677e41 100644 --- a/homeassistant/components/http/auth.py +++ b/homeassistant/components/http/auth.py @@ -60,9 +60,7 @@ def async_sign_path( url = URL(path) now = dt_util.utcnow() - params = dict(sorted(url.query.items())) - for param in SAFE_QUERY_PARAMS: - params.pop(param, None) + params = [itm for itm in url.query.items() if itm[0] not in SAFE_QUERY_PARAMS] encoded = jwt.encode( { "iss": refresh_token_id, @@ -75,7 +73,7 @@ def async_sign_path( algorithm="HS256", ) - params[SIGN_QUERY_PARAM] = encoded + params.append((SIGN_QUERY_PARAM, encoded)) url = url.with_query(params) return f"{url.path}?{url.query_string}" @@ -184,10 +182,11 @@ async def async_setup_auth(hass: HomeAssistant, app: Application) -> None: if claims["path"] != request.path: return False - params = dict(sorted(request.query.items())) - del params[SIGN_QUERY_PARAM] - for param in SAFE_QUERY_PARAMS: - params.pop(param, None) + params = [ + list(itm) # claims stores tuples as lists + for itm in request.query.items() + if itm[0] not in SAFE_QUERY_PARAMS and itm[0] != SIGN_QUERY_PARAM + ] if claims["params"] != params: return False diff --git a/tests/components/http/test_auth.py b/tests/components/http/test_auth.py index fb00640cdc5..246572e64f8 100644 --- a/tests/components/http/test_auth.py +++ b/tests/components/http/test_auth.py @@ -352,6 +352,12 @@ async def test_auth_access_signed_path_with_query_param( data = await req.json() assert data["user_id"] == refresh_token.user.id + # Without query params not allowed + url = yarl.URL(signed_path) + signed_path = f"{url.path}?{SIGN_QUERY_PARAM}={url.query.get(SIGN_QUERY_PARAM)}" + req = await client.get(signed_path) + assert req.status == HTTPStatus.UNAUTHORIZED + async def test_auth_access_signed_path_with_query_param_order( hass: HomeAssistant, @@ -374,12 +380,24 @@ async def test_auth_access_signed_path_with_query_param_order( refresh_token_id=refresh_token.id, ) url = yarl.URL(signed_path) - signed_path = f"{url.path}?{SIGN_QUERY_PARAM}={url.query.get(SIGN_QUERY_PARAM)}&foo=bar&test=test" - req = await client.get(signed_path) - assert req.status == HTTPStatus.OK - data = await req.json() - assert data["user_id"] == refresh_token.user.id + # Change order + req = await client.get( + f"{url.path}?{SIGN_QUERY_PARAM}={url.query.get(SIGN_QUERY_PARAM)}&foo=bar&test=test" + ) + assert req.status == HTTPStatus.UNAUTHORIZED + + # Duplicate a param + req = await client.get( + f"{url.path}?{SIGN_QUERY_PARAM}={url.query.get(SIGN_QUERY_PARAM)}&test=test&foo=aaa&foo=bar" + ) + assert req.status == HTTPStatus.UNAUTHORIZED + + # Remove a param + req = await client.get( + f"{url.path}?{SIGN_QUERY_PARAM}={url.query.get(SIGN_QUERY_PARAM)}&test=test" + ) + assert req.status == HTTPStatus.UNAUTHORIZED async def test_auth_access_signed_path_with_query_param_safe_param(