Add signing support to Tesla Fleet (#128407)

* Add command signing

* wip

* Update tests

* requirements

* Add test
This commit is contained in:
Brett Adams 2024-11-06 05:04:55 +10:00 committed by GitHub
parent 83a1b06b56
commit 94db78a0be
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 85 additions and 54 deletions

View File

@ -5,7 +5,12 @@ from typing import Final
from aiohttp.client_exceptions import ClientResponseError from aiohttp.client_exceptions import ClientResponseError
import jwt import jwt
from tesla_fleet_api import EnergySpecific, TeslaFleetApi, VehicleSpecific from tesla_fleet_api import (
EnergySpecific,
TeslaFleetApi,
VehicleSigned,
VehicleSpecific,
)
from tesla_fleet_api.const import Scope from tesla_fleet_api.const import Scope
from tesla_fleet_api.exceptions import ( from tesla_fleet_api.exceptions import (
InvalidRegion, InvalidRegion,
@ -126,7 +131,13 @@ async def async_setup_entry(hass: HomeAssistant, entry: TeslaFleetConfigEntry) -
# Remove the protobuff 'cached_data' that we do not use to save memory # Remove the protobuff 'cached_data' that we do not use to save memory
product.pop("cached_data", None) product.pop("cached_data", None)
vin = product["vin"] vin = product["vin"]
api = VehicleSpecific(tesla.vehicle, vin) signing = product["command_signing"] == "required"
if signing:
if not tesla.private_key:
await tesla.get_private_key("config/tesla_fleet.key")
api = VehicleSigned(tesla.vehicle, vin)
else:
api = VehicleSpecific(tesla.vehicle, vin)
coordinator = TeslaFleetVehicleDataCoordinator(hass, api, product) coordinator = TeslaFleetVehicleDataCoordinator(hass, api, product)
await coordinator.async_config_entry_first_refresh() await coordinator.async_config_entry_first_refresh()
@ -145,7 +156,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: TeslaFleetConfigEntry) -
coordinator=coordinator, coordinator=coordinator,
vin=vin, vin=vin,
device=device, device=device,
signing=product["command_signing"] == "required", signing=signing,
) )
) )
elif "energy_site_id" in product and hasattr(tesla, "energy"): elif "energy_site_id" in product and hasattr(tesla, "energy"):

View File

@ -70,8 +70,6 @@ async def async_setup_entry(
for vehicle in entry.runtime_data.vehicles for vehicle in entry.runtime_data.vehicles
for description in DESCRIPTIONS for description in DESCRIPTIONS
if Scope.VEHICLE_CMDS in entry.runtime_data.scopes if Scope.VEHICLE_CMDS in entry.runtime_data.scopes
and (not vehicle.signing or description.key == "wake")
# Wake doesn't need signing
) )

View File

