Rewrite APCUPSD sensors using DataUpdateCoordinator (#88467)

* Add test sensor.

* Fix sensor test file name.

* Add binary sensor test.

* Fix comments and styling.

* Remove apcupsd from omissions in coveragerc.

* Revert "Remove apcupsd from omissions in coveragerc."

This reverts commit 66b05fcb8829619a771a650a3d70174089e15d91.

* Implement the data coordinator for apcupsd.

* Add tests for sensor updates and throttles.

* Reorder the statement for better code clarity.

* Update docstring.

* Add more tests for checking if the coordinator works ok.

* Implement a custom debouncer with 5 second cooldown for the coordinator.

* Add more tests for checking if our integration is able to properly mark entity's availability.

* Make apcupsd a silver integration.

* Try to fix non-deterministic test behaviors

* Fix JSON format

* Use new `with` format in python 3.10 for better readability

* Update tests.

* Rebase and simplify code.

* Add an ups prefix to the property methods of the coordinator

* Replace init_integration with async_init_integration

* Lint fixes

* Fix imports

* Update BinarySensor implementation to add initial update of attributes

* Fix test failures due to rebases

* Reorder the statements for better code clarity

* Fix incorrect references to the ups_name property

* Simplify BinarySensor value getter code

* No need to update when adding coordinator-controlled sensors
This commit is contained in:
Yuxin Wang 2023-11-21 16:40:05 -05:00 committed by GitHub
parent f45d373e17
commit 33c5d1855d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 296 additions and 142 deletions

View File

@ -67,9 +67,6 @@ omit =
homeassistant/components/android_ip_webcam/switch.py
homeassistant/components/anel_pwrctrl/switch.py
homeassistant/components/anthemav/media_player.py
homeassistant/components/apcupsd/__init__.py
homeassistant/components/apcupsd/binary_sensor.py
homeassistant/components/apcupsd/sensor.py
homeassistant/components/apple_tv/__init__.py
homeassistant/components/apple_tv/browse_media.py
homeassistant/components/apple_tv/media_player.py

View File

@ -1,44 +1,46 @@
"""Support for APCUPSd via its Network Information Server (NIS)."""
from __future__ import annotations
from collections import OrderedDict
from datetime import timedelta
import logging
from typing import Any, Final
from typing import Final
from apcaccess import status
import async_timeout
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_HOST, CONF_PORT, Platform
from homeassistant.core import HomeAssistant
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.debounce import Debouncer
from homeassistant.helpers.device_registry import DeviceInfo
from homeassistant.util import Throttle
from homeassistant.helpers.update_coordinator import (
REQUEST_REFRESH_DEFAULT_IMMEDIATE,
DataUpdateCoordinator,
UpdateFailed,
)
_LOGGER = logging.getLogger(__name__)
DOMAIN: Final = "apcupsd"
VALUE_ONLINE: Final = 8
PLATFORMS: Final = (Platform.BINARY_SENSOR, Platform.SENSOR)
MIN_TIME_BETWEEN_UPDATES: Final = timedelta(seconds=60)
UPDATE_INTERVAL: Final = timedelta(seconds=60)
REQUEST_REFRESH_COOLDOWN: Final = 5
CONFIG_SCHEMA = cv.removed(DOMAIN, raise_if_present=False)
async def async_setup_entry(hass: HomeAssistant, config_entry: ConfigEntry) -> bool:
"""Use config values to set up a function enabling status retrieval."""
data_service = APCUPSdData(
config_entry.data[CONF_HOST], config_entry.data[CONF_PORT]
)
host, port = config_entry.data[CONF_HOST], config_entry.data[CONF_PORT]
coordinator = APCUPSdCoordinator(hass, host, port)
try:
await hass.async_add_executor_job(data_service.update)
except OSError as ex:
_LOGGER.error("Failure while testing APCUPSd status retrieval: %s", ex)
return False
await coordinator.async_config_entry_first_refresh()
# Store the data service object.
# Store the coordinator for later uses.
hass.data.setdefault(DOMAIN, {})
hass.data[DOMAIN][config_entry.entry_id] = data_service
hass.data[DOMAIN][config_entry.entry_id] = coordinator
# Forward the config entries to the supported platforms.
await hass.config_entries.async_forward_entry_setups(config_entry, PLATFORMS)
@ -53,64 +55,78 @@ async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
return unload_ok
class APCUPSdData:
"""Stores the data retrieved from APCUPSd.
class APCUPSdCoordinator(DataUpdateCoordinator[OrderedDict[str, str]]):
"""Store and coordinate the data retrieved from APCUPSd for all sensors.
For each entity to use, acts as the single point responsible for fetching
updates from the server.
"""
def __init__(self, host: str, port: int) -> None:
def __init__(self, hass: HomeAssistant, host: str, port: int) -> None:
"""Initialize the data object."""
super().__init__(
hass,
_LOGGER,
name=DOMAIN,
update_interval=UPDATE_INTERVAL,
request_refresh_debouncer=Debouncer(
hass,
_LOGGER,
cooldown=REQUEST_REFRESH_COOLDOWN,
immediate=REQUEST_REFRESH_DEFAULT_IMMEDIATE,
),
)
self._host = host
self._port = port
self.status: dict[str, str] = {}
@property
def name(self) -> str | None:
def ups_name(self) -> str | None:
"""Return the name of the UPS, if available."""
return self.status.get("UPSNAME")
return self.data.get("UPSNAME")
@property
def model(self) -> str | None:
def ups_model(self) -> str | None:
"""Return the model of the UPS, if available."""
# Different UPS models may report slightly different keys for model, here we
# try them all.
for model_key in ("APCMODEL", "MODEL"):
if model_key in self.status:
return self.status[model_key]
if model_key in self.data:
return self.data[model_key]
return None
@property
def serial_no(self) -> str | None:
def ups_serial_no(self) -> str | None:
"""Return the unique serial number of the UPS, if available."""
return self.status.get("SERIALNO")
@property
def statflag(self) -> str | None:
"""Return the STATFLAG indicating the status of the UPS, if available."""
return self.status.get("STATFLAG")
return self.data.get("SERIALNO")
@property
def device_info(self) -> DeviceInfo | None:
"""Return the DeviceInfo of this APC UPS for the sensors, if serial number is available."""
if self.serial_no is None:
"""Return the DeviceInfo of this APC UPS, if serial number is available."""
if not self.ups_serial_no:
return None
return DeviceInfo(
identifiers={(DOMAIN, self.serial_no)},
model=self.model,
identifiers={(DOMAIN, self.ups_serial_no)},
model=self.ups_model,
manufacturer="APC",
name=self.name if self.name is not None else "APC UPS",
hw_version=self.status.get("FIRMWARE"),
sw_version=self.status.get("VERSION"),
name=self.ups_name if self.ups_name else "APC UPS",
hw_version=self.data.get("FIRMWARE"),
sw_version=self.data.get("VERSION"),
)
@Throttle(MIN_TIME_BETWEEN_UPDATES)
def update(self, **kwargs: Any) -> None:
async def _async_update_data(self) -> OrderedDict[str, str]:
"""Fetch the latest status from APCUPSd.
Note that the result dict uses upper case for each resource, where our
integration uses lower cases as keys internally.
"""
self.status = status.parse(status.get(host=self._host, port=self._port))
async with async_timeout.timeout(10):
try:
raw = await self.hass.async_add_executor_job(
status.get, self._host, self._port
)
result: OrderedDict[str, str] = status.parse(raw)
return result
except OSError as error:
raise UpdateFailed(error) from error

View File

@ -2,6 +2,7 @@
from __future__ import annotations
import logging
from typing import Final
from homeassistant.components.binary_sensor import (
BinarySensorEntity,
@ -10,8 +11,9 @@ from homeassistant.components.binary_sensor import (
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from . import DOMAIN, VALUE_ONLINE, APCUPSdData
from . import DOMAIN, APCUPSdCoordinator
_LOGGER = logging.getLogger(__name__)
_DESCRIPTION = BinarySensorEntityDescription(
@ -19,6 +21,8 @@ _DESCRIPTION = BinarySensorEntityDescription(
name="UPS Online Status",
icon="mdi:heart",
)
# The bit in STATFLAG that indicates the online status of the APC UPS.
_VALUE_ONLINE_MASK: Final = 0b1000
async def async_setup_entry(
@ -27,50 +31,36 @@ async def async_setup_entry(
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up an APCUPSd Online Status binary sensor."""
data_service: APCUPSdData = hass.data[DOMAIN][config_entry.entry_id]
coordinator: APCUPSdCoordinator = hass.data[DOMAIN][config_entry.entry_id]
# Do not create the binary sensor if APCUPSd does not provide STATFLAG field for us
# to determine the online status.
if data_service.statflag is None:
if _DESCRIPTION.key.upper() not in coordinator.data:
return
async_add_entities(
[OnlineStatus(data_service, _DESCRIPTION)],
update_before_add=True,
)
async_add_entities([OnlineStatus(coordinator, _DESCRIPTION)])
class OnlineStatus(BinarySensorEntity):
class OnlineStatus(CoordinatorEntity[APCUPSdCoordinator], BinarySensorEntity):
"""Representation of a UPS online status."""
def __init__(
self,
data_service: APCUPSdData,
coordinator: APCUPSdCoordinator,
description: BinarySensorEntityDescription,
) -> None:
"""Initialize the APCUPSd binary device."""
super().__init__(coordinator, context=description.key.upper())
# Set up unique id and device info if serial number is available.
if (serial_no := data_service.serial_no) is not None:
if (serial_no := coordinator.ups_serial_no) is not None:
self._attr_unique_id = f"{serial_no}_{description.key}"
self._attr_device_info = data_service.device_info
self.entity_description = description
self._data_service = data_service
self._attr_device_info = coordinator.device_info
def update(self) -> None:
"""Get the status report from APCUPSd and set this entity's state."""
try:
self._data_service.update()
except OSError as ex:
if self._attr_available:
self._attr_available = False
_LOGGER.exception("Got exception while fetching state: %s", ex)
return
self._attr_available = True
@property
def is_on(self) -> bool | None:
"""Returns true if the UPS is online."""
# Check if ONLINE bit is set in STATFLAG.
key = self.entity_description.key.upper()
if key not in self._data_service.status:
self._attr_is_on = None
return
self._attr_is_on = int(self._data_service.status[key], 16) & VALUE_ONLINE > 0
return int(self.coordinator.data[key], 16) & _VALUE_ONLINE_MASK != 0

View File

@ -1,6 +1,7 @@
"""Config flow for APCUPSd integration."""
from __future__ import annotations
import asyncio
from typing import Any
import voluptuous as vol
@ -10,8 +11,9 @@ from homeassistant.const import CONF_HOST, CONF_PORT
from homeassistant.data_entry_flow import FlowResult
from homeassistant.helpers import selector
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.update_coordinator import UpdateFailed
from . import DOMAIN, APCUPSdData
from . import DOMAIN, APCUPSdCoordinator
_PORT_SELECTOR = vol.All(
selector.NumberSelector(
@ -43,36 +45,37 @@ class ConfigFlowHandler(ConfigFlow, domain=DOMAIN):
if user_input is None:
return self.async_show_form(step_id="user", data_schema=_SCHEMA)
host, port = user_input[CONF_HOST], user_input[CONF_PORT]
# Abort if an entry with same host and port is present.
self._async_abort_entries_match(
{CONF_HOST: user_input[CONF_HOST], CONF_PORT: user_input[CONF_PORT]}
)
self._async_abort_entries_match({CONF_HOST: host, CONF_PORT: port})
# Test the connection to the host and get the current status for serial number.
data_service = APCUPSdData(user_input[CONF_HOST], user_input[CONF_PORT])
try:
await self.hass.async_add_executor_job(data_service.update)
except OSError:
coordinator = APCUPSdCoordinator(self.hass, host, port)
await coordinator.async_request_refresh()
await self.hass.async_block_till_done()
if isinstance(coordinator.last_exception, (UpdateFailed, asyncio.TimeoutError)):
errors = {"base": "cannot_connect"}
return self.async_show_form(
step_id="user", data_schema=_SCHEMA, errors=errors
)
if not data_service.status:
if not coordinator.data:
return self.async_abort(reason="no_status")
# We _try_ to use the serial number of the UPS as the unique id since this field
# is not guaranteed to exist on all APC UPS models.
await self.async_set_unique_id(data_service.serial_no)
await self.async_set_unique_id(coordinator.ups_serial_no)
self._abort_if_unique_id_configured()
title = "APC UPS"
if data_service.name is not None:
title = data_service.name
elif data_service.model is not None:
title = data_service.model
elif data_service.serial_no is not None:
title = data_service.serial_no
if coordinator.ups_name is not None:
title = coordinator.ups_name
elif coordinator.ups_model is not None:
title = coordinator.ups_model
elif coordinator.ups_serial_no is not None:
title = coordinator.ups_serial_no
return self.async_create_entry(
title=title,

View File

@ -6,5 +6,6 @@
"documentation": "https://www.home-assistant.io/integrations/apcupsd",
"iot_class": "local_polling",
"loggers": ["apcaccess"],
"quality_scale": "silver",
"requirements": ["apcaccess==0.0.13"]
}

View File

@ -20,10 +20,11 @@ from homeassistant.const import (
UnitOfTemperature,
UnitOfTime,
)
from homeassistant.core import HomeAssistant
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from . import DOMAIN, APCUPSdData
from . import DOMAIN, APCUPSdCoordinator
_LOGGER = logging.getLogger(__name__)
@ -452,11 +453,11 @@ async def async_setup_entry(
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up the APCUPSd sensors from config entries."""
data_service: APCUPSdData = hass.data[DOMAIN][config_entry.entry_id]
coordinator: APCUPSdCoordinator = hass.data[DOMAIN][config_entry.entry_id]
# The resources from data service are in upper-case by default, but we use
# lower cases throughout this integration.
available_resources: set[str] = {k.lower() for k, _ in data_service.status.items()}
# The resource keys in the data dict collected in the coordinator is in upper-case
# by default, but we use lower cases throughout this integration.
available_resources: set[str] = {k.lower() for k, _ in coordinator.data.items()}
entities = []
for resource in available_resources:
@ -464,9 +465,9 @@ async def async_setup_entry(
_LOGGER.warning("Invalid resource from APCUPSd: %s", resource.upper())
continue
entities.append(APCUPSdSensor(data_service, SENSORS[resource]))
entities.append(APCUPSdSensor(coordinator, SENSORS[resource]))
async_add_entities(entities, update_before_add=True)
async_add_entities(entities)
def infer_unit(value: str) -> tuple[str, str | None]:
@ -483,41 +484,36 @@ def infer_unit(value: str) -> tuple[str, str | None]:
return value, None
class APCUPSdSensor(SensorEntity):
class APCUPSdSensor(CoordinatorEntity[APCUPSdCoordinator], SensorEntity):
"""Representation of a sensor entity for APCUPSd status values."""
def __init__(
self,
data_service: APCUPSdData,
coordinator: APCUPSdCoordinator,
description: SensorEntityDescription,
) -> None:
"""Initialize the sensor."""
super().__init__(coordinator=coordinator, context=description.key.upper())
# Set up unique id and device info if serial number is available.
if (serial_no := data_service.serial_no) is not None:
if (serial_no := coordinator.ups_serial_no) is not None:
self._attr_unique_id = f"{serial_no}_{description.key}"
self._attr_device_info = data_service.device_info
self.entity_description = description
self._data_service = data_service
self._attr_device_info = coordinator.device_info
def update(self) -> None:
"""Get the latest status and use it to update our sensor state."""
try:
self._data_service.update()
except OSError as ex:
if self._attr_available:
self._attr_available = False
_LOGGER.exception("Got exception while fetching state: %s", ex)
return
# Initial update of attributes.
self._update_attrs()
self._attr_available = True
@callback
def _handle_coordinator_update(self) -> None:
"""Handle updated data from the coordinator."""
self._update_attrs()
self.async_write_ha_state()
def _update_attrs(self) -> None:
"""Update sensor attributes based on coordinator data."""
key = self.entity_description.key.upper()
if key not in self._data_service.status:
self._attr_native_value = None
return
self._attr_native_value, inferred_unit = infer_unit(
self._data_service.status[key]
)
self._attr_native_value, inferred_unit = infer_unit(self.coordinator.data[key])
if not self.native_unit_of_measurement:
self._attr_native_unit_of_measurement = inferred_unit

View File

@ -95,8 +95,9 @@ async def async_init_integration(
entry.add_to_hass(hass)
with patch("apcaccess.status.parse", return_value=status), patch(
"apcaccess.status.get", return_value=b""
with (
patch("apcaccess.status.parse", return_value=status),
patch("apcaccess.status.get", return_value=b""),
):
assert await hass.config_entries.async_setup(entry.entry_id)
await hass.async_block_till_done()

View File

@ -38,10 +38,10 @@ async def test_config_flow_cannot_connect(hass: HomeAssistant) -> None:
async def test_config_flow_no_status(hass: HomeAssistant) -> None:
"""Test config flow setup with successful connection but no status is reported."""
with patch(
"apcaccess.status.parse",
return_value={}, # Returns no status.
), patch("apcaccess.status.get", return_value=b""):
with (
patch("apcaccess.status.parse", return_value={}), # Returns no status.
patch("apcaccess.status.get", return_value=b""),
):
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
@ -63,9 +63,11 @@ async def test_config_flow_duplicate(hass: HomeAssistant) -> None:
)
mock_entry.add_to_hass(hass)
with patch("apcaccess.status.parse") as mock_parse, patch(
"apcaccess.status.get", return_value=b""
), _patch_setup():
with (
patch("apcaccess.status.parse") as mock_parse,
patch("apcaccess.status.get", return_value=b""),
_patch_setup(),
):
mock_parse.return_value = MOCK_STATUS
# Now, create the integration again using the same config data, we should reject
@ -109,9 +111,11 @@ async def test_config_flow_duplicate(hass: HomeAssistant) -> None:
async def test_flow_works(hass: HomeAssistant) -> None:
"""Test successful creation of config entries via user configuration."""
with patch("apcaccess.status.parse", return_value=MOCK_STATUS), patch(
"apcaccess.status.get", return_value=b""
), _patch_setup() as mock_setup:
with (
patch("apcaccess.status.parse", return_value=MOCK_STATUS),
patch("apcaccess.status.get", return_value=b""),
_patch_setup() as mock_setup,
):
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={CONF_SOURCE: SOURCE_USER},
@ -147,9 +151,11 @@ async def test_flow_minimal_status(
We test different combinations of minimal statuses, where the title of the
integration will vary.
"""
with patch("apcaccess.status.parse") as mock_parse, patch(
"apcaccess.status.get", return_value=b""
), _patch_setup() as mock_setup:
with (
patch("apcaccess.status.parse") as mock_parse,
patch("apcaccess.status.get", return_value=b""),
_patch_setup() as mock_setup,
):
status = MOCK_MINIMAL_STATUS | extra_status
mock_parse.return_value = status

View File

@ -4,15 +4,16 @@ from unittest.mock import patch
import pytest
from homeassistant.components.apcupsd import DOMAIN
from homeassistant.components.apcupsd import DOMAIN, UPDATE_INTERVAL
from homeassistant.config_entries import SOURCE_USER, ConfigEntryState
from homeassistant.const import STATE_UNAVAILABLE
from homeassistant.core import HomeAssistant
from homeassistant.helpers import device_registry as dr
from homeassistant.util import utcnow
from . import CONF_DATA, MOCK_MINIMAL_STATUS, MOCK_STATUS, async_init_integration
from tests.common import MockConfigEntry
from tests.common import MockConfigEntry, async_fire_time_changed
@pytest.mark.parametrize("status", (MOCK_STATUS, MOCK_MINIMAL_STATUS))
@ -67,11 +68,11 @@ async def test_device_entry(
for field, entry_value in fields.items():
if field in status:
assert entry_value == status[field]
# Even if UPSNAME is not available, we must fall back to default "APC UPS".
elif field == "UPSNAME":
# Even if UPSNAME is not available, we must fall back to default "APC UPS".
assert entry_value == "APC UPS"
else:
assert entry_value is None
assert not entry_value
assert entry.manufacturer == "APC"
@ -107,15 +108,16 @@ async def test_connection_error(hass: HomeAssistant) -> None:
entry.add_to_hass(hass)
with patch("apcaccess.status.parse", side_effect=OSError()), patch(
"apcaccess.status.get"
with (
patch("apcaccess.status.parse", side_effect=OSError()),
patch("apcaccess.status.get"),
):
await hass.config_entries.async_setup(entry.entry_id)
assert entry.state is ConfigEntryState.SETUP_ERROR
assert entry.state is ConfigEntryState.SETUP_RETRY
async def test_unload_remove(hass: HomeAssistant) -> None:
"""Test successful unload of entry."""
async def test_unload_remove_entry(hass: HomeAssistant) -> None:
"""Test successful unload and removal of an entry."""
# Load two integrations from two mock hosts.
entries = (
await async_init_integration(hass, host="test1", status=MOCK_STATUS),
@ -142,3 +144,41 @@ async def test_unload_remove(hass: HomeAssistant) -> None:
await hass.config_entries.async_remove(entry.entry_id)
await hass.async_block_till_done()
assert len(hass.config_entries.async_entries(DOMAIN)) == 0
async def test_availability(hass: HomeAssistant) -> None:
"""Ensure that we mark the entity's availability properly when network is down / back up."""
await async_init_integration(hass)
state = hass.states.get("sensor.ups_load")
assert state
assert state.state != STATE_UNAVAILABLE
assert pytest.approx(float(state.state)) == 14.0
with (
patch("apcaccess.status.parse") as mock_parse,
patch("apcaccess.status.get", return_value=b""),
):
# Mock a network error and then trigger an auto-polling event.
mock_parse.side_effect = OSError()
future = utcnow() + UPDATE_INTERVAL
async_fire_time_changed(hass, future)
await hass.async_block_till_done()
# Sensors should be marked as unavailable.
state = hass.states.get("sensor.ups_load")
assert state
assert state.state == STATE_UNAVAILABLE
# Reset the API to return a new status and update.
mock_parse.side_effect = None
mock_parse.return_value = MOCK_STATUS | {"LOADPCT": "15.0 Percent"}
future = future + UPDATE_INTERVAL
async_fire_time_changed(hass, future)
await hass.async_block_till_done()
# Sensors should be online now with the new value.
state = hass.states.get("sensor.ups_load")
assert state
assert state.state != STATE_UNAVAILABLE
assert pytest.approx(float(state.state)) == 15.0

View File

@ -1,5 +1,9 @@
"""Test sensors of APCUPSd integration."""
from datetime import timedelta
from unittest.mock import patch
from homeassistant.components.apcupsd import REQUEST_REFRESH_COOLDOWN
from homeassistant.components.sensor import (
ATTR_STATE_CLASS,
SensorDeviceClass,
@ -7,17 +11,23 @@ from homeassistant.components.sensor import (
)
from homeassistant.const import (
ATTR_DEVICE_CLASS,
ATTR_ENTITY_ID,
ATTR_UNIT_OF_MEASUREMENT,
PERCENTAGE,
STATE_UNAVAILABLE,
UnitOfElectricPotential,
UnitOfPower,
UnitOfTime,
)
from homeassistant.core import HomeAssistant
from homeassistant.helpers import entity_registry as er
from homeassistant.setup import async_setup_component
from homeassistant.util.dt import utcnow
from . import MOCK_STATUS, async_init_integration
from tests.common import async_fire_time_changed
async def test_sensor(hass: HomeAssistant, entity_registry: er.EntityRegistry) -> None:
"""Test states of sensor."""
@ -105,3 +115,97 @@ async def test_sensor_disabled(
assert updated_entry != entry
assert updated_entry.disabled is False
async def test_state_update(hass: HomeAssistant) -> None:
"""Ensure the sensor state changes after updating the data."""
await async_init_integration(hass)
state = hass.states.get("sensor.ups_load")
assert state
assert state.state != STATE_UNAVAILABLE
assert state.state == "14.0"
new_status = MOCK_STATUS | {"LOADPCT": "15.0 Percent"}
with (
patch("apcaccess.status.parse", return_value=new_status),
patch("apcaccess.status.get", return_value=b""),
):
future = utcnow() + timedelta(minutes=2)
async_fire_time_changed(hass, future)
await hass.async_block_till_done()
state = hass.states.get("sensor.ups_load")
assert state
assert state.state != STATE_UNAVAILABLE
assert state.state == "15.0"
async def test_manual_update_entity(hass: HomeAssistant) -> None:
"""Test manual update entity via service homeassistant/update_entity."""
await async_init_integration(hass)
# Assert the initial state of sensor.ups_load.
state = hass.states.get("sensor.ups_load")
assert state
assert state.state != STATE_UNAVAILABLE
assert state.state == "14.0"
# Setup HASS for calling the update_entity service.
await async_setup_component(hass, "homeassistant", {})
with (
patch("apcaccess.status.parse") as mock_parse,
patch("apcaccess.status.get", return_value=b"") as mock_get,
):
mock_parse.return_value = MOCK_STATUS | {
"LOADPCT": "15.0 Percent",
"BCHARGE": "99.0 Percent",
}
# Now, we fast-forward the time to pass the debouncer cooldown, but put it
# before the normal update interval to see if the manual update works.
future = utcnow() + timedelta(seconds=REQUEST_REFRESH_COOLDOWN)
async_fire_time_changed(hass, future)
await hass.services.async_call(
"homeassistant",
"update_entity",
{ATTR_ENTITY_ID: ["sensor.ups_load", "sensor.ups_battery"]},
blocking=True,
)
# Even if we requested updates for two entities, our integration should smartly
# group the API calls to just one.
assert mock_parse.call_count == 1
assert mock_get.call_count == 1
# The new state should be effective.
state = hass.states.get("sensor.ups_load")
assert state
assert state.state != STATE_UNAVAILABLE
assert state.state == "15.0"
async def test_multiple_manual_update_entity(hass: HomeAssistant) -> None:
"""Test multiple simultaneous manual update entity via service homeassistant/update_entity.
We should only do network call once for the multiple simultaneous update entity services.
"""
await async_init_integration(hass)
# Setup HASS for calling the update_entity service.
await async_setup_component(hass, "homeassistant", {})
with (
patch("apcaccess.status.parse", return_value=MOCK_STATUS) as mock_parse,
patch("apcaccess.status.get", return_value=b"") as mock_get,
):
# Fast-forward time to just pass the initial debouncer cooldown.
future = utcnow() + timedelta(seconds=REQUEST_REFRESH_COOLDOWN)
async_fire_time_changed(hass, future)
await hass.services.async_call(
"homeassistant",
"update_entity",
{ATTR_ENTITY_ID: ["sensor.ups_load", "sensor.ups_input_voltage"]},
blocking=True,
)
assert mock_parse.call_count == 1
assert mock_get.call_count == 1