mirror of
https://github.com/home-assistant/core.git
synced 2025-07-14 08:47:10 +00:00
Use data update coordinator in NextBus to reduce api calls (#100602)
This commit is contained in:
parent
6ce6952a06
commit
e652d37f29
@ -4,15 +4,41 @@ from homeassistant.config_entries import ConfigEntry
|
|||||||
from homeassistant.const import Platform
|
from homeassistant.const import Platform
|
||||||
from homeassistant.core import HomeAssistant
|
from homeassistant.core import HomeAssistant
|
||||||
|
|
||||||
|
from .const import CONF_AGENCY, CONF_ROUTE, CONF_STOP, DOMAIN
|
||||||
|
from .coordinator import NextBusDataUpdateCoordinator
|
||||||
|
|
||||||
PLATFORMS = [Platform.SENSOR]
|
PLATFORMS = [Platform.SENSOR]
|
||||||
|
|
||||||
|
|
||||||
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
||||||
"""Set up platforms for NextBus."""
|
"""Set up platforms for NextBus."""
|
||||||
|
entry_agency = entry.data[CONF_AGENCY]
|
||||||
|
|
||||||
|
coordinator: NextBusDataUpdateCoordinator = hass.data.setdefault(DOMAIN, {}).get(
|
||||||
|
entry_agency
|
||||||
|
)
|
||||||
|
if coordinator is None:
|
||||||
|
coordinator = NextBusDataUpdateCoordinator(hass, entry_agency)
|
||||||
|
hass.data[DOMAIN][entry_agency] = coordinator
|
||||||
|
|
||||||
|
coordinator.add_stop_route(entry.data[CONF_STOP], entry.data[CONF_ROUTE])
|
||||||
|
|
||||||
|
await coordinator.async_config_entry_first_refresh()
|
||||||
|
|
||||||
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
|
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
|
||||||
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
||||||
"""Unload a config entry."""
|
"""Unload a config entry."""
|
||||||
return await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
|
if await hass.config_entries.async_unload_platforms(entry, PLATFORMS):
|
||||||
|
entry_agency = entry.data.get(CONF_AGENCY)
|
||||||
|
coordinator: NextBusDataUpdateCoordinator = hass.data[DOMAIN][entry_agency]
|
||||||
|
coordinator.remove_stop_route(entry.data[CONF_STOP], entry.data[CONF_ROUTE])
|
||||||
|
if not coordinator.has_routes():
|
||||||
|
hass.data[DOMAIN].pop(entry_agency)
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
|
return False
|
||||||
|
78
homeassistant/components/nextbus/coordinator.py
Normal file
78
homeassistant/components/nextbus/coordinator.py
Normal file
@ -0,0 +1,78 @@
|
|||||||
|
"""NextBus data update coordinator."""
|
||||||
|
from datetime import timedelta
|
||||||
|
import logging
|
||||||
|
from typing import Any, cast
|
||||||
|
|
||||||
|
from py_nextbus import NextBusClient
|
||||||
|
from py_nextbus.client import NextBusFormatError, NextBusHTTPError, RouteStop
|
||||||
|
|
||||||
|
from homeassistant.core import HomeAssistant
|
||||||
|
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
|
||||||
|
|
||||||
|
from .const import DOMAIN
|
||||||
|
from .util import listify
|
||||||
|
|
||||||
|
_LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class NextBusDataUpdateCoordinator(DataUpdateCoordinator):
|
||||||
|
"""Class to manage fetching NextBus data."""
|
||||||
|
|
||||||
|
def __init__(self, hass: HomeAssistant, agency: str) -> None:
|
||||||
|
"""Initialize a global coordinator for fetching data for a given agency."""
|
||||||
|
super().__init__(
|
||||||
|
hass,
|
||||||
|
_LOGGER,
|
||||||
|
name=DOMAIN,
|
||||||
|
update_interval=timedelta(seconds=30),
|
||||||
|
)
|
||||||
|
self.client = NextBusClient(output_format="json", agency=agency)
|
||||||
|
self._agency = agency
|
||||||
|
self._stop_routes: set[RouteStop] = set()
|
||||||
|
self._predictions: dict[RouteStop, dict[str, Any]] = {}
|
||||||
|
|
||||||
|
def add_stop_route(self, stop_tag: str, route_tag: str) -> None:
|
||||||
|
"""Tell coordinator to start tracking a given stop and route."""
|
||||||
|
self._stop_routes.add(RouteStop(route_tag, stop_tag))
|
||||||
|
|
||||||
|
def remove_stop_route(self, stop_tag: str, route_tag: str) -> None:
|
||||||
|
"""Tell coordinator to stop tracking a given stop and route."""
|
||||||
|
self._stop_routes.remove(RouteStop(route_tag, stop_tag))
|
||||||
|
|
||||||
|
def get_prediction_data(
|
||||||
|
self, stop_tag: str, route_tag: str
|
||||||
|
) -> dict[str, Any] | None:
|
||||||
|
"""Get prediction result for a given stop and route."""
|
||||||
|
return self._predictions.get(RouteStop(route_tag, stop_tag))
|
||||||
|
|
||||||
|
def _calc_predictions(self, data: dict[str, Any]) -> None:
|
||||||
|
self._predictions = {
|
||||||
|
RouteStop(prediction["routeTag"], prediction["stopTag"]): prediction
|
||||||
|
for prediction in listify(data.get("predictions", []))
|
||||||
|
}
|
||||||
|
|
||||||
|
def get_attribution(self) -> str | None:
|
||||||
|
"""Get attribution from api results."""
|
||||||
|
return self.data.get("copyright")
|
||||||
|
|
||||||
|
def has_routes(self) -> bool:
|
||||||
|
"""Check if this coordinator is tracking any routes."""
|
||||||
|
return len(self._stop_routes) > 0
|
||||||
|
|
||||||
|
async def _async_update_data(self) -> dict[str, Any]:
|
||||||
|
"""Fetch data from NextBus."""
|
||||||
|
self.logger.debug("Updating data from API. Routes: %s", str(self._stop_routes))
|
||||||
|
|
||||||
|
def _update_data() -> dict:
|
||||||
|
"""Fetch data from NextBus."""
|
||||||
|
self.logger.debug("Updating data from API (executor)")
|
||||||
|
try:
|
||||||
|
data = self.client.get_predictions_for_multi_stops(self._stop_routes)
|
||||||
|
# Casting here because we expect dict and not a str due to the input format selected being JSON
|
||||||
|
data = cast(dict[str, Any], data)
|
||||||
|
self._calc_predictions(data)
|
||||||
|
return data
|
||||||
|
except (NextBusHTTPError, NextBusFormatError) as ex:
|
||||||
|
raise UpdateFailed("Failed updating nextbus data", ex) from ex
|
||||||
|
|
||||||
|
return await self.hass.async_add_executor_job(_update_data)
|
@ -6,5 +6,5 @@
|
|||||||
"documentation": "https://www.home-assistant.io/integrations/nextbus",
|
"documentation": "https://www.home-assistant.io/integrations/nextbus",
|
||||||
"iot_class": "cloud_polling",
|
"iot_class": "cloud_polling",
|
||||||
"loggers": ["py_nextbus"],
|
"loggers": ["py_nextbus"],
|
||||||
"requirements": ["py-nextbusnext==0.1.5"]
|
"requirements": ["py-nextbusnext==1.0.0"]
|
||||||
}
|
}
|
||||||
|
@ -3,8 +3,8 @@ from __future__ import annotations
|
|||||||
|
|
||||||
from itertools import chain
|
from itertools import chain
|
||||||
import logging
|
import logging
|
||||||
|
from typing import cast
|
||||||
|
|
||||||
from py_nextbus import NextBusClient
|
|
||||||
import voluptuous as vol
|
import voluptuous as vol
|
||||||
|
|
||||||
from homeassistant.components.sensor import (
|
from homeassistant.components.sensor import (
|
||||||
@ -14,14 +14,16 @@ from homeassistant.components.sensor import (
|
|||||||
)
|
)
|
||||||
from homeassistant.config_entries import SOURCE_IMPORT, ConfigEntry
|
from homeassistant.config_entries import SOURCE_IMPORT, ConfigEntry
|
||||||
from homeassistant.const import CONF_NAME
|
from homeassistant.const import CONF_NAME
|
||||||
from homeassistant.core import DOMAIN as HOMEASSISTANT_DOMAIN, HomeAssistant
|
from homeassistant.core import DOMAIN as HOMEASSISTANT_DOMAIN, HomeAssistant, callback
|
||||||
import homeassistant.helpers.config_validation as cv
|
import homeassistant.helpers.config_validation as cv
|
||||||
from homeassistant.helpers.entity_platform import AddEntitiesCallback
|
from homeassistant.helpers.entity_platform import AddEntitiesCallback
|
||||||
from homeassistant.helpers.issue_registry import IssueSeverity, async_create_issue
|
from homeassistant.helpers.issue_registry import IssueSeverity, async_create_issue
|
||||||
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
|
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
|
||||||
|
from homeassistant.helpers.update_coordinator import CoordinatorEntity
|
||||||
from homeassistant.util.dt import utc_from_timestamp
|
from homeassistant.util.dt import utc_from_timestamp
|
||||||
|
|
||||||
from .const import CONF_AGENCY, CONF_ROUTE, CONF_STOP, DOMAIN
|
from .const import CONF_AGENCY, CONF_ROUTE, CONF_STOP, DOMAIN
|
||||||
|
from .coordinator import NextBusDataUpdateCoordinator
|
||||||
from .util import listify, maybe_first
|
from .util import listify, maybe_first
|
||||||
|
|
||||||
_LOGGER = logging.getLogger(__name__)
|
_LOGGER = logging.getLogger(__name__)
|
||||||
@ -70,23 +72,28 @@ async def async_setup_entry(
|
|||||||
async_add_entities: AddEntitiesCallback,
|
async_add_entities: AddEntitiesCallback,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Load values from configuration and initialize the platform."""
|
"""Load values from configuration and initialize the platform."""
|
||||||
client = NextBusClient(output_format="json")
|
|
||||||
|
|
||||||
_LOGGER.debug(config.data)
|
_LOGGER.debug(config.data)
|
||||||
|
entry_agency = config.data[CONF_AGENCY]
|
||||||
|
|
||||||
sensor = NextBusDepartureSensor(
|
coordinator: NextBusDataUpdateCoordinator = hass.data[DOMAIN].get(entry_agency)
|
||||||
client,
|
|
||||||
config.unique_id,
|
async_add_entities(
|
||||||
config.data[CONF_AGENCY],
|
(
|
||||||
config.data[CONF_ROUTE],
|
NextBusDepartureSensor(
|
||||||
config.data[CONF_STOP],
|
coordinator,
|
||||||
config.data.get(CONF_NAME) or config.title,
|
cast(str, config.unique_id),
|
||||||
|
config.data[CONF_AGENCY],
|
||||||
|
config.data[CONF_ROUTE],
|
||||||
|
config.data[CONF_STOP],
|
||||||
|
config.data.get(CONF_NAME) or config.title,
|
||||||
|
),
|
||||||
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
async_add_entities((sensor,), True)
|
|
||||||
|
|
||||||
|
class NextBusDepartureSensor(
|
||||||
class NextBusDepartureSensor(SensorEntity):
|
CoordinatorEntity[NextBusDataUpdateCoordinator], SensorEntity
|
||||||
|
):
|
||||||
"""Sensor class that displays upcoming NextBus times.
|
"""Sensor class that displays upcoming NextBus times.
|
||||||
|
|
||||||
To function, this requires knowing the agency tag as well as the tags for
|
To function, this requires knowing the agency tag as well as the tags for
|
||||||
@ -100,49 +107,57 @@ class NextBusDepartureSensor(SensorEntity):
|
|||||||
_attr_device_class = SensorDeviceClass.TIMESTAMP
|
_attr_device_class = SensorDeviceClass.TIMESTAMP
|
||||||
_attr_icon = "mdi:bus"
|
_attr_icon = "mdi:bus"
|
||||||
|
|
||||||
def __init__(self, client, unique_id, agency, route, stop, name):
|
def __init__(
|
||||||
|
self,
|
||||||
|
coordinator: NextBusDataUpdateCoordinator,
|
||||||
|
unique_id: str,
|
||||||
|
agency: str,
|
||||||
|
route: str,
|
||||||
|
stop: str,
|
||||||
|
name: str,
|
||||||
|
) -> None:
|
||||||
"""Initialize sensor with all required config."""
|
"""Initialize sensor with all required config."""
|
||||||
|
super().__init__(coordinator)
|
||||||
self.agency = agency
|
self.agency = agency
|
||||||
self.route = route
|
self.route = route
|
||||||
self.stop = stop
|
self.stop = stop
|
||||||
self._attr_extra_state_attributes = {}
|
self._attr_extra_state_attributes: dict[str, str] = {}
|
||||||
self._attr_unique_id = unique_id
|
self._attr_unique_id = unique_id
|
||||||
self._attr_name = name
|
self._attr_name = name
|
||||||
|
|
||||||
self._client = client
|
|
||||||
|
|
||||||
def _log_debug(self, message, *args):
|
def _log_debug(self, message, *args):
|
||||||
"""Log debug message with prefix."""
|
"""Log debug message with prefix."""
|
||||||
_LOGGER.debug(":".join((self.agency, self.route, self.stop, message)), *args)
|
_LOGGER.debug(":".join((self.agency, self.route, self.stop, message)), *args)
|
||||||
|
|
||||||
def update(self) -> None:
|
def _log_err(self, message, *args):
|
||||||
|
"""Log error message with prefix."""
|
||||||
|
_LOGGER.error(":".join((self.agency, self.route, self.stop, message)), *args)
|
||||||
|
|
||||||
|
async def async_added_to_hass(self) -> None:
|
||||||
|
"""Read data from coordinator after adding to hass."""
|
||||||
|
self._handle_coordinator_update()
|
||||||
|
await super().async_added_to_hass()
|
||||||
|
|
||||||
|
@callback
|
||||||
|
def _handle_coordinator_update(self) -> None:
|
||||||
"""Update sensor with new departures times."""
|
"""Update sensor with new departures times."""
|
||||||
# Note: using Multi because there is a bug with the single stop impl
|
results = self.coordinator.get_prediction_data(self.stop, self.route)
|
||||||
results = self._client.get_predictions_for_multi_stops(
|
self._attr_attribution = self.coordinator.get_attribution()
|
||||||
[{"stop_tag": self.stop, "route_tag": self.route}], self.agency
|
|
||||||
)
|
|
||||||
|
|
||||||
self._log_debug("Predictions results: %s", results)
|
self._log_debug("Predictions results: %s", results)
|
||||||
self._attr_attribution = results.get("copyright")
|
|
||||||
|
|
||||||
if "Error" in results:
|
if not results or "Error" in results:
|
||||||
self._log_debug("Could not get predictions: %s", results)
|
self._log_err("Error getting predictions: %s", str(results))
|
||||||
|
|
||||||
if not results.get("predictions"):
|
|
||||||
self._log_debug("No predictions available")
|
|
||||||
self._attr_native_value = None
|
self._attr_native_value = None
|
||||||
# Remove attributes that may now be outdated
|
|
||||||
self._attr_extra_state_attributes.pop("upcoming", None)
|
self._attr_extra_state_attributes.pop("upcoming", None)
|
||||||
return
|
return
|
||||||
|
|
||||||
results = results["predictions"]
|
|
||||||
|
|
||||||
# Set detailed attributes
|
# Set detailed attributes
|
||||||
self._attr_extra_state_attributes.update(
|
self._attr_extra_state_attributes.update(
|
||||||
{
|
{
|
||||||
"agency": results.get("agencyTitle"),
|
"agency": str(results.get("agencyTitle")),
|
||||||
"route": results.get("routeTitle"),
|
"route": str(results.get("routeTitle")),
|
||||||
"stop": results.get("stopTitle"),
|
"stop": str(results.get("stopTitle")),
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -171,14 +186,15 @@ class NextBusDepartureSensor(SensorEntity):
|
|||||||
self._log_debug("No upcoming predictions available")
|
self._log_debug("No upcoming predictions available")
|
||||||
self._attr_native_value = None
|
self._attr_native_value = None
|
||||||
self._attr_extra_state_attributes["upcoming"] = "No upcoming predictions"
|
self._attr_extra_state_attributes["upcoming"] = "No upcoming predictions"
|
||||||
return
|
else:
|
||||||
|
# Generate list of upcoming times
|
||||||
|
self._attr_extra_state_attributes["upcoming"] = ", ".join(
|
||||||
|
sorted((p["minutes"] for p in predictions), key=int)
|
||||||
|
)
|
||||||
|
|
||||||
# Generate list of upcoming times
|
latest_prediction = maybe_first(predictions)
|
||||||
self._attr_extra_state_attributes["upcoming"] = ", ".join(
|
self._attr_native_value = utc_from_timestamp(
|
||||||
sorted((p["minutes"] for p in predictions), key=int)
|
int(latest_prediction["epochTime"]) / 1000
|
||||||
)
|
)
|
||||||
|
|
||||||
latest_prediction = maybe_first(predictions)
|
self.async_write_ha_state()
|
||||||
self._attr_native_value = utc_from_timestamp(
|
|
||||||
int(latest_prediction["epochTime"]) / 1000
|
|
||||||
)
|
|
||||||
|
@ -1508,7 +1508,7 @@ py-dormakaba-dkey==1.0.5
|
|||||||
py-melissa-climate==2.1.4
|
py-melissa-climate==2.1.4
|
||||||
|
|
||||||
# homeassistant.components.nextbus
|
# homeassistant.components.nextbus
|
||||||
py-nextbusnext==0.1.5
|
py-nextbusnext==1.0.0
|
||||||
|
|
||||||
# homeassistant.components.nightscout
|
# homeassistant.components.nightscout
|
||||||
py-nightscout==1.2.2
|
py-nightscout==1.2.2
|
||||||
|
@ -1153,7 +1153,7 @@ py-dormakaba-dkey==1.0.5
|
|||||||
py-melissa-climate==2.1.4
|
py-melissa-climate==2.1.4
|
||||||
|
|
||||||
# homeassistant.components.nextbus
|
# homeassistant.components.nextbus
|
||||||
py-nextbusnext==0.1.5
|
py-nextbusnext==1.0.0
|
||||||
|
|
||||||
# homeassistant.components.nightscout
|
# homeassistant.components.nightscout
|
||||||
py-nightscout==1.2.2
|
py-nightscout==1.2.2
|
||||||
|
@ -2,7 +2,9 @@
|
|||||||
from collections.abc import Generator
|
from collections.abc import Generator
|
||||||
from copy import deepcopy
|
from copy import deepcopy
|
||||||
from unittest.mock import MagicMock, patch
|
from unittest.mock import MagicMock, patch
|
||||||
|
from urllib.error import HTTPError
|
||||||
|
|
||||||
|
from py_nextbus.client import NextBusFormatError, NextBusHTTPError, RouteStop
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from homeassistant.components import sensor
|
from homeassistant.components import sensor
|
||||||
@ -12,10 +14,12 @@ from homeassistant.components.nextbus.const import (
|
|||||||
CONF_STOP,
|
CONF_STOP,
|
||||||
DOMAIN,
|
DOMAIN,
|
||||||
)
|
)
|
||||||
|
from homeassistant.components.nextbus.coordinator import NextBusDataUpdateCoordinator
|
||||||
from homeassistant.config_entries import ConfigEntryState
|
from homeassistant.config_entries import ConfigEntryState
|
||||||
from homeassistant.const import CONF_NAME
|
from homeassistant.const import CONF_NAME
|
||||||
from homeassistant.core import DOMAIN as HOMEASSISTANT_DOMAIN, HomeAssistant
|
from homeassistant.core import DOMAIN as HOMEASSISTANT_DOMAIN, HomeAssistant
|
||||||
from homeassistant.helpers import issue_registry as ir
|
from homeassistant.helpers import entity_registry as er, issue_registry as ir
|
||||||
|
from homeassistant.helpers.update_coordinator import UpdateFailed
|
||||||
from homeassistant.setup import async_setup_component
|
from homeassistant.setup import async_setup_component
|
||||||
|
|
||||||
from tests.common import MockConfigEntry
|
from tests.common import MockConfigEntry
|
||||||
@ -70,9 +74,7 @@ BASIC_RESULTS = {
|
|||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def mock_nextbus() -> Generator[MagicMock, None, None]:
|
def mock_nextbus() -> Generator[MagicMock, None, None]:
|
||||||
"""Create a mock py_nextbus module."""
|
"""Create a mock py_nextbus module."""
|
||||||
with patch(
|
with patch("homeassistant.components.nextbus.coordinator.NextBusClient") as client:
|
||||||
"homeassistant.components.nextbus.sensor.NextBusClient",
|
|
||||||
) as client:
|
|
||||||
yield client
|
yield client
|
||||||
|
|
||||||
|
|
||||||
@ -89,7 +91,7 @@ def mock_nextbus_predictions(
|
|||||||
|
|
||||||
async def assert_setup_sensor(
|
async def assert_setup_sensor(
|
||||||
hass: HomeAssistant,
|
hass: HomeAssistant,
|
||||||
config: dict[str, str],
|
config: dict[str, dict[str, str]],
|
||||||
expected_state=ConfigEntryState.LOADED,
|
expected_state=ConfigEntryState.LOADED,
|
||||||
) -> MockConfigEntry:
|
) -> MockConfigEntry:
|
||||||
"""Set up the sensor and assert it's been created."""
|
"""Set up the sensor and assert it's been created."""
|
||||||
@ -144,9 +146,11 @@ async def test_verify_valid_state(
|
|||||||
) -> None:
|
) -> None:
|
||||||
"""Verify all attributes are set from a valid response."""
|
"""Verify all attributes are set from a valid response."""
|
||||||
await assert_setup_sensor(hass, CONFIG_BASIC)
|
await assert_setup_sensor(hass, CONFIG_BASIC)
|
||||||
|
entity = er.async_get(hass).async_get(SENSOR_ID)
|
||||||
|
assert entity
|
||||||
|
|
||||||
mock_nextbus_predictions.assert_called_once_with(
|
mock_nextbus_predictions.assert_called_once_with(
|
||||||
[{"stop_tag": VALID_STOP, "route_tag": VALID_ROUTE}], VALID_AGENCY
|
{RouteStop(VALID_ROUTE, VALID_STOP)}
|
||||||
)
|
)
|
||||||
|
|
||||||
state = hass.states.get(SENSOR_ID)
|
state = hass.states.get(SENSOR_ID)
|
||||||
@ -272,6 +276,28 @@ async def test_direction_list(
|
|||||||
assert state.attributes["upcoming"] == "0, 1, 2, 3"
|
assert state.attributes["upcoming"] == "0, 1, 2, 3"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
"client_exception",
|
||||||
|
(
|
||||||
|
NextBusHTTPError("failed", HTTPError("url", 500, "error", MagicMock(), None)),
|
||||||
|
NextBusFormatError("failed"),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
async def test_prediction_exceptions(
|
||||||
|
hass: HomeAssistant,
|
||||||
|
mock_nextbus: MagicMock,
|
||||||
|
mock_nextbus_lists: MagicMock,
|
||||||
|
mock_nextbus_predictions: MagicMock,
|
||||||
|
client_exception: Exception,
|
||||||
|
) -> None:
|
||||||
|
"""Test that some coodinator exceptions raise UpdateFailed exceptions."""
|
||||||
|
await assert_setup_sensor(hass, CONFIG_BASIC)
|
||||||
|
coordinator: NextBusDataUpdateCoordinator = hass.data[DOMAIN][VALID_AGENCY]
|
||||||
|
mock_nextbus_predictions.side_effect = client_exception
|
||||||
|
with pytest.raises(UpdateFailed):
|
||||||
|
await coordinator._async_update_data()
|
||||||
|
|
||||||
|
|
||||||
async def test_custom_name(
|
async def test_custom_name(
|
||||||
hass: HomeAssistant,
|
hass: HomeAssistant,
|
||||||
mock_nextbus: MagicMock,
|
mock_nextbus: MagicMock,
|
||||||
|
Loading…
x
Reference in New Issue
Block a user