mirror of
https://github.com/home-assistant/core.git
synced 2026-04-27 06:45:58 +00:00
Update PyJWT to 2.12.1 (#168239)
Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com> Co-authored-by: Robert Resch <robert@resch.dev>
This commit is contained in:
@@ -7,23 +7,31 @@ to speed up the process.
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Container, Iterable, Sequence
|
||||
from datetime import timedelta
|
||||
from functools import lru_cache, partial
|
||||
from typing import Any
|
||||
from functools import lru_cache
|
||||
from typing import Any, override
|
||||
|
||||
from jwt import DecodeError, PyJWS, PyJWT
|
||||
from jwt import DecodeError, PyJWK, PyJWS, PyJWT
|
||||
from jwt.algorithms import AllowedPublicKeys
|
||||
from jwt.types import Options
|
||||
|
||||
from homeassistant.util.json import json_loads
|
||||
|
||||
JWT_TOKEN_CACHE_SIZE = 16
|
||||
MAX_TOKEN_SIZE = 8192
|
||||
|
||||
_VERIFY_KEYS = ("signature", "exp", "nbf", "iat", "aud", "iss", "sub", "jti")
|
||||
|
||||
_VERIFY_OPTIONS: dict[str, Any] = {f"verify_{key}": True for key in _VERIFY_KEYS} | {
|
||||
"require": []
|
||||
}
|
||||
_NO_VERIFY_OPTIONS = {f"verify_{key}": False for key in _VERIFY_KEYS}
|
||||
_NO_VERIFY_OPTIONS = Options(
|
||||
verify_signature=False,
|
||||
verify_exp=False,
|
||||
verify_nbf=False,
|
||||
verify_iat=False,
|
||||
verify_aud=False,
|
||||
verify_iss=False,
|
||||
verify_sub=False,
|
||||
verify_jti=False,
|
||||
require=[],
|
||||
)
|
||||
|
||||
|
||||
class _PyJWSWithLoadCache(PyJWS):
|
||||
@@ -38,9 +46,6 @@ class _PyJWSWithLoadCache(PyJWS):
|
||||
return super()._load(jwt)
|
||||
|
||||
|
||||
_jws = _PyJWSWithLoadCache()
|
||||
|
||||
|
||||
@lru_cache(maxsize=JWT_TOKEN_CACHE_SIZE)
|
||||
def _decode_payload(json_payload: str) -> dict[str, Any]:
|
||||
"""Decode the payload from a JWS dictionary."""
|
||||
@@ -56,21 +61,12 @@ def _decode_payload(json_payload: str) -> dict[str, Any]:
|
||||
class _PyJWTWithVerify(PyJWT):
|
||||
"""PyJWT with a fast decode implementation."""
|
||||
|
||||
def decode_payload(
|
||||
self, jwt: str, key: str, options: dict[str, Any], algorithms: list[str]
|
||||
) -> dict[str, Any]:
|
||||
"""Decode a JWT's payload."""
|
||||
if len(jwt) > MAX_TOKEN_SIZE:
|
||||
# Avoid caching impossible tokens
|
||||
raise DecodeError("Token too large")
|
||||
return _decode_payload(
|
||||
_jws.decode_complete(
|
||||
jwt=jwt,
|
||||
key=key,
|
||||
algorithms=algorithms,
|
||||
options=options,
|
||||
)["payload"]
|
||||
)
|
||||
def __init__(self) -> None:
|
||||
"""Initialize the PyJWT instance."""
|
||||
# We require exp and iat claims to be present
|
||||
super().__init__(Options(require=["exp", "iat"]))
|
||||
# Override the _jws instance with our cached version
|
||||
self._jws = _PyJWSWithLoadCache()
|
||||
|
||||
def verify_and_decode(
|
||||
self,
|
||||
@@ -79,37 +75,70 @@ class _PyJWTWithVerify(PyJWT):
|
||||
algorithms: list[str],
|
||||
issuer: str | None = None,
|
||||
leeway: float | timedelta = 0,
|
||||
options: dict[str, Any] | None = None,
|
||||
options: Options | None = None,
|
||||
) -> dict[str, Any]:
|
||||
"""Verify a JWT's signature and claims."""
|
||||
merged_options = {**_VERIFY_OPTIONS, **(options or {})}
|
||||
payload = self.decode_payload(
|
||||
return self.decode(
|
||||
jwt=jwt,
|
||||
key=key,
|
||||
options=merged_options,
|
||||
algorithms=algorithms,
|
||||
)
|
||||
# These should never be missing since we verify them
|
||||
# but this is an additional safeguard to make sure
|
||||
# nothing slips through.
|
||||
assert "exp" in payload, "exp claim is required"
|
||||
assert "iat" in payload, "iat claim is required"
|
||||
self._validate_claims(
|
||||
payload=payload,
|
||||
options=merged_options,
|
||||
issuer=issuer,
|
||||
leeway=leeway,
|
||||
options=options,
|
||||
)
|
||||
return payload
|
||||
|
||||
@override
|
||||
def decode(
|
||||
self,
|
||||
jwt: str | bytes,
|
||||
key: AllowedPublicKeys | PyJWK | str | bytes = "",
|
||||
algorithms: Sequence[str] | None = None,
|
||||
options: Options | None = None,
|
||||
verify: bool | None = None,
|
||||
detached_payload: bytes | None = None,
|
||||
audience: str | Iterable[str] | None = None,
|
||||
subject: str | None = None,
|
||||
issuer: str | Container[str] | None = None,
|
||||
leeway: float | timedelta = 0,
|
||||
**kwargs: Any,
|
||||
) -> dict[str, Any]:
|
||||
"""Decode a JWT, verifying the signature and claims."""
|
||||
if len(jwt) > MAX_TOKEN_SIZE:
|
||||
# Avoid caching impossible tokens
|
||||
raise DecodeError("Token too large")
|
||||
return super().decode(
|
||||
jwt=jwt,
|
||||
key=key,
|
||||
algorithms=algorithms,
|
||||
options=options,
|
||||
verify=verify,
|
||||
detached_payload=detached_payload,
|
||||
audience=audience,
|
||||
subject=subject,
|
||||
issuer=issuer,
|
||||
leeway=leeway,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
@override
|
||||
def _decode_payload(self, decoded: dict[str, Any]) -> dict[str, Any]:
|
||||
return _decode_payload(decoded["payload"])
|
||||
|
||||
|
||||
_jwt = _PyJWTWithVerify()
|
||||
verify_and_decode = _jwt.verify_and_decode
|
||||
unverified_hs256_token_decode = lru_cache(maxsize=JWT_TOKEN_CACHE_SIZE)(
|
||||
partial(
|
||||
_jwt.decode_payload, key="", algorithms=["HS256"], options=_NO_VERIFY_OPTIONS
|
||||
|
||||
|
||||
@lru_cache(maxsize=JWT_TOKEN_CACHE_SIZE)
|
||||
def unverified_hs256_token_decode(jwt: str) -> dict[str, Any]:
|
||||
"""Decode a JWT without verifying the signature."""
|
||||
return _jwt.decode(
|
||||
jwt=jwt,
|
||||
key="",
|
||||
algorithms=["HS256"],
|
||||
options=_NO_VERIFY_OPTIONS,
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
__all__ = [
|
||||
"unverified_hs256_token_decode",
|
||||
|
||||
@@ -804,6 +804,6 @@ def _decode_jwt(hass: HomeAssistant, encoded: str) -> dict[str, Any] | None:
|
||||
return None
|
||||
|
||||
try:
|
||||
return jwt.decode(encoded, secret, algorithms=["HS256"]) # type: ignore[no-any-return]
|
||||
return jwt.decode(encoded, secret, algorithms=["HS256"])
|
||||
except jwt.InvalidTokenError:
|
||||
return None
|
||||
|
||||
@@ -53,7 +53,7 @@ paho-mqtt==2.1.0
|
||||
Pillow==12.2.0
|
||||
propcache==0.4.1
|
||||
psutil-home-assistant==0.0.1
|
||||
PyJWT==2.10.1
|
||||
PyJWT==2.12.1
|
||||
pymicro-vad==1.0.1
|
||||
PyNaCl==1.6.2
|
||||
pyOpenSSL==26.0.0
|
||||
|
||||
@@ -55,7 +55,7 @@ dependencies = [
|
||||
"ifaddr==0.2.0",
|
||||
"Jinja2==3.1.6",
|
||||
"lru-dict==1.3.0",
|
||||
"PyJWT==2.10.1",
|
||||
"PyJWT==2.12.1",
|
||||
# PyJWT has loose dependency. We want the latest one.
|
||||
"cryptography==46.0.7",
|
||||
"Pillow==12.2.0",
|
||||
|
||||
2
requirements.txt
generated
2
requirements.txt
generated
@@ -39,7 +39,7 @@ packaging>=23.1
|
||||
Pillow==12.2.0
|
||||
propcache==0.4.1
|
||||
psutil-home-assistant==0.0.1
|
||||
PyJWT==2.10.1
|
||||
PyJWT==2.12.1
|
||||
pymicro-vad==1.0.1
|
||||
pyOpenSSL==26.0.0
|
||||
pyspeex-noise==1.0.2
|
||||
|
||||
@@ -6,12 +6,6 @@ import pytest
|
||||
from homeassistant.auth import jwt_wrapper
|
||||
|
||||
|
||||
async def test_all_default_options_are_in_verify_options() -> None:
|
||||
"""Test that all default options in _VERIFY_OPTIONS."""
|
||||
for option in jwt_wrapper._PyJWTWithVerify._get_default_options():
|
||||
assert option in jwt_wrapper._VERIFY_OPTIONS
|
||||
|
||||
|
||||
async def test_reject_access_token_with_impossible_large_size() -> None:
|
||||
"""Test rejecting access tokens with impossible sizes."""
|
||||
with pytest.raises(jwt.DecodeError):
|
||||
|
||||
Reference in New Issue
Block a user