diff --git a/supervisor/utils/codenotary.py b/supervisor/utils/codenotary.py index bc4f3b817..c08e20f7f 100644 --- a/supervisor/utils/codenotary.py +++ b/supervisor/utils/codenotary.py @@ -21,6 +21,7 @@ _CAS_CMD: str = ( _CACHE: set[tuple[str, str]] = set() +_ATTR_ERROR: Final = "error" _ATTR_STATUS: Final = "status" @@ -88,6 +89,9 @@ async def cas_validate( f"Can't parse CodeNotary output: {data!s} - {err!s}", _LOGGER.error ) from err + if _ATTR_ERROR in data_json: + raise CodeNotaryBackendError(data_json[_ATTR_ERROR], _LOGGER.warning) + if data_json[_ATTR_STATUS] == 0: _CACHE.add((checksum, signer)) else: diff --git a/tests/utils/test_codenotary.py b/tests/utils/test_codenotary.py index 68c5ed3b6..68d8617da 100644 --- a/tests/utils/test_codenotary.py +++ b/tests/utils/test_codenotary.py @@ -1,11 +1,51 @@ """Test CodeNotary.""" +from __future__ import annotations + +from dataclasses import dataclass +from unittest.mock import AsyncMock, Mock, patch import pytest -from supervisor.exceptions import CodeNotaryUntrusted +from supervisor.exceptions import ( + CodeNotaryBackendError, + CodeNotaryError, + CodeNotaryUntrusted, +) from supervisor.utils.codenotary import calc_checksum, cas_validate +@dataclass +class SubprocessResponse: + """Class for specifying subprocess exec response.""" + + returncode: int = 0 + data: str = "" + error: str | None = None + exception: Exception | None = None + + +@pytest.fixture(name="subprocess_exec") +def fixture_subprocess_exec(request): + """Mock subprocess exec with specific return.""" + response = request.param + if response.exception: + communicate_return = AsyncMock(side_effect=response.exception) + else: + err_return = None + if response.error: + err_return = Mock(decode=Mock(return_value=response.error)) + + communicate_return = AsyncMock(return_value=(response.data, err_return)) + + exec_return = Mock(returncode=response.returncode, communicate=communicate_return) + + with patch( + "supervisor.utils.codenotary.asyncio.create_subprocess_exec", + return_value=exec_return, + ) as subprocess_exec: + yield subprocess_exec + + def test_checksum_calc(): """Calc Checkusm as test.""" assert calc_checksum("test") == calc_checksum(b"test") @@ -30,3 +70,46 @@ async def test_invalid_checksum(): "notary@home-assistant.io", "ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff", ) + + +@pytest.mark.parametrize( + "subprocess_exec", + [ + SubprocessResponse(returncode=1, error="test"), + SubprocessResponse(returncode=0, data='{"error":"asn1: structure error"}'), + ], + indirect=True, +) +async def test_cas_backend_error(subprocess_exec): + """Test backend error executing cas command.""" + with pytest.raises(CodeNotaryBackendError): + await cas_validate( + "notary@home-assistant.io", + "ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff", + ) + + +@pytest.mark.parametrize( + "subprocess_exec", + [SubprocessResponse(returncode=0, data='{"status":1}')], + indirect=True, +) +async def test_cas_notarized_untrusted(subprocess_exec): + """Test cas found notarized but untrusted content.""" + with pytest.raises(CodeNotaryUntrusted): + await cas_validate( + "notary@home-assistant.io", + "ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff", + ) + + +@pytest.mark.parametrize( + "subprocess_exec", [SubprocessResponse(exception=OSError())], indirect=True +) +async def test_cas_exec_os_error(subprocess_exec): + """Test os error attempting to execute cas command.""" + with pytest.raises(CodeNotaryError): + await cas_validate( + "notary@home-assistant.io", + "ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff", + )