mirror of
https://github.com/home-assistant/core.git
synced 2025-07-18 18:57:06 +00:00
Always enforce URL param ordering for signed URLs (#90148)
Always enforce URL param ordering
This commit is contained in:
parent
a7b5a0297e
commit
174342860b
@ -60,9 +60,7 @@ def async_sign_path(
|
|||||||
|
|
||||||
url = URL(path)
|
url = URL(path)
|
||||||
now = dt_util.utcnow()
|
now = dt_util.utcnow()
|
||||||
params = dict(sorted(url.query.items()))
|
params = [itm for itm in url.query.items() if itm[0] not in SAFE_QUERY_PARAMS]
|
||||||
for param in SAFE_QUERY_PARAMS:
|
|
||||||
params.pop(param, None)
|
|
||||||
encoded = jwt.encode(
|
encoded = jwt.encode(
|
||||||
{
|
{
|
||||||
"iss": refresh_token_id,
|
"iss": refresh_token_id,
|
||||||
@ -75,7 +73,7 @@ def async_sign_path(
|
|||||||
algorithm="HS256",
|
algorithm="HS256",
|
||||||
)
|
)
|
||||||
|
|
||||||
params[SIGN_QUERY_PARAM] = encoded
|
params.append((SIGN_QUERY_PARAM, encoded))
|
||||||
url = url.with_query(params)
|
url = url.with_query(params)
|
||||||
return f"{url.path}?{url.query_string}"
|
return f"{url.path}?{url.query_string}"
|
||||||
|
|
||||||
@ -184,10 +182,11 @@ async def async_setup_auth(hass: HomeAssistant, app: Application) -> None:
|
|||||||
if claims["path"] != request.path:
|
if claims["path"] != request.path:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
params = dict(sorted(request.query.items()))
|
params = [
|
||||||
del params[SIGN_QUERY_PARAM]
|
list(itm) # claims stores tuples as lists
|
||||||
for param in SAFE_QUERY_PARAMS:
|
for itm in request.query.items()
|
||||||
params.pop(param, None)
|
if itm[0] not in SAFE_QUERY_PARAMS and itm[0] != SIGN_QUERY_PARAM
|
||||||
|
]
|
||||||
if claims["params"] != params:
|
if claims["params"] != params:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
@ -352,6 +352,12 @@ async def test_auth_access_signed_path_with_query_param(
|
|||||||
data = await req.json()
|
data = await req.json()
|
||||||
assert data["user_id"] == refresh_token.user.id
|
assert data["user_id"] == refresh_token.user.id
|
||||||
|
|
||||||
|
# Without query params not allowed
|
||||||
|
url = yarl.URL(signed_path)
|
||||||
|
signed_path = f"{url.path}?{SIGN_QUERY_PARAM}={url.query.get(SIGN_QUERY_PARAM)}"
|
||||||
|
req = await client.get(signed_path)
|
||||||
|
assert req.status == HTTPStatus.UNAUTHORIZED
|
||||||
|
|
||||||
|
|
||||||
async def test_auth_access_signed_path_with_query_param_order(
|
async def test_auth_access_signed_path_with_query_param_order(
|
||||||
hass: HomeAssistant,
|
hass: HomeAssistant,
|
||||||
@ -374,12 +380,24 @@ async def test_auth_access_signed_path_with_query_param_order(
|
|||||||
refresh_token_id=refresh_token.id,
|
refresh_token_id=refresh_token.id,
|
||||||
)
|
)
|
||||||
url = yarl.URL(signed_path)
|
url = yarl.URL(signed_path)
|
||||||
signed_path = f"{url.path}?{SIGN_QUERY_PARAM}={url.query.get(SIGN_QUERY_PARAM)}&foo=bar&test=test"
|
|
||||||
|
|
||||||
req = await client.get(signed_path)
|
# Change order
|
||||||
assert req.status == HTTPStatus.OK
|
req = await client.get(
|
||||||
data = await req.json()
|
f"{url.path}?{SIGN_QUERY_PARAM}={url.query.get(SIGN_QUERY_PARAM)}&foo=bar&test=test"
|
||||||
assert data["user_id"] == refresh_token.user.id
|
)
|
||||||
|
assert req.status == HTTPStatus.UNAUTHORIZED
|
||||||
|
|
||||||
|
# Duplicate a param
|
||||||
|
req = await client.get(
|
||||||
|
f"{url.path}?{SIGN_QUERY_PARAM}={url.query.get(SIGN_QUERY_PARAM)}&test=test&foo=aaa&foo=bar"
|
||||||
|
)
|
||||||
|
assert req.status == HTTPStatus.UNAUTHORIZED
|
||||||
|
|
||||||
|
# Remove a param
|
||||||
|
req = await client.get(
|
||||||
|
f"{url.path}?{SIGN_QUERY_PARAM}={url.query.get(SIGN_QUERY_PARAM)}&test=test"
|
||||||
|
)
|
||||||
|
assert req.status == HTTPStatus.UNAUTHORIZED
|
||||||
|
|
||||||
|
|
||||||
async def test_auth_access_signed_path_with_query_param_safe_param(
|
async def test_auth_access_signed_path_with_query_param_safe_param(
|
||||||
|
Loading…
x
Reference in New Issue
Block a user