@ -84,7 +84,7 @@ class TeslaFleetClimateEntity(TeslaFleetVehicleEntity, ClimateEntity):
) -> None: ) -> None:
"""Initialize the climate.""" """Initialize the climate."""
self.read_only = Scope.VEHICLE_CMDS not in scopes or data.signing self.read_only = Scope.VEHICLE_CMDS not in scopes
if self.read_only: if self.read_only:
self._attr_supported_features = ClimateEntityFeature(0) self._attr_supported_features = ClimateEntityFeature(0)
@ -231,7 +231,7 @@ class TeslaFleetCabinOverheatProtectionEntity(TeslaFleetVehicleEntity, ClimateEn
"""Initialize the cabin overheat climate entity.""" """Initialize the cabin overheat climate entity."""
# Scopes # Scopes
self.read_only = Scope.VEHICLE_CMDS not in scopes or data.signing self.read_only = Scope.VEHICLE_CMDS not in scopes
# Supported Features # Supported Features
if self.read_only: if self.read_only:

View File

@ -57,7 +57,7 @@ class TeslaFleetWindowEntity(TeslaFleetVehicleEntity, CoverEntity):
self._attr_supported_features = ( self._attr_supported_features = (
CoverEntityFeature.OPEN | CoverEntityFeature.CLOSE CoverEntityFeature.OPEN | CoverEntityFeature.CLOSE
) )
if not self.scoped or self.vehicle.signing: if not self.scoped:
self._attr_supported_features = CoverEntityFeature(0) self._attr_supported_features = CoverEntityFeature(0)
def _async_update_attrs(self) -> None: def _async_update_attrs(self) -> None:
@ -111,7 +111,7 @@ class TeslaFleetChargePortEntity(TeslaFleetVehicleEntity, CoverEntity):
self._attr_supported_features = ( self._attr_supported_features = (
CoverEntityFeature.OPEN | CoverEntityFeature.CLOSE CoverEntityFeature.OPEN | CoverEntityFeature.CLOSE
) )
if not self.scoped or self.vehicle.signing: if not self.scoped:
self._attr_supported_features = CoverEntityFeature(0) self._attr_supported_features = CoverEntityFeature(0)
def _async_update_attrs(self) -> None: def _async_update_attrs(self) -> None:
@ -144,7 +144,7 @@ class TeslaFleetFrontTrunkEntity(TeslaFleetVehicleEntity, CoverEntity):
self.scoped = Scope.VEHICLE_CMDS in scopes self.scoped = Scope.VEHICLE_CMDS in scopes
self._attr_supported_features = CoverEntityFeature.OPEN self._attr_supported_features = CoverEntityFeature.OPEN
if not self.scoped or self.vehicle.signing: if not self.scoped:
self._attr_supported_features = CoverEntityFeature(0) self._attr_supported_features = CoverEntityFeature(0)
def _async_update_attrs(self) -> None: def _async_update_attrs(self) -> None:
@ -172,7 +172,7 @@ class TeslaFleetRearTrunkEntity(TeslaFleetVehicleEntity, CoverEntity):
self._attr_supported_features = ( self._attr_supported_features = (
CoverEntityFeature.OPEN | CoverEntityFeature.CLOSE CoverEntityFeature.OPEN | CoverEntityFeature.CLOSE
) )
if not self.scoped or self.vehicle.signing: if not self.scoped:
self._attr_supported_features = CoverEntityFeature(0) self._attr_supported_features = CoverEntityFeature(0)
def _async_update_attrs(self) -> None: def _async_update_attrs(self) -> None:
@ -216,7 +216,7 @@ class TeslaFleetSunroofEntity(TeslaFleetVehicleEntity, CoverEntity):
super().__init__(vehicle, "vehicle_state_sun_roof_state") super().__init__(vehicle, "vehicle_state_sun_roof_state")
self.scoped = Scope.VEHICLE_CMDS in scopes self.scoped = Scope.VEHICLE_CMDS in scopes
if not self.scoped or self.vehicle.signing: if not self.scoped:
self._attr_supported_features = CoverEntityFeature(0) self._attr_supported_features = CoverEntityFeature(0)
def _async_update_attrs(self) -> None: def _async_update_attrs(self) -> None:

View File

@ -123,14 +123,6 @@ class TeslaFleetVehicleEntity(TeslaFleetEntity):
"""Wake up the vehicle if its asleep.""" """Wake up the vehicle if its asleep."""
await wake_up_vehicle(self.vehicle) await wake_up_vehicle(self.vehicle)
def raise_for_read_only(self, scope: Scope) -> None:
"""Raise an error if no command signing or a scope is not available."""
if self.vehicle.signing:
raise ServiceValidationError(
translation_domain=DOMAIN, translation_key="command_signing"
)
super().raise_for_read_only(scope)
class TeslaFleetEnergyLiveEntity(TeslaFleetEntity): class TeslaFleetEnergyLiveEntity(TeslaFleetEntity):
"""Parent class for TeslaFleet Energy Site Live entities.""" """Parent class for TeslaFleet Energy Site Live entities."""

View File

@ -64,7 +64,7 @@ class TeslaFleetMediaEntity(TeslaFleetVehicleEntity, MediaPlayerEntity):
"""Initialize the media player entity.""" """Initialize the media player entity."""
super().__init__(data, "media") super().__init__(data, "media")
self.scoped = scoped self.scoped = scoped
if not scoped and data.signing: if not scoped:
self._attr_supported_features = MediaPlayerEntityFeature(0) self._attr_supported_features = MediaPlayerEntityFeature(0)
def _async_update_attrs(self) -> None: def _async_update_attrs(self) -> None:

View File

@ -504,9 +504,6 @@
"command_no_reason": { "command_no_reason": {
"message": "Command was unsuccessful but did not return a reason why." "message": "Command was unsuccessful but did not return a reason why."
}, },
"command_signing": {
"message": "Vehicle requires command signing. Please see documentation for more details."
},
"invalid_cop_temp": { "invalid_cop_temp": {
"message": "Cabin overheat protection does not support that temperature." "message": "Cabin overheat protection does not support that temperature."
}, },

View File

@ -167,3 +167,13 @@ def mock_request():
return_value=COMMAND_OK, return_value=COMMAND_OK,
) as mock_request: ) as mock_request:
yield mock_request yield mock_request
@pytest.fixture(autouse=True)
def mock_signed_command() -> Generator[AsyncMock]:
"""Mock Tesla Fleet Api signed_command method."""
with patch(
"homeassistant.components.tesla_fleet.VehicleSigned.signed_command",
return_value=COMMAND_OK,
) as mock_signed_command:
yield mock_signed_command

View File

@ -105,7 +105,7 @@
'original_name': 'Media player', 'original_name': 'Media player',
'platform': 'tesla_fleet', 'platform': 'tesla_fleet',
'previous_unique_id': None, 'previous_unique_id': None,
'supported_features': <MediaPlayerEntityFeature: 16437>, 'supported_features': 0,
'translation_key': 'media', 'translation_key': 'media',
'unique_id': 'LRWXF7EK4KC700000-media', 'unique_id': 'LRWXF7EK4KC700000-media',
'unit_of_measurement': None, 'unit_of_measurement': None,
@ -123,7 +123,7 @@
'media_position': 1.0, 'media_position': 1.0,
'media_title': 'Chapter 51: Cybertruck: Tesla, 20182019', 'media_title': 'Chapter 51: Cybertruck: Tesla, 20182019',
'source': 'Audible', 'source': 'Audible',
'supported_features': <MediaPlayerEntityFeature: 16437>, 'supported_features': <MediaPlayerEntityFeature: 0>,
'volume_level': 0.16129355359011466, 'volume_level': 0.16129355359011466,
}), }),
'context': <ANY>, 'context': <ANY>,

