Add support for updating ESPHome deep sleep devices (#144161)

Co-authored-by: Keith Burzinski <kbx81x@gmail.com>
This commit is contained in:
J. Nick Koston 2025-05-03 20:50:17 -05:00 committed by GitHub
parent a15a3c12d5
commit 2a5c0d9b88
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 363 additions and 23 deletions

View File

@ -195,7 +195,10 @@
"message": "Error compiling {configuration}; Try again in ESPHome dashboard for more information."
},
"error_uploading": {
"message": "Error during OTA of {configuration}; Try again in ESPHome dashboard for more information."
"message": "Error during OTA (Over-The-Air) of {configuration}; Try again in ESPHome dashboard for more information."
},
"ota_in_progress": {
"message": "An OTA (Over-The-Air) update is already in progress for {configuration}."
}
}
}

View File

@ -125,21 +125,17 @@ class ESPHomeDashboardUpdateEntity(
(dr.CONNECTION_NETWORK_MAC, entry_data.device_info.mac_address)
}
)
self._install_lock = asyncio.Lock()
self._available_future: asyncio.Future[None] | None = None
self._update_attrs()
@callback
def _update_attrs(self) -> None:
"""Update the supported features."""
# If the device has deep sleep, we can't assume we can install updates
# as the ESP will not be connectable (by design).
coordinator = self.coordinator
device_info = self._device_info
# Install support can change at run time
if (
coordinator.last_update_success
and coordinator.supports_update
and not device_info.has_deep_sleep
):
if coordinator.last_update_success and coordinator.supports_update:
self._attr_supported_features = UpdateEntityFeature.INSTALL
else:
self._attr_supported_features = NO_FEATURES
@ -178,6 +174,13 @@ class ESPHomeDashboardUpdateEntity(
self, static_info: list[EntityInfo] | None = None
) -> None:
"""Handle updated data from the device."""
if (
self._entry_data.available
and self._available_future
and not self._available_future.done()
):
self._available_future.set_result(None)
self._available_future = None
self._update_attrs()
self.async_write_ha_state()
@ -192,17 +195,46 @@ class ESPHomeDashboardUpdateEntity(
entry_data.async_subscribe_device_updated(self._handle_device_update)
)
async def async_will_remove_from_hass(self) -> None:
"""Handle entity about to be removed from Home Assistant."""
if self._available_future and not self._available_future.done():
self._available_future.cancel()
self._available_future = None
async def _async_wait_available(self) -> None:
"""Wait until the device is available."""
# If the device has deep sleep, we need to wait for it to wake up
# and connect to the network to be able to install the update.
if self._entry_data.available:
return
self._available_future = self.hass.loop.create_future()
try:
await self._available_future
finally:
self._available_future = None
async def async_install(
self, version: str | None, backup: bool, **kwargs: Any
) -> None:
"""Install an update."""
async with self.hass.data.setdefault(KEY_UPDATE_LOCK, asyncio.Lock()):
coordinator = self.coordinator
api = coordinator.api
device = coordinator.data.get(self._device_info.name)
assert device is not None
configuration = device["configuration"]
try:
if self._install_lock.locked():
raise HomeAssistantError(
translation_domain=DOMAIN,
translation_key="ota_in_progress",
translation_placeholders={
"configuration": self._device_info.name,
},
)
# Ensure only one OTA per device at a time
async with self._install_lock:
# Ensure only one compile at a time for ALL devices
async with self.hass.data.setdefault(KEY_UPDATE_LOCK, asyncio.Lock()):
coordinator = self.coordinator
api = coordinator.api
device = coordinator.data.get(self._device_info.name)
assert device is not None
configuration = device["configuration"]
if not await api.compile(configuration):
raise HomeAssistantError(
translation_domain=DOMAIN,
@ -211,14 +243,25 @@ class ESPHomeDashboardUpdateEntity(
"configuration": configuration,
},
)
if not await api.upload(configuration, "OTA"):
raise HomeAssistantError(
translation_domain=DOMAIN,
translation_key="error_uploading",
translation_placeholders={
"configuration": configuration,
},
)
# If the device uses deep sleep, there's a small chance it goes
# to sleep right after the dashboard connects but before the OTA
# starts. In that case, the update won't go through, so we try
# again to catch it on its next wakeup.
attempts = 2 if self._device_info.has_deep_sleep else 1
try:
for attempt in range(1, attempts + 1):
await self._async_wait_available()
if await api.upload(configuration, "OTA"):
break
if attempt == attempts:
raise HomeAssistantError(
translation_domain=DOMAIN,
translation_key="error_uploading",
translation_placeholders={
"configuration": configuration,
},
)
finally:
await self.coordinator.async_request_refresh()

