Use DataUpdateCoordinator for glances (#72748)

* use DataUpdateCoordinator for glances

add tests to increase coverage

fix test_config_flow.py

fix codecov/patch

remove unused const, minor tweaks

remove invalid_auth test as it is not implemented

fix type hints

* change to async_forward_entry_setups

* Use Dataupdatecoordinator for glances

* minor fixex

* minor fixes

* minor fix

* remove support_versions const

* coe cleanup

* address comments

* fix sensor native_value

* Rename entry to entry_data in `get_api`

* Remove whitespace in sensor name
This commit is contained in:
Rami Mosleh 2022-11-03 11:02:25 +02:00 committed by GitHub
parent 739ed6a6c8
commit 328eda044a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 279 additions and 339 deletions

View File

@ -455,7 +455,7 @@ omit =
homeassistant/components/github/sensor.py
homeassistant/components/gitlab_ci/sensor.py
homeassistant/components/gitter/sensor.py
homeassistant/components/glances/__init__.py
homeassistant/components/glances/const.py
homeassistant/components/glances/sensor.py
homeassistant/components/goalfeed/*
homeassistant/components/goodwe/__init__.py

View File

@ -1,27 +1,16 @@
"""The Glances component."""
from datetime import timedelta
import logging
from typing import Any
from glances_api import Glances, exceptions
from glances_api import Glances
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import (
CONF_HOST,
CONF_NAME,
CONF_SCAN_INTERVAL,
CONF_VERIFY_SSL,
Platform,
)
from homeassistant.const import CONF_NAME, CONF_VERIFY_SSL, Platform
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryNotReady
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.dispatcher import async_dispatcher_send
from homeassistant.helpers.event import async_track_time_interval
from homeassistant.helpers.httpx_client import get_async_client
from .const import DATA_UPDATED, DEFAULT_SCAN_INTERVAL, DOMAIN
_LOGGER = logging.getLogger(__name__)
from .const import DOMAIN
from .coordinator import GlancesDataUpdateCoordinator
PLATFORMS = [Platform.SENSOR]
@ -30,106 +19,28 @@ CONFIG_SCHEMA = cv.removed(DOMAIN, raise_if_present=False)
async def async_setup_entry(hass: HomeAssistant, config_entry: ConfigEntry) -> bool:
"""Set up Glances from config entry."""
client = GlancesData(hass, config_entry)
hass.data.setdefault(DOMAIN, {})[config_entry.entry_id] = client
if not await client.async_setup():
return False
api = get_api(hass, dict(config_entry.data))
coordinator = GlancesDataUpdateCoordinator(hass, config_entry, api)
await coordinator.async_config_entry_first_refresh()
hass.data.setdefault(DOMAIN, {})[config_entry.entry_id] = coordinator
await hass.config_entries.async_forward_entry_setups(config_entry, PLATFORMS)
return True
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Unload a config entry."""
unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
if unload_ok:
if unload_ok := await hass.config_entries.async_unload_platforms(entry, PLATFORMS):
hass.data[DOMAIN].pop(entry.entry_id)
if not hass.data[DOMAIN]:
del hass.data[DOMAIN]
return unload_ok
class GlancesData:
"""Get the latest data from Glances api."""
def __init__(self, hass, config_entry):
"""Initialize the Glances data."""
self.hass = hass
self.config_entry = config_entry
self.api = None
self.unsub_timer = None
self.available = False
@property
def host(self):
"""Return client host."""
return self.config_entry.data[CONF_HOST]
async def async_update(self):
"""Get the latest data from the Glances REST API."""
try:
await self.api.get_data("all")
self.available = True
except exceptions.GlancesApiError:
_LOGGER.error("Unable to fetch data from Glances")
self.available = False
_LOGGER.debug("Glances data updated")
async_dispatcher_send(self.hass, DATA_UPDATED)
async def async_setup(self):
"""Set up the Glances client."""
try:
self.api = get_api(self.hass, self.config_entry.data)
await self.api.get_data("all")
self.available = True
_LOGGER.debug("Successfully connected to Glances")
except exceptions.GlancesApiConnectionError as err:
_LOGGER.debug("Can not connect to Glances")
raise ConfigEntryNotReady from err
self.add_options()
self.set_scan_interval(self.config_entry.options[CONF_SCAN_INTERVAL])
self.config_entry.async_on_unload(
self.config_entry.add_update_listener(self.async_options_updated)
)
await self.hass.config_entries.async_forward_entry_setups(
self.config_entry, PLATFORMS
)
return True
def add_options(self):
"""Add options for Glances integration."""
if not self.config_entry.options:
options = {CONF_SCAN_INTERVAL: DEFAULT_SCAN_INTERVAL}
self.hass.config_entries.async_update_entry(
self.config_entry, options=options
)
def set_scan_interval(self, scan_interval):
"""Update scan interval."""
async def refresh(event_time):
"""Get the latest data from Glances api."""
await self.async_update()
if self.unsub_timer is not None:
self.unsub_timer()
self.unsub_timer = async_track_time_interval(
self.hass, refresh, timedelta(seconds=scan_interval)
)
@staticmethod
async def async_options_updated(hass: HomeAssistant, entry: ConfigEntry) -> None:
"""Triggered by config entry options updates."""
hass.data[DOMAIN][entry.entry_id].set_scan_interval(
entry.options[CONF_SCAN_INTERVAL]
)
def get_api(hass, entry):
def get_api(hass: HomeAssistant, entry_data: dict[str, Any]) -> Glances:
"""Return the api from glances_api."""
params = entry.copy()
params.pop(CONF_NAME, None)
verify_ssl = params.pop(CONF_VERIFY_SSL, True)
httpx_client = get_async_client(hass, verify_ssl=verify_ssl)
return Glances(httpx_client=httpx_client, **params)
entry_data.pop(CONF_NAME, None)
httpx_client = get_async_client(hass, verify_ssl=entry_data[CONF_VERIFY_SSL])
return Glances(httpx_client=httpx_client, **entry_data)

View File

@ -3,20 +3,19 @@ from __future__ import annotations
from typing import Any
import glances_api
from glances_api.exceptions import GlancesApiError
import voluptuous as vol
from homeassistant import config_entries, core, exceptions
from homeassistant import config_entries, exceptions
from homeassistant.const import (
CONF_HOST,
CONF_PASSWORD,
CONF_PORT,
CONF_SCAN_INTERVAL,
CONF_SSL,
CONF_USERNAME,
CONF_VERIFY_SSL,
)
from homeassistant.core import callback
from homeassistant.core import HomeAssistant
from homeassistant.data_entry_flow import FlowResult
from . import get_api
@ -24,7 +23,6 @@ from .const import (
CONF_VERSION,
DEFAULT_HOST,
DEFAULT_PORT,
DEFAULT_SCAN_INTERVAL,
DEFAULT_VERSION,
DOMAIN,
SUPPORTED_VERSIONS,
@ -43,12 +41,12 @@ DATA_SCHEMA = vol.Schema(
)
async def validate_input(hass: core.HomeAssistant, data):
async def validate_input(hass: HomeAssistant, data: dict[str, Any]) -> None:
"""Validate the user input allows us to connect."""
api = get_api(hass, data)
try:
api = get_api(hass, data)
await api.get_data("all")
except glances_api.exceptions.GlancesApiConnectionError as err:
except GlancesApiError as err:
raise CannotConnect from err
@ -57,14 +55,6 @@ class GlancesFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
VERSION = 1
@staticmethod
@callback
def async_get_options_flow(
config_entry: config_entries.ConfigEntry,
) -> GlancesOptionsFlowHandler:
"""Get the options flow for this handler."""
return GlancesOptionsFlowHandler(config_entry)
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
@ -85,31 +75,5 @@ class GlancesFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
)
class GlancesOptionsFlowHandler(config_entries.OptionsFlow):
"""Handle Glances client options."""
def __init__(self, config_entry: config_entries.ConfigEntry) -> None:
"""Initialize Glances options flow."""
self.config_entry = config_entry
async def async_step_init(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""Manage the Glances options."""
if user_input is not None:
return self.async_create_entry(title="", data=user_input)
options = {
vol.Optional(
CONF_SCAN_INTERVAL,
default=self.config_entry.options.get(
CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL
),
): int
}
return self.async_show_form(step_id="init", data_schema=vol.Schema(options))
class CannotConnect(exceptions.HomeAssistantError):
"""Error to indicate we cannot connect."""

View File

@ -10,7 +10,6 @@ DEFAULT_PORT = 61208
DEFAULT_VERSION = 3
DEFAULT_SCAN_INTERVAL = 60
DATA_UPDATED = "glances_data_updated"
SUPPORTED_VERSIONS = [2, 3]
CPU_ICON = f"mdi:cpu-{64 if sys.maxsize > 2**32 else 32}-bit"

View File

@ -0,0 +1,42 @@
"""Coordinator for Glances integration."""
from datetime import timedelta
import logging
from typing import Any
from glances_api import Glances, exceptions
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_HOST
from homeassistant.core import HomeAssistant
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import DOMAIN
_LOGGER = logging.getLogger(__name__)
class GlancesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
"""Get the latest data from Glances api."""
config_entry: ConfigEntry
def __init__(self, hass: HomeAssistant, entry: ConfigEntry, api: Glances) -> None:
"""Initialize the Glances data."""
self.hass = hass
self.config_entry = entry
self.host: str = entry.data[CONF_HOST]
self.api = api
super().__init__(
hass,
_LOGGER,
name=f"{DOMAIN} - {self.host}",
update_interval=timedelta(seconds=60),
)
async def _async_update_data(self) -> dict[str, Any]:
"""Get the latest data from the Glances REST API."""
try:
await self.api.get_data("all")
except exceptions.GlancesApiError as err:
raise UpdateFailed from err
return self.api.data

View File

@ -8,10 +8,10 @@ from homeassistant.components.sensor import (
SensorEntity,
SensorEntityDescription,
SensorStateClass,
StateType,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import (
CONF_HOST,
CONF_NAME,
DATA_GIBIBYTES,
DATA_MEBIBYTES,
@ -21,22 +21,29 @@ from homeassistant.const import (
TEMP_CELSIUS,
Platform,
)
from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers import entity_registry
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from homeassistant.helpers.entity import DeviceInfo
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from . import GlancesData
from .const import CPU_ICON, DATA_UPDATED, DOMAIN
from . import GlancesDataUpdateCoordinator
from .const import CPU_ICON, DOMAIN
@dataclass
class GlancesSensorEntityDescription(SensorEntityDescription):
"""Describe Glances sensor entity."""
class GlancesSensorEntityDescriptionMixin:
"""Mixin for required keys."""
type: str | None = None
name_suffix: str | None = None
type: str
name_suffix: str
@dataclass
class GlancesSensorEntityDescription(
SensorEntityDescription, GlancesSensorEntityDescriptionMixin
):
"""Describe Glances sensor entity."""
SENSOR_TYPES: tuple[GlancesSensorEntityDescription, ...] = (
@ -234,9 +241,9 @@ async def async_setup_entry(
) -> None:
"""Set up the Glances sensors."""
client: GlancesData = hass.data[DOMAIN][config_entry.entry_id]
coordinator: GlancesDataUpdateCoordinator = hass.data[DOMAIN][config_entry.entry_id]
name = config_entry.data.get(CONF_NAME)
dev = []
entities = []
@callback
def _migrate_old_unique_ids(
@ -256,15 +263,15 @@ async def async_setup_entry(
for description in SENSOR_TYPES:
if description.type == "fs":
# fs will provide a list of disks attached
for disk in client.api.data[description.type]:
for disk in coordinator.data[description.type]:
_migrate_old_unique_ids(
hass,
f"{client.host}-{name} {disk['mnt_point']} {description.name_suffix}",
f"{coordinator.host}-{name} {disk['mnt_point']} {description.name_suffix}",
f"{disk['mnt_point']}-{description.key}",
)
dev.append(
entities.append(
GlancesSensor(
client,
coordinator,
name,
disk["mnt_point"],
description,
@ -272,101 +279,80 @@ async def async_setup_entry(
)
elif description.type == "sensors":
# sensors will provide temp for different devices
for sensor in client.api.data[description.type]:
for sensor in coordinator.data[description.type]:
if sensor["type"] == description.key:
_migrate_old_unique_ids(
hass,
f"{client.host}-{name} {sensor['label']} {description.name_suffix}",
f"{coordinator.host}-{name} {sensor['label']} {description.name_suffix}",
f"{sensor['label']}-{description.key}",
)
dev.append(
entities.append(
GlancesSensor(
client,
coordinator,
name,
sensor["label"],
description,
)
)
elif description.type == "raid":
for raid_device in client.api.data[description.type]:
for raid_device in coordinator.data[description.type]:
_migrate_old_unique_ids(
hass,
f"{client.host}-{name} {raid_device} {description.name_suffix}",
f"{coordinator.host}-{name} {raid_device} {description.name_suffix}",
f"{raid_device}-{description.key}",
)
dev.append(GlancesSensor(client, name, raid_device, description))
elif client.api.data[description.type]:
entities.append(
GlancesSensor(coordinator, name, raid_device, description)
)
elif coordinator.data[description.type]:
_migrate_old_unique_ids(
hass,
f"{client.host}-{name} {description.name_suffix}",
f"{coordinator.host}-{name} {description.name_suffix}",
f"-{description.key}",
)
dev.append(
entities.append(
GlancesSensor(
client,
coordinator,
name,
"",
description,
)
)
async_add_entities(dev, True)
async_add_entities(entities)
class GlancesSensor(SensorEntity):
class GlancesSensor(CoordinatorEntity[GlancesDataUpdateCoordinator], SensorEntity):
"""Implementation of a Glances sensor."""
entity_description: GlancesSensorEntityDescription
_attr_has_entity_name = True
_attr_should_poll = False
def __init__(
self,
glances_data: GlancesData,
coordinator: GlancesDataUpdateCoordinator,
name: str | None,
sensor_name_prefix: str,
description: GlancesSensorEntityDescription,
) -> None:
"""Initialize the sensor."""
self.glances_data = glances_data
super().__init__(coordinator)
self._sensor_name_prefix = sensor_name_prefix
self.unsub_update: CALLBACK_TYPE | None = None
self.entity_description = description
self._attr_name = f"{sensor_name_prefix} {description.name_suffix}"
self._attr_name = f"{sensor_name_prefix} {description.name_suffix}".strip()
self._attr_device_info = DeviceInfo(
identifiers={(DOMAIN, glances_data.config_entry.entry_id)},
identifiers={(DOMAIN, coordinator.config_entry.entry_id)},
manufacturer="Glances",
name=name or glances_data.config_entry.data[CONF_HOST],
name=name or coordinator.host,
)
self._attr_unique_id = f"{self.glances_data.config_entry.entry_id}-{sensor_name_prefix}-{description.key}"
self._attr_unique_id = f"{coordinator.config_entry.entry_id}-{sensor_name_prefix}-{description.key}"
@property
def available(self) -> bool:
"""Could the device be accessed during the last update call."""
return self.glances_data.available
async def async_added_to_hass(self) -> None:
"""Handle entity which will be added."""
self.unsub_update = async_dispatcher_connect(
self.hass, DATA_UPDATED, self._schedule_immediate_update
)
@callback
def _schedule_immediate_update(self) -> None:
self.async_schedule_update_ha_state(True)
async def will_remove_from_hass(self) -> None:
"""Unsubscribe from update dispatcher."""
if self.unsub_update:
self.unsub_update()
self.unsub_update = None
async def async_update(self) -> None: # noqa: C901
"""Get the latest data from REST API."""
if (value := self.glances_data.api.data) is None:
return
def native_value(self) -> StateType: # noqa: C901
"""Return the state of the resources."""
if (value := self.coordinator.data) is None:
return None
state: StateType = None
if self.entity_description.type == "fs":
for var in value["fs"]:
if var["mnt_point"] == self._sensor_name_prefix:
@ -374,100 +360,102 @@ class GlancesSensor(SensorEntity):
break
if self.entity_description.key == "disk_free":
try:
self._attr_native_value = round(disk["free"] / 1024**3, 1)
state = round(disk["free"] / 1024**3, 1)
except KeyError:
self._attr_native_value = round(
state = round(
(disk["size"] - disk["used"]) / 1024**3,
1,
)
elif self.entity_description.key == "disk_use":
self._attr_native_value = round(disk["used"] / 1024**3, 1)
state = round(disk["used"] / 1024**3, 1)
elif self.entity_description.key == "disk_use_percent":
self._attr_native_value = disk["percent"]
state = disk["percent"]
elif self.entity_description.key == "battery":
for sensor in value["sensors"]:
if (
sensor["type"] == "battery"
and sensor["label"] == self._sensor_name_prefix
):
self._attr_native_value = sensor["value"]
state = sensor["value"]
elif self.entity_description.key == "fan_speed":
for sensor in value["sensors"]:
if (
sensor["type"] == "fan_speed"
and sensor["label"] == self._sensor_name_prefix
):
self._attr_native_value = sensor["value"]
state = sensor["value"]
elif self.entity_description.key == "temperature_core":
for sensor in value["sensors"]:
if (
sensor["type"] == "temperature_core"
and sensor["label"] == self._sensor_name_prefix
):
self._attr_native_value = sensor["value"]
state = sensor["value"]
elif self.entity_description.key == "temperature_hdd":
for sensor in value["sensors"]:
if (
sensor["type"] == "temperature_hdd"
and sensor["label"] == self._sensor_name_prefix
):
self._attr_native_value = sensor["value"]
state = sensor["value"]
elif self.entity_description.key == "memory_use_percent":
self._attr_native_value = value["mem"]["percent"]
state = value["mem"]["percent"]
elif self.entity_description.key == "memory_use":
self._attr_native_value = round(value["mem"]["used"] / 1024**2, 1)
state = round(value["mem"]["used"] / 1024**2, 1)
elif self.entity_description.key == "memory_free":
self._attr_native_value = round(value["mem"]["free"] / 1024**2, 1)
state = round(value["mem"]["free"] / 1024**2, 1)
elif self.entity_description.key == "swap_use_percent":
self._attr_native_value = value["memswap"]["percent"]
state = value["memswap"]["percent"]
elif self.entity_description.key == "swap_use":
self._attr_native_value = round(value["memswap"]["used"] / 1024**3, 1)
state = round(value["memswap"]["used"] / 1024**3, 1)
elif self.entity_description.key == "swap_free":
self._attr_native_value = round(value["memswap"]["free"] / 1024**3, 1)
state = round(value["memswap"]["free"] / 1024**3, 1)
elif self.entity_description.key == "processor_load":
# Windows systems don't provide load details
try:
self._attr_native_value = value["load"]["min15"]
state = value["load"]["min15"]
except KeyError:
self._attr_native_value = value["cpu"]["total"]
state = value["cpu"]["total"]
elif self.entity_description.key == "process_running":
self._attr_native_value = value["processcount"]["running"]
state = value["processcount"]["running"]
elif self.entity_description.key == "process_total":
self._attr_native_value = value["processcount"]["total"]
state = value["processcount"]["total"]
elif self.entity_description.key == "process_thread":
self._attr_native_value = value["processcount"]["thread"]
state = value["processcount"]["thread"]
elif self.entity_description.key == "process_sleeping":
self._attr_native_value = value["processcount"]["sleeping"]
state = value["processcount"]["sleeping"]
elif self.entity_description.key == "cpu_use_percent":
self._attr_native_value = value["quicklook"]["cpu"]
state = value["quicklook"]["cpu"]
elif self.entity_description.key == "docker_active":
count = 0
try:
for container in value["docker"]["containers"]:
if container["Status"] == "running" or "Up" in container["Status"]:
count += 1
self._attr_native_value = count
state = count
except KeyError:
self._attr_native_value = count
state = count
elif self.entity_description.key == "docker_cpu_use":
cpu_use = 0.0
try:
for container in value["docker"]["containers"]:
if container["Status"] == "running" or "Up" in container["Status"]:
cpu_use += container["cpu"]["total"]
self._attr_native_value = round(cpu_use, 1)
state = round(cpu_use, 1)
except KeyError:
self._attr_native_value = STATE_UNAVAILABLE
state = STATE_UNAVAILABLE
elif self.entity_description.key == "docker_memory_use":
mem_use = 0.0
try:
for container in value["docker"]["containers"]:
if container["Status"] == "running" or "Up" in container["Status"]:
mem_use += container["memory"]["usage"]
self._attr_native_value = round(mem_use / 1024**2, 1)
state = round(mem_use / 1024**2, 1)
except KeyError:
self._attr_native_value = STATE_UNAVAILABLE
state = STATE_UNAVAILABLE
elif self.entity_description.type == "raid":
for raid_device, raid in value["raid"].items():
if raid_device == self._sensor_name_prefix:
self._attr_native_value = raid[self.entity_description.key]
state = raid[self.entity_description.key]
return state

View File

@ -14,21 +14,10 @@
}
},
"error": {
"cannot_connect": "[%key:common::config_flow::error::cannot_connect%]",
"wrong_version": "Version not supported (2 or 3 only)"
"cannot_connect": "[%key:common::config_flow::error::cannot_connect%]"
},
"abort": {
"already_configured": "[%key:common::config_flow::abort::already_configured_device%]"
}
},
"options": {
"step": {
"init": {
"description": "Configure options for Glances",
"data": {
"scan_interval": "Update frequency"
}
}
}
}
}

View File

@ -4,8 +4,7 @@
"already_configured": "Device is already configured"
},
"error": {
"cannot_connect": "Failed to connect",
"wrong_version": "Version not supported (2 or 3 only)"
"cannot_connect": "Failed to connect"
},
"step": {
"user": {
@ -22,15 +21,5 @@
"title": "Setup Glances"
}
}
},
"options": {
"step": {
"init": {
"data": {
"scan_interval": "Update frequency"
},
"description": "Configure options for Glances"
}
}
}
}

View File

@ -1 +1,42 @@
"""Tests for Glances."""
MOCK_USER_INPUT = {
"host": "0.0.0.0",
"username": "username",
"password": "password",
"version": 3,
"port": 61208,
"ssl": False,
"verify_ssl": True,
}
MOCK_DATA = {
"cpu": {
"total": 10.6,
"user": 7.6,
"system": 2.1,
"idle": 88.8,
"nice": 0.0,
"iowait": 0.6,
},
"diskio": [
{
"time_since_update": 1,
"disk_name": "nvme0n1",
"read_count": 12,
"write_count": 466,
"read_bytes": 184320,
"write_bytes": 23863296,
"key": "disk_name",
},
],
"system": {
"os_name": "Linux",
"hostname": "fedora-35",
"platform": "64bit",
"linux_distro": "Fedora Linux 35",
"os_version": "5.15.6-200.fc35.x86_64",
"hr_name": "Fedora Linux 35 64bit",
},
"uptime": "3 days, 10:25:20",
}

View File

@ -0,0 +1,15 @@
"""Conftest for speedtestdotnet."""
from unittest.mock import AsyncMock, patch
import pytest
from . import MOCK_DATA
@pytest.fixture(autouse=True)
def mock_api():
"""Mock glances api."""
with patch("homeassistant.components.glances.Glances") as mock_api:
mock_api.return_value.get_data = AsyncMock(return_value=None)
mock_api.return_value.data.return_value = MOCK_DATA
yield mock_api

View File

@ -1,38 +1,22 @@
"""Tests for Glances config flow."""
from unittest.mock import patch
from unittest.mock import MagicMock
from glances_api import exceptions
from glances_api.exceptions import GlancesApiConnectionError
import pytest
from homeassistant import config_entries, data_entry_flow
from homeassistant import config_entries
from homeassistant.components import glances
from homeassistant.const import CONF_SCAN_INTERVAL
from homeassistant.core import HomeAssistant
from homeassistant.data_entry_flow import FlowResultType
from tests.common import MockConfigEntry
from . import MOCK_USER_INPUT
NAME = "Glances"
HOST = "0.0.0.0"
USERNAME = "username"
PASSWORD = "password"
PORT = 61208
VERSION = 3
SCAN_INTERVAL = 10
DEMO_USER_INPUT = {
"host": HOST,
"username": USERNAME,
"password": PASSWORD,
"version": VERSION,
"port": PORT,
"ssl": False,
"verify_ssl": True,
}
from tests.common import MockConfigEntry, patch
@pytest.fixture(autouse=True)
def glances_setup_fixture():
"""Mock transmission entry setup."""
"""Mock glances entry setup."""
with patch("homeassistant.components.glances.async_setup_entry", return_value=True):
yield
@ -43,74 +27,43 @@ async def test_form(hass: HomeAssistant) -> None:
result = await hass.config_entries.flow.async_init(
glances.DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result["type"] == data_entry_flow.FlowResultType.FORM
assert result["type"] == FlowResultType.FORM
assert result["step_id"] == "user"
with patch("homeassistant.components.glances.Glances.get_data", autospec=True):
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input=MOCK_USER_INPUT
)
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input=DEMO_USER_INPUT
)
assert result["type"] == "create_entry"
assert result["title"] == HOST
assert result["data"] == DEMO_USER_INPUT
assert result["type"] == FlowResultType.CREATE_ENTRY
assert result["title"] == "0.0.0.0"
assert result["data"] == MOCK_USER_INPUT
async def test_form_cannot_connect(hass: HomeAssistant) -> None:
async def test_form_cannot_connect(hass: HomeAssistant, mock_api: MagicMock) -> None:
"""Test to return error if we cannot connect."""
with patch(
"homeassistant.components.glances.Glances.get_data",
side_effect=exceptions.GlancesApiConnectionError,
):
result = await hass.config_entries.flow.async_init(
glances.DOMAIN, context={"source": config_entries.SOURCE_USER}
)
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input=DEMO_USER_INPUT
)
mock_api.return_value.get_data.side_effect = GlancesApiConnectionError
result = await hass.config_entries.flow.async_init(
glances.DOMAIN, context={"source": config_entries.SOURCE_USER}
)
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input=MOCK_USER_INPUT
)
assert result["type"] == "form"
assert result["type"] == FlowResultType.FORM
assert result["errors"] == {"base": "cannot_connect"}
async def test_form_already_configured(hass: HomeAssistant) -> None:
"""Test host is already configured."""
entry = MockConfigEntry(
domain=glances.DOMAIN, data=DEMO_USER_INPUT, options={CONF_SCAN_INTERVAL: 60}
)
entry = MockConfigEntry(domain=glances.DOMAIN, data=MOCK_USER_INPUT)
entry.add_to_hass(hass)
result = await hass.config_entries.flow.async_init(
glances.DOMAIN, context={"source": config_entries.SOURCE_USER}
)
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input=DEMO_USER_INPUT
result["flow_id"], user_input=MOCK_USER_INPUT
)
assert result["type"] == "abort"
assert result["type"] == FlowResultType.ABORT
assert result["reason"] == "already_configured"
async def test_options(hass: HomeAssistant) -> None:
"""Test options for Glances."""
entry = MockConfigEntry(
domain=glances.DOMAIN, data=DEMO_USER_INPUT, options={CONF_SCAN_INTERVAL: 60}
)
entry.add_to_hass(hass)
await hass.config_entries.async_setup(entry.entry_id)
await hass.async_block_till_done()
result = await hass.config_entries.options.async_init(entry.entry_id)
assert result["type"] == data_entry_flow.FlowResultType.FORM
assert result["step_id"] == "init"
result = await hass.config_entries.options.async_configure(
result["flow_id"], user_input={glances.CONF_SCAN_INTERVAL: 10}
)
assert result["type"] == data_entry_flow.FlowResultType.CREATE_ENTRY
assert result["data"] == {
glances.CONF_SCAN_INTERVAL: 10,
}

View File

@ -0,0 +1,49 @@
"""Tests for Glances integration."""
from unittest.mock import MagicMock
from glances_api.exceptions import GlancesApiConnectionError
from homeassistant.components.glances.const import DOMAIN
from homeassistant.config_entries import ConfigEntryState
from homeassistant.core import HomeAssistant
from . import MOCK_USER_INPUT
from tests.common import MockConfigEntry
async def test_successful_config_entry(hass: HomeAssistant) -> None:
"""Test that Glances is configured successfully."""
entry = MockConfigEntry(domain=DOMAIN, data=MOCK_USER_INPUT)
entry.add_to_hass(hass)
await hass.config_entries.async_setup(entry.entry_id)
assert entry.state == ConfigEntryState.LOADED
async def test_conn_error(hass: HomeAssistant, mock_api: MagicMock) -> None:
"""Test Glances failed due to connection error."""
entry = MockConfigEntry(domain=DOMAIN, data=MOCK_USER_INPUT)
entry.add_to_hass(hass)
mock_api.return_value.get_data.side_effect = GlancesApiConnectionError
await hass.config_entries.async_setup(entry.entry_id)
assert entry.state is ConfigEntryState.SETUP_RETRY
async def test_unload_entry(hass: HomeAssistant) -> None:
"""Test removing Glances."""
entry = MockConfigEntry(domain=DOMAIN, data=MOCK_USER_INPUT)
entry.add_to_hass(hass)
await hass.config_entries.async_setup(entry.entry_id)
await hass.async_block_till_done()
assert await hass.config_entries.async_unload(entry.entry_id)
await hass.async_block_till_done()
assert entry.state is ConfigEntryState.NOT_LOADED
assert DOMAIN not in hass.data