View File

@ -1,13 +1,16 @@
"""Test the Tesla Fleet button platform.""" """Test the Tesla Fleet button platform."""
from unittest.mock import patch from copy import deepcopy
from unittest.mock import AsyncMock, patch
import pytest import pytest
from syrupy import SnapshotAssertion from syrupy import SnapshotAssertion
from tesla_fleet_api.exceptions import NotOnWhitelistFault
from homeassistant.components.button import DOMAIN as BUTTON_DOMAIN, SERVICE_PRESS from homeassistant.components.button import DOMAIN as BUTTON_DOMAIN, SERVICE_PRESS
from homeassistant.const import ATTR_ENTITY_ID, Platform from homeassistant.const import ATTR_ENTITY_ID, Platform
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import entity_registry as er from homeassistant.helpers import entity_registry as er
from . import assert_entities, setup_platform from . import assert_entities, setup_platform
@ -63,3 +66,30 @@ async def test_press(
blocking=True, blocking=True,
) )
command.assert_called_once() command.assert_called_once()
async def test_press_signing_error(
hass: HomeAssistant, normal_config_entry: MockConfigEntry, mock_products: AsyncMock
) -> None:
"""Test pressing a button with a signing error."""
# Enable Signing
new_product = deepcopy(mock_products.return_value)
new_product["response"][0]["command_signing"] = "required"
mock_products.return_value = new_product
await setup_platform(hass, normal_config_entry, [Platform.BUTTON])
with (
patch(
"homeassistant.components.tesla_fleet.VehicleSigned.flash_lights",
side_effect=NotOnWhitelistFault,
),
pytest.raises(HomeAssistantError) as error,
):
await hass.services.async_call(
BUTTON_DOMAIN,
SERVICE_PRESS,
{ATTR_ENTITY_ID: ["button.test_flash_lights"]},
blocking=True,
)
assert error.from_exception(NotOnWhitelistFault)

View File

@ -1,5 +1,6 @@
"""Test the Tesla Fleet init.""" """Test the Tesla Fleet init."""
from copy import deepcopy
from unittest.mock import AsyncMock, patch from unittest.mock import AsyncMock, patch
from aiohttp import RequestInfo from aiohttp import RequestInfo
@ -404,3 +405,22 @@ async def test_init_region_issue_failed(
await setup_platform(hass, normal_config_entry) await setup_platform(hass, normal_config_entry)
mock_find_server.assert_called_once() mock_find_server.assert_called_once()
assert normal_config_entry.state is ConfigEntryState.SETUP_ERROR assert normal_config_entry.state is ConfigEntryState.SETUP_ERROR
async def test_signing(
hass: HomeAssistant,
normal_config_entry: MockConfigEntry,
mock_products: AsyncMock,
) -> None:
"""Tests when a vehicle requires signing."""
# Make the vehicle require command signing
products = deepcopy(mock_products.return_value)
products["response"][0]["command_signing"] = "required"
mock_products.return_value = products
with patch(
"homeassistant.components.tesla_fleet.TeslaFleetApi.get_private_key"
) as mock_get_private_key:
await setup_platform(hass, normal_config_entry)
mock_get_private_key.assert_called_once()

View File

@ -1,6 +1,5 @@
"""Test the tesla_fleet switch platform.""" """Test the tesla_fleet switch platform."""
from copy import deepcopy
from unittest.mock import AsyncMock, patch from unittest.mock import AsyncMock, patch
import pytest import pytest
@ -166,29 +165,3 @@ async def test_switch_no_scope(
{ATTR_ENTITY_ID: "switch.test_auto_steering_wheel_heater"}, {ATTR_ENTITY_ID: "switch.test_auto_steering_wheel_heater"},
blocking=True, blocking=True,
) )
async def test_switch_no_signing(
hass: HomeAssistant,
entity_registry: er.EntityRegistry,
normal_config_entry: MockConfigEntry,
mock_products: AsyncMock,
) -> None:
"""Tests that the switch entities are correct."""
# Make the vehicle require command signing
products = deepcopy(mock_products.return_value)
products["response"][0]["command_signing"] = "required"
mock_products.return_value = products
await setup_platform(hass, normal_config_entry, [Platform.SWITCH])
with pytest.raises(
ServiceValidationError,
match="Vehicle requires command signing. Please see documentation for more details",
):
await hass.services.async_call(
SWITCH_DOMAIN,
SERVICE_TURN_OFF,
{ATTR_ENTITY_ID: "switch.test_auto_steering_wheel_heater"},
blocking=True,
)