mirror of
https://github.com/home-assistant/core.git
synced 2025-04-23 00:37:53 +00:00
Improve code quality of PECO integration (#68865)
This commit is contained in:
parent
bfd84ba89c
commit
5cd532b16a
@ -1,15 +1,19 @@
|
||||
"""The PECO Outage Counter integration."""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from datetime import timedelta
|
||||
from typing import Final
|
||||
|
||||
import peco
|
||||
from peco import BadJSONError, HttpError, OutageResults, PecoOutageApi
|
||||
|
||||
from homeassistant.config_entries import ConfigEntry
|
||||
from homeassistant.const import Platform
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.helpers.aiohttp_client import async_get_clientsession
|
||||
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
|
||||
|
||||
from .const import DOMAIN
|
||||
from .const import CONF_COUNTY, DOMAIN, LOGGER, SCAN_INTERVAL
|
||||
|
||||
PLATFORMS: Final = [Platform.SENSOR]
|
||||
|
||||
@ -17,7 +21,37 @@ PLATFORMS: Final = [Platform.SENSOR]
|
||||
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
||||
"""Set up PECO Outage Counter from a config entry."""
|
||||
|
||||
hass.data.setdefault(DOMAIN, {})[entry.entry_id] = peco.PecoOutageApi()
|
||||
websession = async_get_clientsession(hass)
|
||||
api = PecoOutageApi()
|
||||
county: str = entry.data[CONF_COUNTY]
|
||||
|
||||
async def async_update_data() -> OutageResults:
|
||||
"""Fetch data from API."""
|
||||
try:
|
||||
data: OutageResults = (
|
||||
await api.get_outage_totals(websession)
|
||||
if county == "TOTAL"
|
||||
else await api.get_outage_count(county, websession)
|
||||
)
|
||||
except HttpError as err:
|
||||
raise UpdateFailed(f"Error fetching data: {err}") from err
|
||||
except BadJSONError as err:
|
||||
raise UpdateFailed(f"Error parsing data: {err}") from err
|
||||
except asyncio.TimeoutError as err:
|
||||
raise UpdateFailed(f"Timeout fetching data: {err}") from err
|
||||
return data
|
||||
|
||||
coordinator = DataUpdateCoordinator(
|
||||
hass,
|
||||
LOGGER,
|
||||
name="PECO Outage Count",
|
||||
update_method=async_update_data,
|
||||
update_interval=timedelta(minutes=SCAN_INTERVAL),
|
||||
)
|
||||
|
||||
await coordinator.async_config_entry_first_refresh()
|
||||
|
||||
hass.data.setdefault(DOMAIN, {})[entry.entry_id] = coordinator
|
||||
hass.config_entries.async_setup_platforms(entry, PLATFORMS)
|
||||
|
||||
return True
|
||||
|
@ -31,9 +31,7 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
|
||||
step_id="user", data_schema=STEP_USER_DATA_SCHEMA
|
||||
)
|
||||
|
||||
county = user_input[
|
||||
CONF_COUNTY
|
||||
] # Voluptuous automatically detects if the county is invalid
|
||||
county = user_input[CONF_COUNTY]
|
||||
|
||||
await self.async_set_unique_id(county)
|
||||
self._abort_if_unique_id_configured()
|
||||
|
@ -13,5 +13,6 @@ COUNTY_LIST: Final = [
|
||||
"YORK",
|
||||
"TOTAL",
|
||||
]
|
||||
CONFIG_FLOW_COUNTIES: Final = [{county: county.capitalize()} for county in COUNTY_LIST]
|
||||
SCAN_INTERVAL: Final = 9
|
||||
CONF_COUNTY: Final = "county"
|
||||
|
@ -1,13 +1,11 @@
|
||||
"""Sensor component for PECO outage counter."""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from collections.abc import Callable
|
||||
from dataclasses import dataclass
|
||||
from datetime import timedelta
|
||||
from typing import Final
|
||||
|
||||
from peco import BadJSONError, HttpError, OutageResults
|
||||
from peco import OutageResults
|
||||
|
||||
from homeassistant.components.sensor import (
|
||||
SensorEntity,
|
||||
@ -17,15 +15,13 @@ from homeassistant.components.sensor import (
|
||||
from homeassistant.config_entries import ConfigEntry
|
||||
from homeassistant.const import PERCENTAGE
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.helpers.aiohttp_client import async_get_clientsession
|
||||
from homeassistant.helpers.entity_platform import AddEntitiesCallback
|
||||
from homeassistant.helpers.update_coordinator import (
|
||||
CoordinatorEntity,
|
||||
DataUpdateCoordinator,
|
||||
UpdateFailed,
|
||||
)
|
||||
|
||||
from .const import CONF_COUNTY, DOMAIN, LOGGER, SCAN_INTERVAL
|
||||
from .const import CONF_COUNTY, DOMAIN
|
||||
|
||||
|
||||
@dataclass
|
||||
@ -74,35 +70,8 @@ async def async_setup_entry(
|
||||
async_add_entities: AddEntitiesCallback,
|
||||
) -> None:
|
||||
"""Set up the sensor platform."""
|
||||
api = hass.data[DOMAIN][config_entry.entry_id]
|
||||
websession = async_get_clientsession(hass)
|
||||
county: str = config_entry.data[CONF_COUNTY]
|
||||
|
||||
async def async_update_data() -> OutageResults:
|
||||
"""Fetch data from API."""
|
||||
try:
|
||||
data: OutageResults = (
|
||||
await api.get_outage_totals(websession)
|
||||
if county == "TOTAL"
|
||||
else await api.get_outage_count(county, websession)
|
||||
)
|
||||
except HttpError as err:
|
||||
raise UpdateFailed(f"Error fetching data: {err}") from err
|
||||
except BadJSONError as err:
|
||||
raise UpdateFailed(f"Error parsing data: {err}") from err
|
||||
except asyncio.TimeoutError as err:
|
||||
raise UpdateFailed(f"Timeout fetching data: {err}") from err
|
||||
return data
|
||||
|
||||
coordinator = DataUpdateCoordinator(
|
||||
hass,
|
||||
LOGGER,
|
||||
name="PECO Outage Count",
|
||||
update_method=async_update_data,
|
||||
update_interval=timedelta(minutes=SCAN_INTERVAL),
|
||||
)
|
||||
|
||||
await coordinator.async_config_entry_first_refresh()
|
||||
coordinator = hass.data[DOMAIN][config_entry.entry_id]
|
||||
|
||||
async_add_entities(
|
||||
[PecoSensor(sensor, county, coordinator) for sensor in SENSOR_LIST],
|
||||
|
@ -2,7 +2,6 @@
|
||||
"config": {
|
||||
"step": {
|
||||
"user": {
|
||||
"description": "Please choose your county below.",
|
||||
"data": {
|
||||
"county": "County"
|
||||
}
|
||||
@ -12,4 +11,4 @@
|
||||
"already_configured": "[%key:common::config_flow::abort::already_configured_service%]"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -7,9 +7,7 @@
|
||||
"user": {
|
||||
"data": {
|
||||
"county": "County"
|
||||
},
|
||||
"description": "Please choose your county below.",
|
||||
"title": "PECO Outage Counter"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -54,4 +54,26 @@ async def test_invalid_county(hass: HomeAssistant) -> None:
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# it should have errored, instead of returning an errors dict, since this error should never happen
|
||||
second_result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN, context={"source": config_entries.SOURCE_USER}
|
||||
)
|
||||
assert second_result["type"] == RESULT_TYPE_FORM
|
||||
assert second_result["errors"] is None
|
||||
|
||||
with patch(
|
||||
"homeassistant.components.peco.async_setup_entry",
|
||||
return_value=True,
|
||||
):
|
||||
second_result2 = await hass.config_entries.flow.async_configure(
|
||||
second_result["flow_id"],
|
||||
{
|
||||
"county": "PHILADELPHIA",
|
||||
},
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert second_result2["type"] == RESULT_TYPE_CREATE_ENTRY
|
||||
assert second_result2["title"] == "Philadelphia Outage Count"
|
||||
assert second_result2["data"] == {
|
||||
"county": "PHILADELPHIA",
|
||||
}
|
||||
|
@ -1,5 +1,9 @@
|
||||
"""Test the PECO Outage Counter init file."""
|
||||
from peco import PecoOutageApi
|
||||
import asyncio
|
||||
from unittest.mock import patch
|
||||
|
||||
from peco import BadJSONError, HttpError
|
||||
import pytest
|
||||
|
||||
from homeassistant.components.peco.const import DOMAIN
|
||||
from homeassistant.config_entries import ConfigEntryState
|
||||
@ -8,7 +12,8 @@ from homeassistant.core import HomeAssistant
|
||||
from tests.common import MockConfigEntry
|
||||
|
||||
MOCK_ENTRY_DATA = {"county": "TOTAL"}
|
||||
INVALID_COUNTY_DATA = {"county": "INVALID_COUNTY_THAT_SHOULD_NOT_EXIST", "test": True}
|
||||
COUNTY_ENTRY_DATA = {"county": "BUCKS"}
|
||||
INVALID_COUNTY_DATA = {"county": "INVALID"}
|
||||
|
||||
|
||||
async def test_unload_entry(hass: HomeAssistant) -> None:
|
||||
@ -16,7 +21,17 @@ async def test_unload_entry(hass: HomeAssistant) -> None:
|
||||
config_entry = MockConfigEntry(domain=DOMAIN, data=MOCK_ENTRY_DATA)
|
||||
config_entry.add_to_hass(hass)
|
||||
|
||||
assert await hass.config_entries.async_setup(config_entry.entry_id)
|
||||
with patch(
|
||||
"peco.PecoOutageApi.get_outage_totals",
|
||||
return_value={
|
||||
"customers_out": 0,
|
||||
"percent_customers_out": 0,
|
||||
"outage_count": 0,
|
||||
"customers_served": 350394,
|
||||
},
|
||||
):
|
||||
assert await hass.config_entries.async_setup(config_entry.entry_id)
|
||||
await hass.async_block_till_done()
|
||||
assert hass.data[DOMAIN]
|
||||
|
||||
entries = hass.config_entries.async_entries(DOMAIN)
|
||||
@ -28,11 +43,104 @@ async def test_unload_entry(hass: HomeAssistant) -> None:
|
||||
assert entries[0].state == ConfigEntryState.NOT_LOADED
|
||||
|
||||
|
||||
async def test_data(hass: HomeAssistant) -> None:
|
||||
"""Test the data."""
|
||||
config_entry = MockConfigEntry(domain=DOMAIN, data=MOCK_ENTRY_DATA)
|
||||
@pytest.mark.parametrize(
|
||||
"sensor",
|
||||
[
|
||||
"bucks_customers_out",
|
||||
"bucks_percent_customers_out",
|
||||
"bucks_outage_count",
|
||||
"bucks_customers_served",
|
||||
],
|
||||
)
|
||||
async def test_update_timeout(hass: HomeAssistant, sensor):
|
||||
"""Test if it raises an error when there is a timeout."""
|
||||
|
||||
config_entry = MockConfigEntry(domain=DOMAIN, data=COUNTY_ENTRY_DATA)
|
||||
config_entry.add_to_hass(hass)
|
||||
|
||||
assert await hass.config_entries.async_setup(config_entry.entry_id)
|
||||
assert hass.data[DOMAIN]
|
||||
assert isinstance(hass.data[DOMAIN][config_entry.entry_id], PecoOutageApi)
|
||||
with patch(
|
||||
"peco.PecoOutageApi.get_outage_count",
|
||||
side_effect=asyncio.TimeoutError(),
|
||||
):
|
||||
assert not await hass.config_entries.async_setup(config_entry.entry_id)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert hass.states.get(f"sensor.{sensor}") is None
|
||||
assert config_entry.state == ConfigEntryState.SETUP_RETRY
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"sensor",
|
||||
[
|
||||
"total_customers_out",
|
||||
"total_percent_customers_out",
|
||||
"total_outage_count",
|
||||
"total_customers_served",
|
||||
],
|
||||
)
|
||||
async def test_total_update_timeout(hass: HomeAssistant, sensor):
|
||||
"""Test if it raises an error when there is a timeout."""
|
||||
|
||||
config_entry = MockConfigEntry(domain=DOMAIN, data=MOCK_ENTRY_DATA)
|
||||
config_entry.add_to_hass(hass)
|
||||
with patch(
|
||||
"peco.PecoOutageApi.get_outage_totals",
|
||||
side_effect=asyncio.TimeoutError(),
|
||||
):
|
||||
assert not await hass.config_entries.async_setup(config_entry.entry_id)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert hass.states.get(f"sensor.{sensor}") is None
|
||||
assert config_entry.state == ConfigEntryState.SETUP_RETRY
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"sensor",
|
||||
[
|
||||
"bucks_customers_out",
|
||||
"bucks_percent_customers_out",
|
||||
"bucks_outage_count",
|
||||
"bucks_customers_served",
|
||||
],
|
||||
)
|
||||
async def test_http_error(hass: HomeAssistant, sensor: str):
|
||||
"""Test if it raises an error when an abnormal status code is returned."""
|
||||
|
||||
config_entry = MockConfigEntry(domain=DOMAIN, data=COUNTY_ENTRY_DATA)
|
||||
config_entry.add_to_hass(hass)
|
||||
|
||||
with patch(
|
||||
"peco.PecoOutageApi.get_outage_count",
|
||||
side_effect=HttpError(),
|
||||
):
|
||||
assert not await hass.config_entries.async_setup(config_entry.entry_id)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert hass.states.get(f"sensor.{sensor}") is None
|
||||
assert config_entry.state == ConfigEntryState.SETUP_RETRY
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"sensor",
|
||||
[
|
||||
"bucks_customers_out",
|
||||
"bucks_percent_customers_out",
|
||||
"bucks_outage_count",
|
||||
"bucks_customers_served",
|
||||
],
|
||||
)
|
||||
async def test_bad_json(hass: HomeAssistant, sensor: str):
|
||||
"""Test if it raises an error when abnormal JSON is returned."""
|
||||
|
||||
config_entry = MockConfigEntry(domain=DOMAIN, data=COUNTY_ENTRY_DATA)
|
||||
config_entry.add_to_hass(hass)
|
||||
|
||||
with patch(
|
||||
"peco.PecoOutageApi.get_outage_count",
|
||||
side_effect=BadJSONError(),
|
||||
):
|
||||
assert not await hass.config_entries.async_setup(config_entry.entry_id)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert hass.states.get(f"sensor.{sensor}") is None
|
||||
assert config_entry.state == ConfigEntryState.SETUP_RETRY
|
||||
|
@ -1,256 +1,82 @@
|
||||
"""Test the PECO Outage Counter sensors."""
|
||||
import asyncio
|
||||
from unittest.mock import patch
|
||||
|
||||
from peco import OutageResults
|
||||
import pytest
|
||||
|
||||
from homeassistant.components.peco.const import DOMAIN
|
||||
from homeassistant.config_entries import ConfigEntryState
|
||||
from homeassistant.core import HomeAssistant
|
||||
|
||||
from tests.common import MockConfigEntry
|
||||
from tests.test_util.aiohttp import AiohttpClientMocker
|
||||
|
||||
MOCK_ENTRY_DATA = {"county": "TOTAL"}
|
||||
COUNTY_ENTRY_DATA = {"county": "BUCKS"}
|
||||
INVALID_COUNTY_DATA = {"county": "INVALID"}
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"sensor,expected",
|
||||
[
|
||||
("customers_out", "123"),
|
||||
("percent_customers_out", "15"),
|
||||
("outage_count", "456"),
|
||||
("customers_served", "789"),
|
||||
],
|
||||
)
|
||||
async def test_sensor_available(
|
||||
aioclient_mock: AiohttpClientMocker, hass: HomeAssistant
|
||||
hass: HomeAssistant, sensor: str, expected: str
|
||||
) -> None:
|
||||
"""Test that the sensors are working."""
|
||||
# Totals Test
|
||||
aioclient_mock.get(
|
||||
"https://kubra.io/stormcenter/api/v1/stormcenters/39e6d9f3-fdea-4539-848f-b8631945da6f/views/74de8a50-3f45-4f6a-9483-fd618bb9165d/currentState?preview=false",
|
||||
json={"data": {"interval_generation_data": "data/TEST"}},
|
||||
)
|
||||
aioclient_mock.get(
|
||||
"https://kubra.io/data/TEST/public/reports/a36a6292-1c55-44de-a6a9-44fedf9482ee_report.json",
|
||||
json={
|
||||
"file_data": {
|
||||
"totals": {
|
||||
"cust_a": {
|
||||
"val": 123,
|
||||
},
|
||||
"percent_cust_a": {
|
||||
"val": 1.23,
|
||||
},
|
||||
"n_out": 456,
|
||||
"cust_s": 789,
|
||||
}
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
config_entry = MockConfigEntry(domain=DOMAIN, data=MOCK_ENTRY_DATA)
|
||||
config_entry.add_to_hass(hass)
|
||||
|
||||
assert await hass.config_entries.async_setup(config_entry.entry_id)
|
||||
await hass.async_block_till_done()
|
||||
with patch(
|
||||
"peco.PecoOutageApi.get_outage_totals",
|
||||
return_value=OutageResults(
|
||||
customers_out=123,
|
||||
percent_customers_out=15.589,
|
||||
outage_count=456,
|
||||
customers_served=789,
|
||||
),
|
||||
):
|
||||
assert await hass.config_entries.async_setup(config_entry.entry_id)
|
||||
await hass.async_block_till_done()
|
||||
assert hass.data[DOMAIN]
|
||||
|
||||
entries = hass.config_entries.async_entries(DOMAIN)
|
||||
assert len(entries) == 1
|
||||
assert config_entry.state == ConfigEntryState.LOADED
|
||||
|
||||
sensors_to_get = [
|
||||
"total_customers_out",
|
||||
"total_percent_customers_out",
|
||||
"total_outage_count",
|
||||
"total_customers_served",
|
||||
]
|
||||
|
||||
for sensor in sensors_to_get:
|
||||
sensor_entity = hass.states.get(f"sensor.{sensor}")
|
||||
assert sensor_entity is not None
|
||||
assert sensor_entity.state != "unavailable"
|
||||
|
||||
if sensor == "total_customers_out":
|
||||
assert sensor_entity.state == "123"
|
||||
elif sensor == "total_percent_customers_out":
|
||||
assert sensor_entity.state == "1"
|
||||
elif sensor == "total_outage_count":
|
||||
assert sensor_entity.state == "456"
|
||||
elif sensor == "total_customers_served":
|
||||
assert sensor_entity.state == "789"
|
||||
sensor_entity = hass.states.get(f"sensor.total_{sensor}")
|
||||
assert sensor_entity is not None
|
||||
assert sensor_entity.state != "unavailable"
|
||||
assert sensor_entity.state == expected
|
||||
|
||||
# County Test
|
||||
aioclient_mock.clear_requests()
|
||||
aioclient_mock.get(
|
||||
"https://kubra.io/stormcenter/api/v1/stormcenters/39e6d9f3-fdea-4539-848f-b8631945da6f/views/74de8a50-3f45-4f6a-9483-fd618bb9165d/currentState?preview=false",
|
||||
json={"data": {"interval_generation_data": "data/TEST"}},
|
||||
)
|
||||
aioclient_mock.get(
|
||||
"https://kubra.io/data/TEST/public/reports/a36a6292-1c55-44de-a6a9-44fedf9482ee_report.json",
|
||||
json={
|
||||
"file_data": {
|
||||
"areas": [
|
||||
{
|
||||
"name": "BUCKS",
|
||||
"cust_a": {
|
||||
"val": 123,
|
||||
},
|
||||
"percent_cust_a": {
|
||||
"val": 1.23,
|
||||
},
|
||||
"n_out": 456,
|
||||
"cust_s": 789,
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
config_entry = MockConfigEntry(domain=DOMAIN, data=COUNTY_ENTRY_DATA)
|
||||
config_entry.add_to_hass(hass)
|
||||
|
||||
assert await hass.config_entries.async_setup(config_entry.entry_id)
|
||||
await hass.async_block_till_done()
|
||||
assert hass.data[DOMAIN]
|
||||
with patch(
|
||||
"peco.PecoOutageApi.get_outage_count",
|
||||
return_value=OutageResults(
|
||||
customers_out=123,
|
||||
percent_customers_out=15.589,
|
||||
outage_count=456,
|
||||
customers_served=789,
|
||||
),
|
||||
):
|
||||
assert await hass.config_entries.async_setup(config_entry.entry_id)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
entries = hass.config_entries.async_entries(DOMAIN)
|
||||
assert len(entries) == 2
|
||||
assert config_entry.state == ConfigEntryState.LOADED
|
||||
|
||||
sensors_to_get = [
|
||||
"bucks_customers_out",
|
||||
"bucks_percent_customers_out",
|
||||
"bucks_outage_count",
|
||||
"bucks_customers_served",
|
||||
]
|
||||
|
||||
for sensor in sensors_to_get:
|
||||
sensor_entity = hass.states.get(f"sensor.{sensor}")
|
||||
assert sensor_entity is not None
|
||||
assert sensor_entity.state != "unavailable"
|
||||
|
||||
if sensor == "bucks_customers_out":
|
||||
assert sensor_entity.state == "123"
|
||||
elif sensor == "bucks_percent_customers_out":
|
||||
assert sensor_entity.state == "1"
|
||||
elif sensor == "bucks_outage_count":
|
||||
assert sensor_entity.state == "456"
|
||||
elif sensor == "bucks_customers_served":
|
||||
assert sensor_entity.state == "789"
|
||||
|
||||
|
||||
async def test_http_error(
|
||||
hass: HomeAssistant, aioclient_mock: AiohttpClientMocker, caplog
|
||||
):
|
||||
"""Test if it raises an error when there is an HTTP error."""
|
||||
aioclient_mock.get(
|
||||
"https://kubra.io/stormcenter/api/v1/stormcenters/39e6d9f3-fdea-4539-848f-b8631945da6f/views/74de8a50-3f45-4f6a-9483-fd618bb9165d/currentState?preview=false",
|
||||
json={"data": {"interval_generation_data": "data/TEST"}},
|
||||
)
|
||||
aioclient_mock.get(
|
||||
"https://kubra.io/data/TEST/public/reports/a36a6292-1c55-44de-a6a9-44fedf9482ee_report.json",
|
||||
status=500,
|
||||
json={"error": "Internal Server Error"},
|
||||
)
|
||||
|
||||
config_entry = MockConfigEntry(domain=DOMAIN, data=COUNTY_ENTRY_DATA)
|
||||
config_entry.add_to_hass(hass)
|
||||
|
||||
assert await hass.config_entries.async_setup(config_entry.entry_id)
|
||||
await hass.async_block_till_done()
|
||||
assert hass.data[DOMAIN]
|
||||
|
||||
assert "Error getting PECO outage counter data" in caplog.text
|
||||
|
||||
|
||||
async def test_bad_json(
|
||||
hass: HomeAssistant, aioclient_mock: AiohttpClientMocker, caplog
|
||||
):
|
||||
"""Test if it raises an error when there is bad JSON."""
|
||||
aioclient_mock.get(
|
||||
"https://kubra.io/stormcenter/api/v1/stormcenters/39e6d9f3-fdea-4539-848f-b8631945da6f/views/74de8a50-3f45-4f6a-9483-fd618bb9165d/currentState?preview=false",
|
||||
json={"data": {}},
|
||||
)
|
||||
|
||||
config_entry = MockConfigEntry(domain=DOMAIN, data=COUNTY_ENTRY_DATA)
|
||||
config_entry.add_to_hass(hass)
|
||||
|
||||
assert await hass.config_entries.async_setup(config_entry.entry_id)
|
||||
await hass.async_block_till_done()
|
||||
assert hass.data[DOMAIN]
|
||||
|
||||
assert "ConfigEntryNotReady" in caplog.text
|
||||
|
||||
|
||||
async def test_total_http_error(
|
||||
hass: HomeAssistant, aioclient_mock: AiohttpClientMocker, caplog
|
||||
):
|
||||
"""Test if it raises an error when there is an HTTP error."""
|
||||
aioclient_mock.get(
|
||||
"https://kubra.io/stormcenter/api/v1/stormcenters/39e6d9f3-fdea-4539-848f-b8631945da6f/views/74de8a50-3f45-4f6a-9483-fd618bb9165d/currentState?preview=false",
|
||||
json={"data": {"interval_generation_data": "data/TEST"}},
|
||||
)
|
||||
aioclient_mock.get(
|
||||
"https://kubra.io/data/TEST/public/reports/a36a6292-1c55-44de-a6a9-44fedf9482ee_report.json",
|
||||
status=500,
|
||||
json={"error": "Internal Server Error"},
|
||||
)
|
||||
|
||||
config_entry = MockConfigEntry(domain=DOMAIN, data=MOCK_ENTRY_DATA)
|
||||
config_entry.add_to_hass(hass)
|
||||
|
||||
assert await hass.config_entries.async_setup(config_entry.entry_id)
|
||||
await hass.async_block_till_done()
|
||||
assert hass.data[DOMAIN]
|
||||
|
||||
assert "Error getting PECO outage counter data" in caplog.text
|
||||
|
||||
|
||||
async def test_total_bad_json(
|
||||
hass: HomeAssistant, aioclient_mock: AiohttpClientMocker, caplog
|
||||
):
|
||||
"""Test if it raises an error when there is bad JSON."""
|
||||
aioclient_mock.get(
|
||||
"https://kubra.io/stormcenter/api/v1/stormcenters/39e6d9f3-fdea-4539-848f-b8631945da6f/views/74de8a50-3f45-4f6a-9483-fd618bb9165d/currentState?preview=false",
|
||||
json={"data": {}},
|
||||
)
|
||||
|
||||
config_entry = MockConfigEntry(domain=DOMAIN, data=MOCK_ENTRY_DATA)
|
||||
config_entry.add_to_hass(hass)
|
||||
|
||||
assert await hass.config_entries.async_setup(config_entry.entry_id)
|
||||
await hass.async_block_till_done()
|
||||
assert hass.data[DOMAIN]
|
||||
|
||||
assert "ConfigEntryNotReady" in caplog.text
|
||||
|
||||
|
||||
async def test_update_timeout(
|
||||
hass: HomeAssistant, aioclient_mock: AiohttpClientMocker, caplog
|
||||
):
|
||||
"""Test if it raises an error when there is a timeout."""
|
||||
aioclient_mock.get(
|
||||
"https://kubra.io/stormcenter/api/v1/stormcenters/39e6d9f3-fdea-4539-848f-b8631945da6f/views/74de8a50-3f45-4f6a-9483-fd618bb9165d/currentState?preview=false",
|
||||
exc=asyncio.TimeoutError(),
|
||||
)
|
||||
|
||||
config_entry = MockConfigEntry(domain=DOMAIN, data=COUNTY_ENTRY_DATA)
|
||||
config_entry.add_to_hass(hass)
|
||||
|
||||
assert await hass.config_entries.async_setup(config_entry.entry_id)
|
||||
await hass.async_block_till_done()
|
||||
assert hass.data[DOMAIN]
|
||||
|
||||
assert "Timeout fetching data" in caplog.text
|
||||
|
||||
|
||||
async def test_total_update_timeout(
|
||||
hass: HomeAssistant, aioclient_mock: AiohttpClientMocker, caplog
|
||||
):
|
||||
"""Test if it raises an error when there is a timeout."""
|
||||
aioclient_mock.get(
|
||||
"https://kubra.io/stormcenter/api/v1/stormcenters/39e6d9f3-fdea-4539-848f-b8631945da6f/views/74de8a50-3f45-4f6a-9483-fd618bb9165d/currentState?preview=false",
|
||||
exc=asyncio.TimeoutError(),
|
||||
)
|
||||
|
||||
config_entry = MockConfigEntry(domain=DOMAIN, data=MOCK_ENTRY_DATA)
|
||||
config_entry.add_to_hass(hass)
|
||||
|
||||
assert await hass.config_entries.async_setup(config_entry.entry_id)
|
||||
await hass.async_block_till_done()
|
||||
assert hass.data[DOMAIN]
|
||||
|
||||
assert "Timeout fetching data" in caplog.text
|
||||
sensor_entity = hass.states.get(f"sensor.bucks_{sensor}")
|
||||
assert sensor_entity is not None
|
||||
assert sensor_entity.state != "unavailable"
|
||||
assert sensor_entity.state == expected
|
||||
|
Loading…
x
Reference in New Issue
Block a user