Move speedtestdotnet coordinator to separate file (#83979)

This commit is contained in:
Rami Mosleh 2022-12-18 11:57:17 +02:00 committed by GitHub
parent 487d84c8f9
commit 10a6c56fec
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 155 additions and 162 deletions

View File

@ -2,34 +2,37 @@
from __future__ import annotations
from datetime import timedelta
import logging
from functools import partial
import speedtest
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_SCAN_INTERVAL, EVENT_HOMEASSISTANT_STARTED
from homeassistant.core import CoreState, HomeAssistant
from homeassistant.exceptions import ConfigEntryNotReady
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import (
CONF_MANUAL,
CONF_SERVER_ID,
DEFAULT_SCAN_INTERVAL,
DEFAULT_SERVER,
DOMAIN,
PLATFORMS,
from homeassistant.const import (
CONF_SCAN_INTERVAL,
EVENT_HOMEASSISTANT_STARTED,
Platform,
)
from homeassistant.core import CoreState, Event, HomeAssistant
from homeassistant.exceptions import ConfigEntryNotReady
_LOGGER = logging.getLogger(__name__)
from .const import CONF_MANUAL, DEFAULT_SCAN_INTERVAL, DOMAIN
from .coordinator import SpeedTestDataCoordinator
PLATFORMS = [Platform.SENSOR]
async def async_setup_entry(hass: HomeAssistant, config_entry: ConfigEntry) -> bool:
"""Set up the Speedtest.net component."""
coordinator = SpeedTestDataCoordinator(hass, config_entry)
await coordinator.async_setup()
try:
api = await hass.async_add_executor_job(
partial(speedtest.Speedtest, secure=True)
)
coordinator = SpeedTestDataCoordinator(hass, config_entry, api)
await hass.async_add_executor_job(coordinator.update_servers)
except speedtest.SpeedtestException as err:
raise ConfigEntryNotReady from err
async def _enable_scheduled_speedtests(*_):
async def _enable_scheduled_speedtests(event: Event | None = None) -> None:
"""Activate the data update coordinator."""
coordinator.update_interval = timedelta(
minutes=config_entry.options.get(CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL)
@ -51,104 +54,28 @@ async def async_setup_entry(hass: HomeAssistant, config_entry: ConfigEntry) -> b
await hass.config_entries.async_forward_entry_setups(config_entry, PLATFORMS)
config_entry.async_on_unload(
config_entry.add_update_listener(options_updated_listener)
)
return True
async def async_unload_entry(hass: HomeAssistant, config_entry: ConfigEntry) -> bool:
"""Unload SpeedTest Entry from config_entry."""
unload_ok = await hass.config_entries.async_unload_platforms(
if unload_ok := await hass.config_entries.async_unload_platforms(
config_entry, PLATFORMS
)
if unload_ok:
):
hass.data.pop(DOMAIN)
return unload_ok
class SpeedTestDataCoordinator(DataUpdateCoordinator):
"""Get the latest data from speedtest.net."""
def __init__(self, hass: HomeAssistant, config_entry: ConfigEntry) -> None:
"""Initialize the data object."""
self.hass = hass
self.config_entry: ConfigEntry = config_entry
self.api: speedtest.Speedtest | None = None
self.servers: dict[str, dict] = {DEFAULT_SERVER: {}}
super().__init__(
self.hass,
_LOGGER,
name=DOMAIN,
update_method=self.async_update,
)
def initialize(self) -> None:
"""Initialize speedtest api."""
self.api = speedtest.Speedtest(secure=True)
self.update_servers()
def update_servers(self):
"""Update list of test servers."""
test_servers = self.api.get_servers()
test_servers_list = []
for servers in test_servers.values():
for server in servers:
test_servers_list.append(server)
for server in sorted(
test_servers_list,
key=lambda server: (
server["country"],
server["name"],
server["sponsor"],
),
):
self.servers[
f"{server['country']} - {server['sponsor']} - {server['name']}"
] = server
def update_data(self):
"""Get the latest data from speedtest.net."""
self.update_servers()
self.api.closest.clear()
if self.config_entry.options.get(CONF_SERVER_ID):
server_id = self.config_entry.options.get(CONF_SERVER_ID)
self.api.get_servers(servers=[server_id])
best_server = self.api.get_best_server()
_LOGGER.debug(
"Executing speedtest.net speed test with server_id: %s",
best_server["id"],
)
self.api.download()
self.api.upload()
return self.api.results.dict()
async def async_update(self) -> dict[str, str]:
"""Update Speedtest data."""
try:
return await self.hass.async_add_executor_job(self.update_data)
except speedtest.NoMatchedServers as err:
raise UpdateFailed("Selected server is not found.") from err
except speedtest.SpeedtestException as err:
raise UpdateFailed(err) from err
async def async_setup(self) -> None:
"""Set up SpeedTest."""
try:
await self.hass.async_add_executor_job(self.initialize)
except speedtest.SpeedtestException as err:
raise ConfigEntryNotReady from err
self.config_entry.async_on_unload(
self.config_entry.add_update_listener(options_updated_listener)
)
async def options_updated_listener(hass: HomeAssistant, entry: ConfigEntry) -> None:
"""Handle options update."""
coordinator: SpeedTestDataCoordinator = hass.data[DOMAIN]
if entry.options[CONF_MANUAL]:
hass.data[DOMAIN].update_interval = None
coordinator.update_interval = None
return
hass.data[DOMAIN].update_interval = timedelta(
minutes=entry.options[CONF_SCAN_INTERVAL]
)
await hass.data[DOMAIN].async_request_refresh()
coordinator.update_interval = timedelta(minutes=entry.options[CONF_SCAN_INTERVAL])
await coordinator.async_request_refresh()

View File

@ -30,7 +30,7 @@ class SpeedTestFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
@callback
def async_get_options_flow(
config_entry: config_entries.ConfigEntry,
) -> config_entries.OptionsFlow:
) -> SpeedTestOptionsFlowHandler:
"""Get the options flow for this handler."""
return SpeedTestOptionsFlowHandler(config_entry)

View File

@ -1,52 +1,10 @@
"""Constants used by Speedtest.net."""
from __future__ import annotations
from collections.abc import Callable
from dataclasses import dataclass
from typing import Final
from homeassistant.components.sensor import (
SensorDeviceClass,
SensorEntityDescription,
SensorStateClass,
)
from homeassistant.const import TIME_MILLISECONDS, Platform, UnitOfDataRate
DOMAIN: Final = "speedtestdotnet"
@dataclass
class SpeedtestSensorEntityDescription(SensorEntityDescription):
"""Class describing Speedtest sensor entities."""
value: Callable = round
SENSOR_TYPES: Final[tuple[SpeedtestSensorEntityDescription, ...]] = (
SpeedtestSensorEntityDescription(
key="ping",
name="Ping",
native_unit_of_measurement=TIME_MILLISECONDS,
state_class=SensorStateClass.MEASUREMENT,
),
SpeedtestSensorEntityDescription(
key="download",
name="Download",
device_class=SensorDeviceClass.DATA_RATE,
native_unit_of_measurement=UnitOfDataRate.MEGABITS_PER_SECOND,
state_class=SensorStateClass.MEASUREMENT,
value=lambda value: round(value / 10**6, 2),
),
SpeedtestSensorEntityDescription(
key="upload",
name="Upload",
device_class=SensorDeviceClass.DATA_RATE,
native_unit_of_measurement=UnitOfDataRate.MEGABITS_PER_SECOND,
state_class=SensorStateClass.MEASUREMENT,
value=lambda value: round(value / 10**6, 2),
),
)
CONF_SERVER_NAME: Final = "server_name"
CONF_SERVER_ID: Final = "server_id"
CONF_MANUAL: Final = "manual"
@ -65,5 +23,3 @@ DEFAULT_SERVER: Final = "*Auto Detect"
ATTRIBUTION: Final = "Data retrieved from Speedtest.net by Ookla"
ICON: Final = "mdi:speedometer"
PLATFORMS: Final = [Platform.SENSOR]

View File

@ -0,0 +1,79 @@
"""Coordinator for speedtestdotnet."""
import logging
from typing import Any
import speedtest
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import CONF_SERVER_ID, DEFAULT_SERVER, DOMAIN
_LOGGER = logging.getLogger(__name__)
class SpeedTestDataCoordinator(DataUpdateCoordinator[dict[str, Any]]):
"""Get the latest data from speedtest.net."""
config_entry: ConfigEntry
def __init__(
self, hass: HomeAssistant, config_entry: ConfigEntry, api: speedtest.Speedtest
) -> None:
"""Initialize the data object."""
self.hass = hass
self.config_entry = config_entry
self.api = api
self.servers: dict[str, dict] = {DEFAULT_SERVER: {}}
super().__init__(
self.hass,
_LOGGER,
name=DOMAIN,
)
def update_servers(self) -> None:
"""Update list of test servers."""
test_servers = self.api.get_servers()
test_servers_list = []
for servers in test_servers.values():
for server in servers:
test_servers_list.append(server)
for server in sorted(
test_servers_list,
key=lambda server: (
server["country"],
server["name"],
server["sponsor"],
),
):
self.servers[
f"{server['country']} - {server['sponsor']} - {server['name']}"
] = server
def update_data(self) -> dict[str, Any]:
"""Get the latest data from speedtest.net."""
self.update_servers()
self.api.closest.clear()
if self.config_entry.options.get(CONF_SERVER_ID):
server_id = self.config_entry.options.get(CONF_SERVER_ID)
self.api.get_servers(servers=[server_id])
best_server = self.api.get_best_server()
_LOGGER.debug(
"Executing speedtest.net speed test with server_id: %s",
best_server["id"],
)
self.api.download()
self.api.upload()
return self.api.results.dict()
async def _async_update_data(self) -> dict[str, Any]:
"""Update Speedtest data."""
try:
return await self.hass.async_add_executor_job(self.update_data)
except speedtest.NoMatchedServers as err:
raise UpdateFailed("Selected server is not found.") from err
except speedtest.SpeedtestException as err:
raise UpdateFailed(err) from err

View File

@ -1,10 +1,17 @@
"""Support for Speedtest.net internet speed testing sensor."""
from __future__ import annotations
from collections.abc import Callable
from dataclasses import dataclass
from typing import Any, cast
from homeassistant.components.sensor import SensorEntity
from homeassistant.components.sensor import (
SensorEntity,
SensorEntityDescription,
SensorStateClass,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import TIME_MILLISECONDS, UnitOfDataRate
from homeassistant.core import HomeAssistant
from homeassistant.helpers.device_registry import DeviceEntryType
from homeassistant.helpers.entity import DeviceInfo
@ -13,7 +20,6 @@ from homeassistant.helpers.restore_state import RestoreEntity
from homeassistant.helpers.typing import StateType
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from . import SpeedTestDataCoordinator
from .const import (
ATTR_BYTES_RECEIVED,
ATTR_BYTES_SENT,
@ -24,8 +30,38 @@ from .const import (
DEFAULT_NAME,
DOMAIN,
ICON,
SENSOR_TYPES,
SpeedtestSensorEntityDescription,
)
from .coordinator import SpeedTestDataCoordinator
@dataclass
class SpeedtestSensorEntityDescription(SensorEntityDescription):
"""Class describing Speedtest sensor entities."""
value: Callable = round
SENSOR_TYPES: tuple[SpeedtestSensorEntityDescription, ...] = (
SpeedtestSensorEntityDescription(
key="ping",
name="Ping",
native_unit_of_measurement=TIME_MILLISECONDS,
state_class=SensorStateClass.MEASUREMENT,
),
SpeedtestSensorEntityDescription(
key="download",
name="Download",
native_unit_of_measurement=UnitOfDataRate.MEGABITS_PER_SECOND,
state_class=SensorStateClass.MEASUREMENT,
value=lambda value: round(value / 10**6, 2),
),
SpeedtestSensorEntityDescription(
key="upload",
name="Upload",
native_unit_of_measurement=UnitOfDataRate.MEGABITS_PER_SECOND,
state_class=SensorStateClass.MEASUREMENT,
value=lambda value: round(value / 10**6, 2),
),
)

View File

@ -1,13 +1,10 @@
"""Tests for SpeedTest sensors."""
from unittest.mock import MagicMock
from homeassistant.components import speedtestdotnet
from homeassistant.components.sensor import DOMAIN as SENSOR_DOMAIN
from homeassistant.components.speedtestdotnet.const import (
CONF_MANUAL,
DEFAULT_NAME,
SENSOR_TYPES,
)
from homeassistant.components.speedtestdotnet import DOMAIN
from homeassistant.components.speedtestdotnet.const import CONF_MANUAL, DEFAULT_NAME
from homeassistant.components.speedtestdotnet.sensor import SENSOR_TYPES
from homeassistant.core import HomeAssistant, State
from . import MOCK_RESULTS, MOCK_SERVERS, MOCK_STATES
@ -19,7 +16,7 @@ async def test_speedtestdotnet_sensors(
hass: HomeAssistant, mock_api: MagicMock
) -> None:
"""Test sensors created for speedtestdotnet integration."""
entry = MockConfigEntry(domain=speedtestdotnet.DOMAIN, data={})
entry = MockConfigEntry(domain=DOMAIN, data={})
entry.add_to_hass(hass)
mock_api.return_value.get_best_server.return_value = MOCK_SERVERS[1][0]
@ -45,9 +42,7 @@ async def test_restore_last_state(hass: HomeAssistant, mock_api: MagicMock) -> N
for sensor, state in MOCK_STATES.items()
],
)
entry = MockConfigEntry(
domain=speedtestdotnet.DOMAIN, data={}, options={CONF_MANUAL: True}
)
entry = MockConfigEntry(domain=DOMAIN, data={}, options={CONF_MANUAL: True})
entry.add_to_hass(hass)
await hass.config_entries.async_setup(entry.entry_id)