View File

@ -1,5 +1,6 @@
"""Test ESPHome update entities."""
import asyncio
from typing import Any
from unittest.mock import patch
@ -546,3 +547,296 @@ async def test_generic_device_update_entity_has_update(
)
mock_client.update_command.assert_called_with(key=1, command=UpdateCommand.CHECK)
async def test_attempt_to_update_twice(
hass: HomeAssistant,
mock_client: APIClient,
mock_esphome_device: MockESPHomeDeviceType,
mock_dashboard: dict[str, Any],
) -> None:
"""Test attempting to update twice."""
mock_dashboard["configured"] = [
{
"name": "test",
"current_version": "2023.2.0-dev",
"configuration": "test.yaml",
}
]
await async_get_dashboard(hass).async_refresh()
await mock_esphome_device(
mock_client=mock_client,
entity_info=[],
user_service=[],
states=[],
)
await hass.async_block_till_done()
state = hass.states.get("update.test_firmware")
assert state is not None
async def delayed_compile(*args: Any, **kwargs: Any) -> None:
"""Delay the update."""
await asyncio.sleep(0)
return True
# Compile success, upload fails
with (
patch(
"homeassistant.components.esphome.coordinator.ESPHomeDashboardAPI.compile",
delayed_compile,
),
patch(
"homeassistant.components.esphome.coordinator.ESPHomeDashboardAPI.upload",
return_value=False,
),
):
update_task = hass.async_create_task(
hass.services.async_call(
UPDATE_DOMAIN,
SERVICE_INSTALL,
{ATTR_ENTITY_ID: "update.test_firmware"},
blocking=True,
)
)
with pytest.raises(HomeAssistantError, match="update is already in progress"):
await hass.services.async_call(
UPDATE_DOMAIN,
SERVICE_INSTALL,
{ATTR_ENTITY_ID: "update.test_firmware"},
blocking=True,
)
with pytest.raises(HomeAssistantError, match="OTA"):
await update_task
async def test_update_deep_sleep_already_online(
hass: HomeAssistant,
mock_client: APIClient,
mock_esphome_device: MockESPHomeDeviceType,
mock_dashboard: dict[str, Any],
) -> None:
"""Test attempting to update twice."""
mock_dashboard["configured"] = [
{
"name": "test",
"current_version": "2023.2.0-dev",
"configuration": "test.yaml",
}
]
await async_get_dashboard(hass).async_refresh()
await mock_esphome_device(
mock_client=mock_client,
entity_info=[],
user_service=[],
states=[],
device_info={"has_deep_sleep": True},
)
await hass.async_block_till_done()
state = hass.states.get("update.test_firmware")
assert state is not None
# Compile success, upload success
with (
patch(
"homeassistant.components.esphome.coordinator.ESPHomeDashboardAPI.compile",
return_value=True,
),
patch(
"homeassistant.components.esphome.coordinator.ESPHomeDashboardAPI.upload",
return_value=True,
),
):
await hass.services.async_call(
UPDATE_DOMAIN,
SERVICE_INSTALL,
{ATTR_ENTITY_ID: "update.test_firmware"},
blocking=True,
)
async def test_update_deep_sleep_offline(
hass: HomeAssistant,
mock_client: APIClient,
mock_esphome_device: MockESPHomeDeviceType,
mock_dashboard: dict[str, Any],
) -> None:
"""Test device comes online while updating."""
mock_dashboard["configured"] = [
{
"name": "test",
"current_version": "2023.2.0-dev",
"configuration": "test.yaml",
}
]
await async_get_dashboard(hass).async_refresh()
device = await mock_esphome_device(
mock_client=mock_client,
entity_info=[],
user_service=[],
states=[],
device_info={"has_deep_sleep": True},
)
await hass.async_block_till_done()
state = hass.states.get("update.test_firmware")
assert state is not None
await device.mock_disconnect(True)
# Compile success, upload success
with (
patch(
"homeassistant.components.esphome.coordinator.ESPHomeDashboardAPI.compile",
return_value=True,
),
patch(
"homeassistant.components.esphome.coordinator.ESPHomeDashboardAPI.upload",
return_value=True,
),
):
update_task = hass.async_create_task(
hass.services.async_call(
UPDATE_DOMAIN,
SERVICE_INSTALL,
{ATTR_ENTITY_ID: "update.test_firmware"},
blocking=True,
)
)
await asyncio.sleep(0)
assert not update_task.done()
await device.mock_connect()
await hass.async_block_till_done()
async def test_update_deep_sleep_offline_sleep_during_ota(
hass: HomeAssistant,
mock_client: APIClient,
mock_esphome_device: MockESPHomeDeviceType,
mock_dashboard: dict[str, Any],
) -> None:
"""Test device goes to sleep right as we start the OTA."""
mock_dashboard["configured"] = [
{
"name": "test",
"current_version": "2023.2.0-dev",
"configuration": "test.yaml",
}
]
await async_get_dashboard(hass).async_refresh()
device = await mock_esphome_device(
mock_client=mock_client,
entity_info=[],
user_service=[],
states=[],
device_info={"has_deep_sleep": True},
)
await hass.async_block_till_done()
state = hass.states.get("update.test_firmware")
assert state is not None
await device.mock_disconnect(True)
upload_attempt = 0
upload_attempt_2_future = hass.loop.create_future()
disconnect_future = hass.loop.create_future()
async def upload_takes_a_while(*args: Any, **kwargs: Any) -> None:
"""Delay the update."""
nonlocal upload_attempt
upload_attempt += 1
if upload_attempt == 1:
# We are simulating the device going back to sleep
# before the upload can be started
# Wait for the device to go unavailable
# before returning false
await disconnect_future
return False
upload_attempt_2_future.set_result(None)
return True
# Compile success, upload fails first time, success second time
with (
patch(
"homeassistant.components.esphome.coordinator.ESPHomeDashboardAPI.compile",
return_value=True,
),
patch(
"homeassistant.components.esphome.coordinator.ESPHomeDashboardAPI.upload",
upload_takes_a_while,
),
):
update_task = hass.async_create_task(
hass.services.async_call(
UPDATE_DOMAIN,
SERVICE_INSTALL,
{ATTR_ENTITY_ID: "update.test_firmware"},
blocking=True,
)
)
await asyncio.sleep(0)
assert not update_task.done()
await device.mock_connect()
# Mock device being at the end of its sleep cycle
# and going to sleep right as the upload starts
# This can happen because there is non zero time
# between when we tell the dashboard to upload and
# when the upload actually starts
await device.mock_disconnect(True)
disconnect_future.set_result(None)
assert not upload_attempt_2_future.done()
# Now the device wakes up and the upload is attempted
await device.mock_connect()
await upload_attempt_2_future
await hass.async_block_till_done()
async def test_update_deep_sleep_offline_cancelled_unload(
hass: HomeAssistant,
mock_client: APIClient,
mock_esphome_device: MockESPHomeDeviceType,
mock_dashboard: dict[str, Any],
) -> None:
"""Test deep sleep update attempt is cancelled on unload."""
mock_dashboard["configured"] = [
{
"name": "test",
"current_version": "2023.2.0-dev",
"configuration": "test.yaml",
}
]
await async_get_dashboard(hass).async_refresh()
device = await mock_esphome_device(
mock_client=mock_client,
entity_info=[],
user_service=[],
states=[],
device_info={"has_deep_sleep": True},
)
await hass.async_block_till_done()
state = hass.states.get("update.test_firmware")
assert state is not None
await device.mock_disconnect(True)
# Compile success, upload success, but we cancel the update
with (
patch(
"homeassistant.components.esphome.coordinator.ESPHomeDashboardAPI.compile",
return_value=True,
),
patch(
"homeassistant.components.esphome.coordinator.ESPHomeDashboardAPI.upload",
return_value=True,
),
):
update_task = hass.async_create_task(
hass.services.async_call(
UPDATE_DOMAIN,
SERVICE_INSTALL,
{ATTR_ENTITY_ID: "update.test_firmware"},
blocking=True,
)
)
await asyncio.sleep(0)
assert not update_task.done()
await hass.config_entries.async_unload(device.entry.entry_id)
await hass.async_block_till_done()
assert update_task.cancelled()