Add Speedtestdotnet config_flow (#36254)

This commit is contained in:
Rami Mosleh 2020-06-10 19:33:48 +03:00 committed by GitHub
parent c65e72886c
commit 2c1a76cf92
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 678 additions and 104 deletions

View File

@ -377,7 +377,7 @@ homeassistant/components/somfy/* @tetienne
homeassistant/components/sonarr/* @ctalkington homeassistant/components/sonarr/* @ctalkington
homeassistant/components/songpal/* @rytilahti @shenxn homeassistant/components/songpal/* @rytilahti @shenxn
homeassistant/components/spaceapi/* @fabaff homeassistant/components/spaceapi/* @fabaff
homeassistant/components/speedtestdotnet/* @rohankapoorcom homeassistant/components/speedtestdotnet/* @rohankapoorcom @engrbm87
homeassistant/components/spider/* @peternijssen homeassistant/components/spider/* @peternijssen
homeassistant/components/spotify/* @frenck homeassistant/components/spotify/* @frenck
homeassistant/components/sql/* @dgomes homeassistant/components/sql/* @dgomes

View File

@ -5,30 +5,32 @@ import logging
import speedtest import speedtest
import voluptuous as vol import voluptuous as vol
from homeassistant.components.sensor import DOMAIN as SENSOR_DOMAIN from homeassistant.config_entries import SOURCE_IMPORT
from homeassistant.const import CONF_MONITORED_CONDITIONS, CONF_SCAN_INTERVAL from homeassistant.const import CONF_MONITORED_CONDITIONS, CONF_SCAN_INTERVAL
from homeassistant.exceptions import ConfigEntryNotReady
import homeassistant.helpers.config_validation as cv import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.discovery import async_load_platform from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from homeassistant.helpers.dispatcher import dispatcher_send
from homeassistant.helpers.event import async_track_time_interval
from .const import DATA_UPDATED, DOMAIN, SENSOR_TYPES from .const import (
CONF_MANUAL,
CONF_SERVER_ID,
DEFAULT_SCAN_INTERVAL,
DEFAULT_SERVER,
DOMAIN,
SENSOR_TYPES,
SPEED_TEST_SERVICE,
)
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
CONF_SERVER_ID = "server_id"
CONF_MANUAL = "manual"
DEFAULT_INTERVAL = timedelta(hours=1)
CONFIG_SCHEMA = vol.Schema( CONFIG_SCHEMA = vol.Schema(
{ {
DOMAIN: vol.Schema( DOMAIN: vol.Schema(
{ {
vol.Optional(CONF_SERVER_ID): cv.positive_int, vol.Optional(CONF_SERVER_ID): cv.positive_int,
vol.Optional(CONF_SCAN_INTERVAL, default=DEFAULT_INTERVAL): vol.All( vol.Optional(
cv.time_period, cv.positive_timedelta CONF_SCAN_INTERVAL, default=timedelta(minutes=DEFAULT_SCAN_INTERVAL)
), ): vol.All(cv.time_period, cv.positive_timedelta),
vol.Optional(CONF_MANUAL, default=False): cv.boolean, vol.Optional(CONF_MANUAL, default=False): cv.boolean,
vol.Optional( vol.Optional(
CONF_MONITORED_CONDITIONS, default=list(SENSOR_TYPES) CONF_MONITORED_CONDITIONS, default=list(SENSOR_TYPES)
@ -40,46 +42,148 @@ CONFIG_SCHEMA = vol.Schema(
) )
def server_id_valid(server_id):
"""Check if server_id is valid."""
try:
api = speedtest.Speedtest()
api.get_servers([int(server_id)])
except (speedtest.ConfigRetrievalError, speedtest.NoMatchedServers):
return False
return True
async def async_setup(hass, config): async def async_setup(hass, config):
"""Import integration from config."""
if DOMAIN in config:
hass.async_create_task(
hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_IMPORT}, data=config[DOMAIN]
)
)
return True
async def async_setup_entry(hass, config_entry):
"""Set up the Speedtest.net component.""" """Set up the Speedtest.net component."""
conf = config[DOMAIN] coordinator = SpeedTestDataCoordinator(hass, config_entry)
data = hass.data[DOMAIN] = SpeedtestData(hass, conf.get(CONF_SERVER_ID)) await coordinator.async_setup()
if not conf[CONF_MANUAL]: await coordinator.async_refresh()
async_track_time_interval(hass, data.update, conf[CONF_SCAN_INTERVAL]) if not coordinator.last_update_success:
raise ConfigEntryNotReady
def update(call=None): hass.data[DOMAIN] = coordinator
"""Service call to manually update the data."""
data.update()
hass.services.async_register(DOMAIN, "speedtest", update)
hass.async_create_task( hass.async_create_task(
async_load_platform( hass.config_entries.async_forward_entry_setup(config_entry, "sensor")
hass, SENSOR_DOMAIN, DOMAIN, conf[CONF_MONITORED_CONDITIONS], config
)
) )
return True return True
class SpeedtestData: async def async_unload_entry(hass, config_entry):
"""Unload SpeedTest Entry from config_entry."""
hass.services.async_remove(DOMAIN, SPEED_TEST_SERVICE)
await hass.config_entries.async_forward_entry_unload(config_entry, "sensor")
hass.data.pop(DOMAIN)
return True
class SpeedTestDataCoordinator(DataUpdateCoordinator):
"""Get the latest data from speedtest.net.""" """Get the latest data from speedtest.net."""
def __init__(self, hass, server_id): def __init__(self, hass, config_entry):
"""Initialize the data object.""" """Initialize the data object."""
self.data = None self.hass = hass
self._hass = hass self.config_entry = config_entry
self._servers = [] if server_id is None else [server_id] self.api = None
self.servers = {}
super().__init__(
self.hass,
_LOGGER,
name=DOMAIN,
update_method=self.async_update,
update_interval=timedelta(
minutes=self.config_entry.options.get(
CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL
)
),
)
def update(self, now=None): def update_data(self):
"""Get the latest data from speedtest.net.""" """Get the latest data from speedtest.net."""
server_list = self.api.get_servers()
_LOGGER.debug("Executing speedtest.net speed test") self.servers[DEFAULT_SERVER] = {}
speed = speedtest.Speedtest() for server in sorted(
speed.get_servers(self._servers) server_list.values(), key=lambda server: server[0]["country"]
speed.get_best_server() ):
speed.download() self.servers[f"{server[0]['country']} - {server[0]['sponsor']}"] = server[0]
speed.upload()
self.data = speed.results.dict() if self.config_entry.options.get(CONF_SERVER_ID):
dispatcher_send(self._hass, DATA_UPDATED) server_id = self.config_entry.options.get(CONF_SERVER_ID)
self.api.closest.clear()
self.api.get_servers(servers=[server_id])
self.api.get_best_server()
_LOGGER.debug(
"Executing speedtest.net speed test with server_id: %s", self.api.best["id"]
)
self.api.download()
self.api.upload()
return self.api.results.dict()
async def async_update(self, *_):
"""Update Speedtest data."""
try:
return await self.hass.async_add_executor_job(self.update_data)
except (speedtest.ConfigRetrievalError, speedtest.NoMatchedServers):
raise UpdateFailed
async def async_set_options(self):
"""Set options for entry."""
if not self.config_entry.options:
data = {**self.config_entry.data}
options = {
CONF_SCAN_INTERVAL: data.pop(CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL),
CONF_MANUAL: data.pop(CONF_MANUAL, False),
CONF_SERVER_ID: str(data.pop(CONF_SERVER_ID, "")),
}
self.hass.config_entries.async_update_entry(
self.config_entry, data=data, options=options
)
async def async_setup(self):
"""Set up SpeedTest."""
try:
self.api = await self.hass.async_add_executor_job(speedtest.Speedtest)
except speedtest.ConfigRetrievalError:
raise ConfigEntryNotReady
async def request_update(call):
"""Request update."""
await self.async_request_refresh()
await self.async_set_options()
self.hass.services.async_register(DOMAIN, SPEED_TEST_SERVICE, request_update)
self.config_entry.add_update_listener(options_updated_listener)
async def options_updated_listener(hass, entry):
"""Handle options update."""
if not entry.options[CONF_MANUAL]:
hass.data[DOMAIN].update_interval = timedelta(
minutes=entry.options[CONF_SCAN_INTERVAL]
)
await hass.data[DOMAIN].async_request_refresh()
return
# set the update interval to a very long time
# if the user wants to disable auto update
hass.data[DOMAIN].update_interval = timedelta(days=7)

View File

@ -0,0 +1,117 @@
"""Config flow for Speedtest.net."""
import logging
import voluptuous as vol
from homeassistant import config_entries
from homeassistant.const import CONF_MONITORED_CONDITIONS, CONF_SCAN_INTERVAL
from homeassistant.core import callback
from . import server_id_valid
from .const import (
CONF_MANUAL,
CONF_SERVER_ID,
CONF_SERVER_NAME,
DEFAULT_NAME,
DEFAULT_SCAN_INTERVAL,
DEFAULT_SERVER,
)
from .const import DOMAIN # pylint: disable=unused-import
_LOGGER = logging.getLogger(__name__)
class SpeedTestFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
"""Handle Speedtest.net config flow."""
VERSION = 1
CONNECTION_CLASS = config_entries.CONN_CLASS_LOCAL_POLL
@staticmethod
@callback
def async_get_options_flow(config_entry):
"""Get the options flow for this handler."""
return SpeedTestOptionsFlowHandler(config_entry)
async def async_step_user(self, user_input=None):
"""Handle a flow initialized by the user."""
if self._async_current_entries():
return self.async_abort(reason="one_instance_allowed")
if user_input is None:
return self.async_show_form(step_id="user")
return self.async_create_entry(title=DEFAULT_NAME, data=user_input)
async def async_step_import(self, import_config):
"""Import from config."""
if (
CONF_SERVER_ID in import_config
and not await self.hass.async_add_executor_job(
server_id_valid, import_config[CONF_SERVER_ID]
)
):
return self.async_abort(reason="wrong_server_id")
import_config[CONF_SCAN_INTERVAL] = int(
import_config[CONF_SCAN_INTERVAL].seconds / 60
)
import_config.pop(CONF_MONITORED_CONDITIONS)
return await self.async_step_user(user_input=import_config)
class SpeedTestOptionsFlowHandler(config_entries.OptionsFlow):
"""Handle SpeedTest options."""
def __init__(self, config_entry):
"""Initialize options flow."""
self.config_entry = config_entry
self._servers = {}
async def async_step_init(self, user_input=None):
"""Manage the options."""
errors = {}
if user_input is not None:
server_name = user_input[CONF_SERVER_NAME]
if server_name != "*Auto Detect":
server_id = self._servers[server_name]["id"]
user_input[CONF_SERVER_ID] = server_id
else:
user_input[CONF_SERVER_ID] = None
return self.async_create_entry(title="", data=user_input)
self._servers = self.hass.data[DOMAIN].servers
server_name = DEFAULT_SERVER
if self.config_entry.options.get(
CONF_SERVER_ID
) and not self.config_entry.options.get(CONF_SERVER_NAME):
server = [
key
for (key, value) in self._servers.items()
if value.get("id") == self.config_entry.options[CONF_SERVER_ID]
]
server_name = server[0]
options = {
vol.Optional(
CONF_SERVER_NAME,
default=self.config_entry.options.get(CONF_SERVER_NAME, server_name),
): vol.In(self._servers.keys()),
vol.Optional(
CONF_SCAN_INTERVAL,
default=self.config_entry.options.get(
CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL
),
): int,
vol.Optional(
CONF_MANUAL, default=self.config_entry.options.get(CONF_MANUAL, False)
): bool,
}
return self.async_show_form(
step_id="init", data_schema=vol.Schema(options), errors=errors
)

View File

@ -1,8 +1,9 @@
"""Consts used by Speedtest.net.""" """Consts used by Speedtest.net."""
from homeassistant.const import DATA_RATE_MEGABITS_PER_SECOND, TIME_MILLISECONDS from homeassistant.const import DATA_RATE_MEGABITS_PER_SECOND, TIME_MILLISECONDS
DOMAIN = "speedtestdotnet" DOMAIN = "speedtestdotnet"
SPEED_TEST_SERVICE = "speedtest"
DATA_UPDATED = f"{DOMAIN}_data_updated" DATA_UPDATED = f"{DOMAIN}_data_updated"
SENSOR_TYPES = { SENSOR_TYPES = {
@ -10,3 +11,22 @@ SENSOR_TYPES = {
"download": ["Download", DATA_RATE_MEGABITS_PER_SECOND], "download": ["Download", DATA_RATE_MEGABITS_PER_SECOND],
"upload": ["Upload", DATA_RATE_MEGABITS_PER_SECOND], "upload": ["Upload", DATA_RATE_MEGABITS_PER_SECOND],
} }
CONF_SERVER_NAME = "server_name"
CONF_SERVER_ID = "server_id"
CONF_MANUAL = "manual"
ATTR_BYTES_RECEIVED = "bytes_received"
ATTR_BYTES_SENT = "bytes_sent"
ATTR_SERVER_COUNTRY = "server_country"
ATTR_SERVER_ID = "server_id"
ATTR_SERVER_NAME = "server_name"
DEFAULT_NAME = "SpeedTest"
DEFAULT_SCAN_INTERVAL = 60
DEFAULT_SERVER = "*Auto Detect"
ATTRIBUTION = "Data retrieved from Speedtest.net by Ookla"
ICON = "mdi:speedometer"

View File

@ -1,7 +1,8 @@
{ {
"domain": "speedtestdotnet", "domain": "speedtestdotnet",
"name": "Speedtest.net", "name": "Speedtest.net",
"config_flow": true,
"documentation": "https://www.home-assistant.io/integrations/speedtestdotnet", "documentation": "https://www.home-assistant.io/integrations/speedtestdotnet",
"requirements": ["speedtest-cli==2.1.2"], "requirements": ["speedtest-cli==2.1.2"],
"codeowners": ["@rohankapoorcom"] "codeowners": ["@rohankapoorcom", "@engrbm87"]
} }

View File

@ -2,54 +2,67 @@
import logging import logging
from homeassistant.const import ATTR_ATTRIBUTION from homeassistant.const import ATTR_ATTRIBUTION
from homeassistant.core import callback from homeassistant.helpers.entity import Entity
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from homeassistant.helpers.restore_state import RestoreEntity
from .const import DATA_UPDATED, DOMAIN as SPEEDTESTDOTNET_DOMAIN, SENSOR_TYPES from .const import (
ATTR_BYTES_RECEIVED,
ATTR_BYTES_SENT,
ATTR_SERVER_COUNTRY,
ATTR_SERVER_ID,
ATTR_SERVER_NAME,
ATTRIBUTION,
DEFAULT_NAME,
DOMAIN,
ICON,
SENSOR_TYPES,
)
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
ATTR_BYTES_RECEIVED = "bytes_received"
ATTR_BYTES_SENT = "bytes_sent"
ATTR_SERVER_COUNTRY = "server_country"
ATTR_SERVER_HOST = "server_host"
ATTR_SERVER_ID = "server_id"
ATTR_SERVER_LATENCY = "latency"
ATTR_SERVER_NAME = "server_name"
ATTRIBUTION = "Data retrieved from Speedtest.net by Ookla" async def async_setup_entry(hass, config_entry, async_add_entities):
"""Set up the Speedtestdotnet sensors."""
ICON = "mdi:speedometer" speedtest_coordinator = hass.data[DOMAIN]
entities = []
for sensor_type in SENSOR_TYPES:
entities.append(SpeedtestSensor(speedtest_coordinator, sensor_type))
async_add_entities(entities)
async def async_setup_platform(hass, config, async_add_entities, discovery_info): class SpeedtestSensor(Entity):
"""Set up the Speedtest.net sensor."""
data = hass.data[SPEEDTESTDOTNET_DOMAIN]
async_add_entities([SpeedtestSensor(data, sensor) for sensor in discovery_info])
class SpeedtestSensor(RestoreEntity):
"""Implementation of a speedtest.net sensor.""" """Implementation of a speedtest.net sensor."""
def __init__(self, speedtest_data, sensor_type): def __init__(self, coordinator, sensor_type):
"""Initialize the sensor.""" """Initialize the sensor."""
self._name = SENSOR_TYPES[sensor_type][0] self._name = SENSOR_TYPES[sensor_type][0]
self.speedtest_client = speedtest_data self.coordinator = coordinator
self.type = sensor_type self.type = sensor_type
self._state = None
self._data = None
self._unit_of_measurement = SENSOR_TYPES[self.type][1] self._unit_of_measurement = SENSOR_TYPES[self.type][1]
@property @property
def name(self): def name(self):
"""Return the name of the sensor.""" """Return the name of the sensor."""
return "{} {}".format("Speedtest", self._name) return f"{DEFAULT_NAME} {self._name}"
@property
def unique_id(self):
"""Return sensor unique_id."""
return self.type
@property @property
def state(self): def state(self):
"""Return the state of the device.""" """Return the state of the device."""
return self._state state = None
if self.type == "ping":
state = self.coordinator.data["ping"]
elif self.type == "download":
state = round(self.coordinator.data["download"] / 10 ** 6, 2)
elif self.type == "upload":
state = round(self.coordinator.data["upload"] / 10 ** 6, 2)
return state
@property @property
def unit_of_measurement(self): def unit_of_measurement(self):
@ -69,47 +82,27 @@ class SpeedtestSensor(RestoreEntity):
@property @property
def device_state_attributes(self): def device_state_attributes(self):
"""Return the state attributes.""" """Return the state attributes."""
attributes = {ATTR_ATTRIBUTION: ATTRIBUTION} attributes = {
if self._data is not None: ATTR_ATTRIBUTION: ATTRIBUTION,
return attributes.update( ATTR_SERVER_NAME: self.coordinator.data["server"]["name"],
{ ATTR_SERVER_COUNTRY: self.coordinator.data["server"]["country"],
ATTR_BYTES_RECEIVED: self._data["bytes_received"], ATTR_SERVER_ID: self.coordinator.data["server"]["id"],
ATTR_BYTES_SENT: self._data["bytes_sent"], }
ATTR_SERVER_COUNTRY: self._data["server"]["country"], if self.type == "download":
ATTR_SERVER_ID: self._data["server"]["id"], attributes[ATTR_BYTES_RECEIVED] = self.coordinator.data["bytes_received"]
ATTR_SERVER_LATENCY: self._data["server"]["latency"],
ATTR_SERVER_NAME: self._data["server"]["name"], if self.type == "upload":
} attributes[ATTR_BYTES_SENT] = self.coordinator.data["bytes_sent"]
)
return attributes return attributes
async def async_added_to_hass(self): async def async_added_to_hass(self):
"""Handle entity which will be added.""" """Handle entity which will be added."""
await super().async_added_to_hass()
state = await self.async_get_last_state()
if not state:
return
self._state = state.state
self.async_on_remove( self.async_on_remove(
async_dispatcher_connect( self.coordinator.async_add_listener(self.async_write_ha_state)
self.hass, DATA_UPDATED, self._schedule_immediate_update
)
) )
def update(self): async def async_update(self):
"""Get the latest data and update the states.""" """Request coordinator to update data."""
self._data = self.speedtest_client.data await self.coordinator.async_request_refresh()
if self._data is None:
return
if self.type == "ping":
self._state = self._data["ping"]
elif self.type == "download":
self._state = round(self._data["download"] / 10 ** 6, 2)
elif self.type == "upload":
self._state = round(self._data["upload"] / 10 ** 6, 2)
@callback
def _schedule_immediate_update(self):
self.async_schedule_update_ha_state(True)

View File

@ -0,0 +1,28 @@
{
"config": {
"step": {
"user": {
"title": "Set up SpeedTest",
"description": "Are you sure you want to set up SpeedTest?"
}
},
"abort": {
"one_instance_allowed": "Only a single instance is necessary.",
"wrong_server_id": "Server id is not valid"
}
},
"options": {
"step": {
"init": {
"data": {
"scan_interval": "Update frequency (minutes)",
"manual": "Disable auto update",
"server_name": "Select test server"
}
}
},
"error": {
"retrive_error": "Error retriving servers list"
}
}
}

View File

@ -0,0 +1,28 @@
{
"config": {
"step": {
"user": {
"title": "Set up SpeedTest",
"description": "Are you sure you want to set up SpeedTest?"
}
},
"abort": {
"one_instance_allowed": "Only a single instance is necessary.",
"wrong_server_id": "Server id is not valid"
}
},
"options": {
"step": {
"init": {
"data": {
"scan_interval": "Update frequency (minutes)",
"manual": "Disable auto update",
"server_name": "Select test server"
}
}
},
"error": {
"retrive_error": "Error retriving servers list"
}
}
}

View File

@ -141,6 +141,7 @@ FLOWS = [
"sonarr", "sonarr",
"songpal", "songpal",
"sonos", "sonos",
"speedtestdotnet",
"spotify", "spotify",
"starline", "starline",
"synology_dsm", "synology_dsm",

View File

@ -827,6 +827,9 @@ sonarr==0.2.2
# homeassistant.components.marytts # homeassistant.components.marytts
speak2mary==1.4.0 speak2mary==1.4.0
# homeassistant.components.speedtestdotnet
speedtest-cli==2.1.2
# homeassistant.components.spotify # homeassistant.components.spotify
spotipy==2.12.0 spotipy==2.12.0

View File

@ -0,0 +1,55 @@
"""Tests for SpeedTest."""
MOCK_SERVERS = {
1: [
{
"url": "http://server_1:8080/speedtest/upload.php",
"lat": "1",
"lon": "1",
"name": "Server1",
"country": "Country1",
"cc": "LL1",
"sponsor": "Server1",
"id": "1",
"host": "server1:8080",
"d": 1,
}
],
2: [
{
"url": "http://server_2:8080/speedtest/upload.php",
"lat": "2",
"lon": "2",
"name": "Server2",
"country": "Country2",
"cc": "LL2",
"sponsor": "server2",
"id": "2",
"host": "server2:8080",
"d": 2,
}
],
}
MOCK_RESULTS = {
"download": 1024000,
"upload": 1024000,
"ping": 18.465,
"server": {
"url": "http://test_server:8080/speedtest/upload.php",
"lat": "00.0000",
"lon": "11.1111",
"name": "NAME",
"country": "Country",
"id": "8408",
"host": "test_server:8080",
"d": 1.4858909757493415,
"latency": 18.465,
},
"timestamp": "2020-05-29T07:28:57.908387Z",
"bytes_sent": 4194304,
"bytes_received": 19712300,
"share": None,
}
MOCK_STATES = {"ping": "18.465", "download": "1.02", "upload": "1.02"}

View File

@ -0,0 +1,128 @@
"""Tests for SpeedTest config flow."""
from datetime import timedelta
import pytest
from speedtest import NoMatchedServers
from homeassistant import data_entry_flow
from homeassistant.components import speedtestdotnet
from homeassistant.components.speedtestdotnet.const import (
CONF_MANUAL,
CONF_SERVER_ID,
CONF_SERVER_NAME,
DOMAIN,
SENSOR_TYPES,
)
from homeassistant.const import CONF_MONITORED_CONDITIONS, CONF_SCAN_INTERVAL
from . import MOCK_SERVERS
from tests.async_mock import patch
from tests.common import MockConfigEntry
@pytest.fixture(name="mock_setup")
def mock_setup():
"""Mock entry setup."""
with patch(
"homeassistant.components.speedtestdotnet.async_setup_entry", return_value=True,
):
yield
async def test_flow_works(hass, mock_setup):
"""Test user config."""
result = await hass.config_entries.flow.async_init(
speedtestdotnet.DOMAIN, context={"source": "user"}
)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["step_id"] == "user"
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input={}
)
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
assert result["title"] == "SpeedTest"
async def test_import_fails(hass, mock_setup):
"""Test import step fails if server_id is not valid."""
with patch("speedtest.Speedtest") as mock_api:
mock_api.return_value.get_servers.side_effect = NoMatchedServers
result = await hass.config_entries.flow.async_init(
speedtestdotnet.DOMAIN,
context={"source": "import"},
data={
CONF_SERVER_ID: "223",
CONF_MANUAL: True,
CONF_SCAN_INTERVAL: timedelta(minutes=1),
CONF_MONITORED_CONDITIONS: list(SENSOR_TYPES),
},
)
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
assert result["reason"] == "wrong_server_id"
async def test_import_success(hass, mock_setup):
"""Test import step is successful if server_id is valid."""
with patch("speedtest.Speedtest"):
result = await hass.config_entries.flow.async_init(
speedtestdotnet.DOMAIN,
context={"source": "import"},
data={
CONF_SERVER_ID: "1",
CONF_MANUAL: True,
CONF_SCAN_INTERVAL: timedelta(minutes=1),
CONF_MONITORED_CONDITIONS: list(SENSOR_TYPES),
},
)
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
assert result["title"] == "SpeedTest"
assert result["data"][CONF_SERVER_ID] == "1"
assert result["data"][CONF_MANUAL] is True
assert result["data"][CONF_SCAN_INTERVAL] == 1
async def test_options(hass):
"""Test updating options."""
entry = MockConfigEntry(domain=DOMAIN, title="SpeedTest", data={}, options={},)
entry.add_to_hass(hass)
with patch("speedtest.Speedtest") as mock_api:
mock_api.return_value.get_servers.return_value = MOCK_SERVERS
await hass.config_entries.async_setup(entry.entry_id)
result = await hass.config_entries.options.async_init(entry.entry_id)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["step_id"] == "init"
result = await hass.config_entries.options.async_configure(
result["flow_id"],
user_input={
CONF_SERVER_NAME: "Country1 - Server1",
CONF_SCAN_INTERVAL: 30,
CONF_MANUAL: False,
},
)
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
assert result["data"] == {
CONF_SERVER_NAME: "Country1 - Server1",
CONF_SERVER_ID: "1",
CONF_SCAN_INTERVAL: 30,
CONF_MANUAL: False,
}
async def test_integration_already_configured(hass):
"""Test integration is already configured."""
entry = MockConfigEntry(domain=DOMAIN, data={}, options={},)
entry.add_to_hass(hass)
result = await hass.config_entries.flow.async_init(
speedtestdotnet.DOMAIN, context={"source": "user"}
)
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
assert result["reason"] == "one_instance_allowed"

View File

@ -0,0 +1,66 @@
"""Tests for SpeedTest integration."""
import speedtest
from homeassistant import config_entries
from homeassistant.components import speedtestdotnet
from homeassistant.setup import async_setup_component
from tests.async_mock import patch
from tests.common import MockConfigEntry
async def test_setup_with_config(hass):
"""Test that we import the config and setup the integration."""
config = {
speedtestdotnet.DOMAIN: {
speedtestdotnet.CONF_SERVER_ID: "1",
speedtestdotnet.CONF_MANUAL: True,
speedtestdotnet.CONF_SCAN_INTERVAL: "00:01:00",
}
}
with patch("speedtest.Speedtest"):
assert await async_setup_component(hass, speedtestdotnet.DOMAIN, config)
async def test_successful_config_entry(hass):
"""Test that SpeedTestDotNet is configured successfully."""
entry = MockConfigEntry(domain=speedtestdotnet.DOMAIN, data={},)
entry.add_to_hass(hass)
with patch("speedtest.Speedtest"), patch(
"homeassistant.config_entries.ConfigEntries.async_forward_entry_setup",
return_value=True,
) as forward_entry_setup:
await hass.config_entries.async_setup(entry.entry_id)
assert entry.state == config_entries.ENTRY_STATE_LOADED
assert forward_entry_setup.mock_calls[0][1] == (entry, "sensor",)
async def test_setup_failed(hass):
"""Test SpeedTestDotNet failed due to an error."""
entry = MockConfigEntry(domain=speedtestdotnet.DOMAIN, data={},)
entry.add_to_hass(hass)
with patch("speedtest.Speedtest", side_effect=speedtest.ConfigRetrievalError):
await hass.config_entries.async_setup(entry.entry_id)
assert entry.state == config_entries.ENTRY_STATE_SETUP_RETRY
async def test_unload_entry(hass):
"""Test removing SpeedTestDotNet."""
entry = MockConfigEntry(domain=speedtestdotnet.DOMAIN, data={},)
entry.add_to_hass(hass)
with patch("speedtest.Speedtest"):
await hass.config_entries.async_setup(entry.entry_id)
assert await hass.config_entries.async_unload(entry.entry_id)
await hass.async_block_till_done()
assert entry.state == config_entries.ENTRY_STATE_NOT_LOADED
assert speedtestdotnet.DOMAIN not in hass.data

View File

@ -0,0 +1,30 @@
"""Tests for SpeedTest sensors."""
from homeassistant.components import speedtestdotnet
from homeassistant.components.sensor import DOMAIN as SENSOR_DOMAIN
from homeassistant.components.speedtestdotnet.const import DEFAULT_NAME, SENSOR_TYPES
from . import MOCK_RESULTS, MOCK_SERVERS, MOCK_STATES
from tests.async_mock import patch
from tests.common import MockConfigEntry
async def test_speedtestdotnet_sensors(hass):
"""Test sensors created for speedtestdotnet integration."""
entry = MockConfigEntry(domain=speedtestdotnet.DOMAIN, data={})
entry.add_to_hass(hass)
with patch("speedtest.Speedtest") as mock_api:
mock_api.return_value.get_best_server.return_value = MOCK_SERVERS[1][0]
mock_api.return_value.results.dict.return_value = MOCK_RESULTS
await hass.config_entries.async_setup(entry.entry_id)
await hass.async_block_till_done()
assert len(hass.states.async_entity_ids(SENSOR_DOMAIN)) == 3
for sensor_type in SENSOR_TYPES:
sensor = hass.states.get(
f"sensor.{DEFAULT_NAME}_{SENSOR_TYPES[sensor_type][0]}"
)
assert sensor.state == MOCK_STATES[sensor_type]