Simplify teslemetry service actions

This commit is contained in:
epenet 2025-06-13 14:52:25 +00:00
parent d880ce6bb4
commit f99a1cc081

View File

@ -98,15 +98,11 @@ def async_get_energy_site_for_entry(
return energy_data
@callback
def async_setup_services(hass: HomeAssistant) -> None:
"""Set up the Teslemetry services."""
async def navigate_gps_request(call: ServiceCall) -> None:
async def _navigate_gps_request(call: ServiceCall) -> None:
"""Send lat,lon,order with a vehicle."""
device = async_get_device_for_service_call(hass, call)
config = async_get_config_for_device(hass, device)
vehicle = async_get_vehicle_for_entry(hass, device, config)
device = async_get_device_for_service_call(call.hass, call)
config = async_get_config_for_device(call.hass, device)
vehicle = async_get_vehicle_for_entry(call.hass, device, config)
await handle_vehicle_command(
vehicle.api.navigation_gps_request(
@ -116,27 +112,12 @@ def async_setup_services(hass: HomeAssistant) -> None:
)
)
hass.services.async_register(
DOMAIN,
SERVICE_NAVIGATE_ATTR_GPS_REQUEST,
navigate_gps_request,
schema=vol.Schema(
{
vol.Required(CONF_DEVICE_ID): cv.string,
vol.Required(ATTR_GPS): {
vol.Required(CONF_LATITUDE): cv.latitude,
vol.Required(CONF_LONGITUDE): cv.longitude,
},
vol.Optional(ATTR_ORDER): cv.positive_int,
}
),
)
async def set_scheduled_charging(call: ServiceCall) -> None:
async def _set_scheduled_charging(call: ServiceCall) -> None:
"""Configure fleet telemetry."""
device = async_get_device_for_service_call(hass, call)
config = async_get_config_for_device(hass, device)
vehicle = async_get_vehicle_for_entry(hass, device, config)
device = async_get_device_for_service_call(call.hass, call)
config = async_get_config_for_device(call.hass, device)
vehicle = async_get_vehicle_for_entry(call.hass, device, config)
time: int | None = None
# Convert time to minutes since minute
@ -152,32 +133,18 @@ def async_setup_services(hass: HomeAssistant) -> None:
vehicle.api.set_scheduled_charging(enable=call.data["enable"], time=time)
)
hass.services.async_register(
DOMAIN,
SERVICE_SET_SCHEDULED_CHARGING,
set_scheduled_charging,
schema=vol.Schema(
{
vol.Required(CONF_DEVICE_ID): cv.string,
vol.Required(ATTR_ENABLE): bool,
vol.Optional(ATTR_TIME): str,
}
),
)
async def set_scheduled_departure(call: ServiceCall) -> None:
async def _set_scheduled_departure(call: ServiceCall) -> None:
"""Configure fleet telemetry."""
device = async_get_device_for_service_call(hass, call)
config = async_get_config_for_device(hass, device)
vehicle = async_get_vehicle_for_entry(hass, device, config)
device = async_get_device_for_service_call(call.hass, call)
config = async_get_config_for_device(call.hass, device)
vehicle = async_get_vehicle_for_entry(call.hass, device, config)
enable = call.data.get("enable", True)
# Preconditioning
preconditioning_enabled = call.data.get(ATTR_PRECONDITIONING_ENABLED, False)
preconditioning_weekdays_only = call.data.get(
ATTR_PRECONDITIONING_WEEKDAYS, False
)
preconditioning_weekdays_only = call.data.get(ATTR_PRECONDITIONING_WEEKDAYS, False)
departure_time: int | None = None
if ATTR_DEPARTURE_TIME in call.data:
(hours, minutes, *seconds) = call.data[ATTR_DEPARTURE_TIME].split(":")
@ -216,10 +183,89 @@ def async_setup_services(hass: HomeAssistant) -> None:
)
)
async def _valet_mode(call: ServiceCall) -> None:
"""Configure fleet telemetry."""
device = async_get_device_for_service_call(call.hass, call)
config = async_get_config_for_device(call.hass, device)
vehicle = async_get_vehicle_for_entry(call.hass, device, config)
await handle_vehicle_command(
vehicle.api.set_valet_mode(call.data.get("enable"), call.data.get("pin", ""))
)
async def _speed_limit(call: ServiceCall) -> None:
"""Configure fleet telemetry."""
device = async_get_device_for_service_call(call.hass, call)
config = async_get_config_for_device(call.hass, device)
vehicle = async_get_vehicle_for_entry(call.hass, device, config)
enable = call.data.get("enable")
if enable is True:
await handle_vehicle_command(
vehicle.api.speed_limit_activate(call.data.get("pin"))
)
elif enable is False:
await handle_vehicle_command(
vehicle.api.speed_limit_deactivate(call.data.get("pin"))
)
async def _time_of_use(call: ServiceCall) -> None:
"""Configure time of use settings."""
device = async_get_device_for_service_call(call.hass, call)
config = async_get_config_for_device(call.hass, device)
site = async_get_energy_site_for_entry(call.hass, device, config)
resp = await handle_command(
site.api.time_of_use_settings(call.data.get(ATTR_TOU_SETTINGS))
)
if "error" in resp:
raise HomeAssistantError(
translation_domain=DOMAIN,
translation_key="command_error",
translation_placeholders={"error": resp["error"]},
)
@callback
def async_setup_services(hass: HomeAssistant) -> None:
"""Set up the Teslemetry services."""
hass.services.async_register(
DOMAIN,
SERVICE_NAVIGATE_ATTR_GPS_REQUEST,
_navigate_gps_request,
schema=vol.Schema(
{
vol.Required(CONF_DEVICE_ID): cv.string,
vol.Required(ATTR_GPS): {
vol.Required(CONF_LATITUDE): cv.latitude,
vol.Required(CONF_LONGITUDE): cv.longitude,
},
vol.Optional(ATTR_ORDER): cv.positive_int,
}
),
)
hass.services.async_register(
DOMAIN,
SERVICE_SET_SCHEDULED_CHARGING,
_set_scheduled_charging,
schema=vol.Schema(
{
vol.Required(CONF_DEVICE_ID): cv.string,
vol.Required(ATTR_ENABLE): bool,
vol.Optional(ATTR_TIME): str,
}
),
)
hass.services.async_register(
DOMAIN,
SERVICE_SET_SCHEDULED_DEPARTURE,
set_scheduled_departure,
_set_scheduled_departure,
schema=vol.Schema(
{
vol.Required(CONF_DEVICE_ID): cv.string,
@ -234,22 +280,10 @@ def async_setup_services(hass: HomeAssistant) -> None:
),
)
async def valet_mode(call: ServiceCall) -> None:
"""Configure fleet telemetry."""
device = async_get_device_for_service_call(hass, call)
config = async_get_config_for_device(hass, device)
vehicle = async_get_vehicle_for_entry(hass, device, config)
await handle_vehicle_command(
vehicle.api.set_valet_mode(
call.data.get("enable"), call.data.get("pin", "")
)
)
hass.services.async_register(
DOMAIN,
SERVICE_VALET_MODE,
valet_mode,
_valet_mode,
schema=vol.Schema(
{
vol.Required(CONF_DEVICE_ID): cv.string,
@ -259,26 +293,10 @@ def async_setup_services(hass: HomeAssistant) -> None:
),
)
async def speed_limit(call: ServiceCall) -> None:
"""Configure fleet telemetry."""
device = async_get_device_for_service_call(hass, call)
config = async_get_config_for_device(hass, device)
vehicle = async_get_vehicle_for_entry(hass, device, config)
enable = call.data.get("enable")
if enable is True:
await handle_vehicle_command(
vehicle.api.speed_limit_activate(call.data.get("pin"))
)
elif enable is False:
await handle_vehicle_command(
vehicle.api.speed_limit_deactivate(call.data.get("pin"))
)
hass.services.async_register(
DOMAIN,
SERVICE_SPEED_LIMIT,
speed_limit,
_speed_limit,
schema=vol.Schema(
{
vol.Required(CONF_DEVICE_ID): cv.string,
@ -288,26 +306,10 @@ def async_setup_services(hass: HomeAssistant) -> None:
),
)
async def time_of_use(call: ServiceCall) -> None:
"""Configure time of use settings."""
device = async_get_device_for_service_call(hass, call)
config = async_get_config_for_device(hass, device)
site = async_get_energy_site_for_entry(hass, device, config)
resp = await handle_command(
site.api.time_of_use_settings(call.data.get(ATTR_TOU_SETTINGS))
)
if "error" in resp:
raise HomeAssistantError(
translation_domain=DOMAIN,
translation_key="command_error",
translation_placeholders={"error": resp["error"]},
)
hass.services.async_register(
DOMAIN,
SERVICE_TIME_OF_USE,
time_of_use,
_time_of_use,
schema=vol.Schema(
{
vol.Required(CONF_DEVICE_ID): cv.string,