mirror of
https://github.com/home-assistant/core.git
synced 2025-07-22 12:47:08 +00:00
Add Process binary sensor in System Monitor (#108585)
* Process binary sensor in System Monitor * Add repair flow * add issue * add platform * fix repair * Tests * Fix tests * add minor version * migrate * Mod repairs * Fix tests * config flow test * Last fixes * Review comments * Remove entities during repair * Remove snapshot
This commit is contained in:
parent
668d036f71
commit
1706156faf
@ -1,10 +1,16 @@
|
||||
"""The System Monitor integration."""
|
||||
|
||||
import logging
|
||||
|
||||
from homeassistant.components.binary_sensor import DOMAIN as BINARY_SENSOR_DOMAIN
|
||||
from homeassistant.components.sensor import DOMAIN as SENSOR_DOMAIN
|
||||
from homeassistant.config_entries import ConfigEntry
|
||||
from homeassistant.const import Platform
|
||||
from homeassistant.core import HomeAssistant
|
||||
|
||||
PLATFORMS = [Platform.SENSOR]
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
PLATFORMS = [Platform.BINARY_SENSOR, Platform.SENSOR]
|
||||
|
||||
|
||||
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
||||
@ -23,3 +29,24 @@ async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
||||
async def update_listener(hass: HomeAssistant, entry: ConfigEntry) -> None:
|
||||
"""Handle options update."""
|
||||
await hass.config_entries.async_reload(entry.entry_id)
|
||||
|
||||
|
||||
async def async_migrate_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
||||
"""Migrate old entry."""
|
||||
|
||||
if entry.version == 1:
|
||||
new_options = {**entry.options}
|
||||
if entry.minor_version == 1:
|
||||
# Migration copies process sensors to binary sensors
|
||||
# Repair will remove sensors when user submit the fix
|
||||
if processes := entry.options.get(SENSOR_DOMAIN):
|
||||
new_options[BINARY_SENSOR_DOMAIN] = processes
|
||||
entry.version = 1
|
||||
entry.minor_version = 2
|
||||
hass.config_entries.async_update_entry(entry, options=new_options)
|
||||
|
||||
_LOGGER.debug(
|
||||
"Migration to version %s.%s successful", entry.version, entry.minor_version
|
||||
)
|
||||
|
||||
return True
|
||||
|
147
homeassistant/components/systemmonitor/binary_sensor.py
Normal file
147
homeassistant/components/systemmonitor/binary_sensor.py
Normal file
@ -0,0 +1,147 @@
|
||||
"""Binary sensors for System Monitor."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Callable
|
||||
from dataclasses import dataclass
|
||||
from functools import lru_cache
|
||||
import logging
|
||||
import sys
|
||||
from typing import Generic, Literal
|
||||
|
||||
import psutil
|
||||
|
||||
from homeassistant.components.binary_sensor import (
|
||||
DOMAIN as BINARY_SENSOR_DOMAIN,
|
||||
BinarySensorDeviceClass,
|
||||
BinarySensorEntity,
|
||||
BinarySensorEntityDescription,
|
||||
)
|
||||
from homeassistant.config_entries import ConfigEntry
|
||||
from homeassistant.const import EntityCategory
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.helpers.device_registry import DeviceEntryType, DeviceInfo
|
||||
from homeassistant.helpers.entity_platform import AddEntitiesCallback
|
||||
from homeassistant.helpers.update_coordinator import CoordinatorEntity
|
||||
from homeassistant.util import slugify
|
||||
|
||||
from .const import CONF_PROCESS, DOMAIN
|
||||
from .coordinator import MonitorCoordinator, SystemMonitorProcessCoordinator, dataT
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
CONF_ARG = "arg"
|
||||
|
||||
|
||||
SENSOR_TYPE_NAME = 0
|
||||
SENSOR_TYPE_UOM = 1
|
||||
SENSOR_TYPE_ICON = 2
|
||||
SENSOR_TYPE_DEVICE_CLASS = 3
|
||||
SENSOR_TYPE_MANDATORY_ARG = 4
|
||||
|
||||
SIGNAL_SYSTEMMONITOR_UPDATE = "systemmonitor_update"
|
||||
|
||||
|
||||
@lru_cache
|
||||
def get_cpu_icon() -> Literal["mdi:cpu-64-bit", "mdi:cpu-32-bit"]:
|
||||
"""Return cpu icon."""
|
||||
if sys.maxsize > 2**32:
|
||||
return "mdi:cpu-64-bit"
|
||||
return "mdi:cpu-32-bit"
|
||||
|
||||
|
||||
def get_process(entity: SystemMonitorSensor[list[psutil.Process]]) -> bool:
|
||||
"""Return process."""
|
||||
state = False
|
||||
for proc in entity.coordinator.data:
|
||||
try:
|
||||
_LOGGER.debug("process %s for argument %s", proc.name(), entity.argument)
|
||||
if entity.argument == proc.name():
|
||||
state = True
|
||||
break
|
||||
except psutil.NoSuchProcess as err:
|
||||
_LOGGER.warning(
|
||||
"Failed to load process with ID: %s, old name: %s",
|
||||
err.pid,
|
||||
err.name,
|
||||
)
|
||||
return state
|
||||
|
||||
|
||||
@dataclass(frozen=True, kw_only=True)
|
||||
class SysMonitorBinarySensorEntityDescription(
|
||||
BinarySensorEntityDescription, Generic[dataT]
|
||||
):
|
||||
"""Describes System Monitor binary sensor entities."""
|
||||
|
||||
value_fn: Callable[[SystemMonitorSensor[dataT]], bool]
|
||||
|
||||
|
||||
SENSOR_TYPES: tuple[
|
||||
SysMonitorBinarySensorEntityDescription[list[psutil.Process]], ...
|
||||
] = (
|
||||
SysMonitorBinarySensorEntityDescription[list[psutil.Process]](
|
||||
key="binary_process",
|
||||
translation_key="process",
|
||||
icon=get_cpu_icon(),
|
||||
value_fn=get_process,
|
||||
device_class=BinarySensorDeviceClass.RUNNING,
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
async def async_setup_entry(
|
||||
hass: HomeAssistant, entry: ConfigEntry, async_add_entities: AddEntitiesCallback
|
||||
) -> None:
|
||||
"""Set up System Montor binary sensors based on a config entry."""
|
||||
entities: list[SystemMonitorSensor] = []
|
||||
process_coordinator = SystemMonitorProcessCoordinator(hass, "Process coordinator")
|
||||
await process_coordinator.async_request_refresh()
|
||||
|
||||
for sensor_description in SENSOR_TYPES:
|
||||
_entry = entry.options.get(BINARY_SENSOR_DOMAIN, {})
|
||||
for argument in _entry.get(CONF_PROCESS, []):
|
||||
entities.append(
|
||||
SystemMonitorSensor(
|
||||
process_coordinator,
|
||||
sensor_description,
|
||||
entry.entry_id,
|
||||
argument,
|
||||
)
|
||||
)
|
||||
async_add_entities(entities)
|
||||
|
||||
|
||||
class SystemMonitorSensor(
|
||||
CoordinatorEntity[MonitorCoordinator[dataT]], BinarySensorEntity
|
||||
):
|
||||
"""Implementation of a system monitor binary sensor."""
|
||||
|
||||
_attr_has_entity_name = True
|
||||
_attr_entity_category = EntityCategory.DIAGNOSTIC
|
||||
entity_description: SysMonitorBinarySensorEntityDescription[dataT]
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
coordinator: MonitorCoordinator[dataT],
|
||||
sensor_description: SysMonitorBinarySensorEntityDescription[dataT],
|
||||
entry_id: str,
|
||||
argument: str,
|
||||
) -> None:
|
||||
"""Initialize the binary sensor."""
|
||||
super().__init__(coordinator)
|
||||
self.entity_description = sensor_description
|
||||
self._attr_translation_placeholders = {"process": argument}
|
||||
self._attr_unique_id: str = slugify(f"{sensor_description.key}_{argument}")
|
||||
self._attr_device_info = DeviceInfo(
|
||||
entry_type=DeviceEntryType.SERVICE,
|
||||
identifiers={(DOMAIN, entry_id)},
|
||||
manufacturer="System Monitor",
|
||||
name="System Monitor",
|
||||
)
|
||||
self.argument = argument
|
||||
|
||||
@property
|
||||
def is_on(self) -> bool | None:
|
||||
"""Return true if the binary sensor is on."""
|
||||
return self.entity_description.value_fn(self)
|
@ -6,8 +6,8 @@ from typing import Any
|
||||
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant.components.binary_sensor import DOMAIN as BINARY_SENSOR_DOMAIN
|
||||
from homeassistant.components.homeassistant import DOMAIN as HOMEASSISTANT_DOMAIN
|
||||
from homeassistant.components.sensor import DOMAIN as SENSOR_DOMAIN
|
||||
from homeassistant.core import callback
|
||||
from homeassistant.data_entry_flow import FlowResult
|
||||
from homeassistant.helpers import entity_registry as er
|
||||
@ -34,7 +34,7 @@ async def validate_sensor_setup(
|
||||
"""Validate sensor input."""
|
||||
# Standard behavior is to merge the result with the options.
|
||||
# In this case, we want to add a sub-item so we update the options directly.
|
||||
sensors: dict[str, list] = handler.options.setdefault(SENSOR_DOMAIN, {})
|
||||
sensors: dict[str, list] = handler.options.setdefault(BINARY_SENSOR_DOMAIN, {})
|
||||
processes = sensors.setdefault(CONF_PROCESS, [])
|
||||
previous_processes = processes.copy()
|
||||
processes.clear()
|
||||
@ -44,7 +44,7 @@ async def validate_sensor_setup(
|
||||
for process in previous_processes:
|
||||
if process not in processes and (
|
||||
entity_id := entity_registry.async_get_entity_id(
|
||||
SENSOR_DOMAIN, DOMAIN, slugify(f"process_{process}")
|
||||
BINARY_SENSOR_DOMAIN, DOMAIN, slugify(f"binary_process_{process}")
|
||||
)
|
||||
):
|
||||
entity_registry.async_remove(entity_id)
|
||||
@ -58,7 +58,7 @@ async def validate_import_sensor_setup(
|
||||
"""Validate sensor input."""
|
||||
# Standard behavior is to merge the result with the options.
|
||||
# In this case, we want to add a sub-item so we update the options directly.
|
||||
sensors: dict[str, list] = handler.options.setdefault(SENSOR_DOMAIN, {})
|
||||
sensors: dict[str, list] = handler.options.setdefault(BINARY_SENSOR_DOMAIN, {})
|
||||
import_processes: list[str] = user_input["processes"]
|
||||
processes = sensors.setdefault(CONF_PROCESS, [])
|
||||
processes.extend(import_processes)
|
||||
@ -104,7 +104,7 @@ async def get_sensor_setup_schema(handler: SchemaCommonFlowHandler) -> vol.Schem
|
||||
|
||||
async def get_suggested_value(handler: SchemaCommonFlowHandler) -> dict[str, Any]:
|
||||
"""Return suggested values for sensor setup."""
|
||||
sensors: dict[str, list] = handler.options.get(SENSOR_DOMAIN, {})
|
||||
sensors: dict[str, list] = handler.options.get(BINARY_SENSOR_DOMAIN, {})
|
||||
processes: list[str] = sensors.get(CONF_PROCESS, [])
|
||||
return {CONF_PROCESS: processes}
|
||||
|
||||
@ -130,6 +130,8 @@ class SystemMonitorConfigFlowHandler(SchemaConfigFlowHandler, domain=DOMAIN):
|
||||
|
||||
config_flow = CONFIG_FLOW
|
||||
options_flow = OPTIONS_FLOW
|
||||
VERSION = 1
|
||||
MINOR_VERSION = 2
|
||||
|
||||
def async_config_entry_title(self, options: Mapping[str, Any]) -> str:
|
||||
"""Return config entry title."""
|
||||
|
72
homeassistant/components/systemmonitor/repairs.py
Normal file
72
homeassistant/components/systemmonitor/repairs.py
Normal file
@ -0,0 +1,72 @@
|
||||
"""Repairs platform for the System Monitor integration."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any, cast
|
||||
|
||||
from homeassistant import data_entry_flow
|
||||
from homeassistant.components.repairs import ConfirmRepairFlow, RepairsFlow
|
||||
from homeassistant.components.sensor import DOMAIN as SENSOR_DOMAIN
|
||||
from homeassistant.config_entries import ConfigEntry
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.helpers import entity_registry as er
|
||||
|
||||
|
||||
class ProcessFixFlow(RepairsFlow):
|
||||
"""Handler for an issue fixing flow."""
|
||||
|
||||
def __init__(self, entry: ConfigEntry, processes: list[str]) -> None:
|
||||
"""Create flow."""
|
||||
super().__init__()
|
||||
self.entry = entry
|
||||
self._processes = processes
|
||||
|
||||
async def async_step_init(
|
||||
self, user_input: dict[str, str] | None = None
|
||||
) -> data_entry_flow.FlowResult:
|
||||
"""Handle the first step of a fix flow."""
|
||||
return await self.async_step_migrate_process_sensor()
|
||||
|
||||
async def async_step_migrate_process_sensor(
|
||||
self, user_input: dict[str, Any] | None = None
|
||||
) -> data_entry_flow.FlowResult:
|
||||
"""Handle the options step of a fix flow."""
|
||||
if user_input is None:
|
||||
return self.async_show_form(
|
||||
step_id="migrate_process_sensor",
|
||||
description_placeholders={"processes": ", ".join(self._processes)},
|
||||
)
|
||||
|
||||
# Migration has copied the sensors to binary sensors
|
||||
# Pop the sensors to repair and remove entities
|
||||
new_options: dict[str, Any] = self.entry.options.copy()
|
||||
new_options.pop(SENSOR_DOMAIN)
|
||||
|
||||
entity_reg = er.async_get(self.hass)
|
||||
entries = er.async_entries_for_config_entry(entity_reg, self.entry.entry_id)
|
||||
for entry in entries:
|
||||
if entry.entity_id.startswith("sensor.") and entry.unique_id.startswith(
|
||||
"process_"
|
||||
):
|
||||
entity_reg.async_remove(entry.entity_id)
|
||||
|
||||
self.hass.config_entries.async_update_entry(self.entry, options=new_options)
|
||||
await self.hass.config_entries.async_reload(self.entry.entry_id)
|
||||
return self.async_create_entry(data={})
|
||||
|
||||
|
||||
async def async_create_fix_flow(
|
||||
hass: HomeAssistant,
|
||||
issue_id: str,
|
||||
data: dict[str, Any] | None,
|
||||
) -> RepairsFlow:
|
||||
"""Create flow."""
|
||||
entry = None
|
||||
if data and (entry_id := data.get("entry_id")):
|
||||
entry_id = cast(str, entry_id)
|
||||
processes: list[str] = data["processes"]
|
||||
entry = hass.config_entries.async_get_entry(entry_id)
|
||||
assert entry
|
||||
return ProcessFixFlow(entry, processes)
|
||||
|
||||
return ConfirmRepairFlow()
|
@ -1,4 +1,5 @@
|
||||
"""Support for monitoring the local system."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Callable
|
||||
@ -39,6 +40,7 @@ from homeassistant.core import HomeAssistant
|
||||
import homeassistant.helpers.config_validation as cv
|
||||
from homeassistant.helpers.device_registry import DeviceEntryType, DeviceInfo
|
||||
from homeassistant.helpers.entity_platform import AddEntitiesCallback
|
||||
from homeassistant.helpers.issue_registry import IssueSeverity, async_create_issue
|
||||
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType, StateType
|
||||
from homeassistant.helpers.update_coordinator import CoordinatorEntity
|
||||
from homeassistant.util import slugify
|
||||
@ -638,6 +640,20 @@ async def async_setup_entry( # noqa: C901
|
||||
True,
|
||||
)
|
||||
)
|
||||
async_create_issue(
|
||||
hass,
|
||||
DOMAIN,
|
||||
"process_sensor",
|
||||
breaks_in_ha_version="2024.9.0",
|
||||
is_fixable=True,
|
||||
is_persistent=False,
|
||||
severity=IssueSeverity.WARNING,
|
||||
translation_key="process_sensor",
|
||||
data={
|
||||
"entry_id": entry.entry_id,
|
||||
"processes": _entry[CONF_PROCESS],
|
||||
},
|
||||
)
|
||||
continue
|
||||
|
||||
if _type == "processor_use":
|
||||
|
@ -22,7 +22,25 @@
|
||||
}
|
||||
}
|
||||
},
|
||||
"issues": {
|
||||
"process_sensor": {
|
||||
"title": "Process sensors are deprecated and will be removed",
|
||||
"fix_flow": {
|
||||
"step": {
|
||||
"migrate_process_sensor": {
|
||||
"title": "Process sensors have been setup as binary sensors",
|
||||
"description": "Process sensors `{processes}` have been created as binary sensors and the sensors will be removed in 2024.9.0.\n\nPlease update all automations, scripts, dashboards or other things depending on these sensors to use the newly created binary sensors instead and press **Submit** to fix this issue."
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"entity": {
|
||||
"binary_sensor": {
|
||||
"process": {
|
||||
"name": "Process {process}"
|
||||
}
|
||||
},
|
||||
"sensor": {
|
||||
"disk_free": {
|
||||
"name": "Disk free {mount_point}"
|
||||
|
@ -75,7 +75,7 @@ def mock_config_entry() -> MockConfigEntry:
|
||||
domain=DOMAIN,
|
||||
data={},
|
||||
options={
|
||||
"sensor": {"process": ["python3", "pip"]},
|
||||
"binary_sensor": {"process": ["python3", "pip"]},
|
||||
"resources": [
|
||||
"disk_use_percent_/",
|
||||
"disk_use_percent_/home/notexist/",
|
||||
|
@ -0,0 +1,21 @@
|
||||
# serializer version: 1
|
||||
# name: test_binary_sensor[System Monitor Process pip - attributes]
|
||||
ReadOnlyDict({
|
||||
'device_class': 'running',
|
||||
'friendly_name': 'System Monitor Process pip',
|
||||
'icon': 'mdi:cpu-64-bit',
|
||||
})
|
||||
# ---
|
||||
# name: test_binary_sensor[System Monitor Process pip - state]
|
||||
'on'
|
||||
# ---
|
||||
# name: test_binary_sensor[System Monitor Process python3 - attributes]
|
||||
ReadOnlyDict({
|
||||
'device_class': 'running',
|
||||
'friendly_name': 'System Monitor Process python3',
|
||||
'icon': 'mdi:cpu-64-bit',
|
||||
})
|
||||
# ---
|
||||
# name: test_binary_sensor[System Monitor Process python3 - state]
|
||||
'on'
|
||||
# ---
|
73
tests/components/systemmonitor/snapshots/test_repairs.ambr
Normal file
73
tests/components/systemmonitor/snapshots/test_repairs.ambr
Normal file
@ -0,0 +1,73 @@
|
||||
# serializer version: 1
|
||||
# name: test_migrate_process_sensor[after_migration]
|
||||
list([
|
||||
ConfigEntrySnapshot({
|
||||
'data': dict({
|
||||
}),
|
||||
'disabled_by': None,
|
||||
'domain': 'systemmonitor',
|
||||
'entry_id': <ANY>,
|
||||
'minor_version': 2,
|
||||
'options': dict({
|
||||
'binary_sensor': dict({
|
||||
'process': list([
|
||||
'python3',
|
||||
'pip',
|
||||
]),
|
||||
}),
|
||||
'resources': list([
|
||||
'disk_use_percent_/',
|
||||
'disk_use_percent_/home/notexist/',
|
||||
'memory_free_',
|
||||
'network_out_eth0',
|
||||
'process_python3',
|
||||
]),
|
||||
}),
|
||||
'pref_disable_new_entities': False,
|
||||
'pref_disable_polling': False,
|
||||
'source': 'user',
|
||||
'title': 'System Monitor',
|
||||
'unique_id': None,
|
||||
'version': 1,
|
||||
}),
|
||||
])
|
||||
# ---
|
||||
# name: test_migrate_process_sensor[before_migration]
|
||||
list([
|
||||
ConfigEntrySnapshot({
|
||||
'data': dict({
|
||||
}),
|
||||
'disabled_by': None,
|
||||
'domain': 'systemmonitor',
|
||||
'entry_id': <ANY>,
|
||||
'minor_version': 2,
|
||||
'options': dict({
|
||||
'binary_sensor': dict({
|
||||
'process': list([
|
||||
'python3',
|
||||
'pip',
|
||||
]),
|
||||
}),
|
||||
'resources': list([
|
||||
'disk_use_percent_/',
|
||||
'disk_use_percent_/home/notexist/',
|
||||
'memory_free_',
|
||||
'network_out_eth0',
|
||||
'process_python3',
|
||||
]),
|
||||
'sensor': dict({
|
||||
'process': list([
|
||||
'python3',
|
||||
'pip',
|
||||
]),
|
||||
}),
|
||||
}),
|
||||
'pref_disable_new_entities': False,
|
||||
'pref_disable_polling': False,
|
||||
'source': 'user',
|
||||
'title': 'System Monitor',
|
||||
'unique_id': None,
|
||||
'version': 1,
|
||||
}),
|
||||
])
|
||||
# ---
|
107
tests/components/systemmonitor/test_binary_sensor.py
Normal file
107
tests/components/systemmonitor/test_binary_sensor.py
Normal file
@ -0,0 +1,107 @@
|
||||
"""Test System Monitor binary sensor."""
|
||||
from datetime import timedelta
|
||||
from unittest.mock import Mock, patch
|
||||
|
||||
from freezegun.api import FrozenDateTimeFactory
|
||||
import pytest
|
||||
from syrupy.assertion import SnapshotAssertion
|
||||
|
||||
from homeassistant.components.binary_sensor import DOMAIN as BINARY_SENSOR_DOMAIN
|
||||
from homeassistant.components.systemmonitor.binary_sensor import get_cpu_icon
|
||||
from homeassistant.components.systemmonitor.const import DOMAIN
|
||||
from homeassistant.config_entries import ConfigEntry
|
||||
from homeassistant.const import STATE_OFF, STATE_ON
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.helpers import entity_registry as er
|
||||
|
||||
from .conftest import MockProcess
|
||||
|
||||
from tests.common import MockConfigEntry, async_fire_time_changed
|
||||
|
||||
|
||||
async def test_binary_sensor(
|
||||
hass: HomeAssistant,
|
||||
entity_registry_enabled_by_default: None,
|
||||
mock_psutil: Mock,
|
||||
mock_os: Mock,
|
||||
mock_util: Mock,
|
||||
entity_registry: er.EntityRegistry,
|
||||
snapshot: SnapshotAssertion,
|
||||
) -> None:
|
||||
"""Test the binary sensor."""
|
||||
mock_config_entry = MockConfigEntry(
|
||||
title="System Monitor",
|
||||
domain=DOMAIN,
|
||||
data={},
|
||||
options={
|
||||
"binary_sensor": {"process": ["python3", "pip"]},
|
||||
"resources": [
|
||||
"disk_use_percent_/",
|
||||
"disk_use_percent_/home/notexist/",
|
||||
"memory_free_",
|
||||
"network_out_eth0",
|
||||
"process_python3",
|
||||
],
|
||||
},
|
||||
)
|
||||
mock_config_entry.add_to_hass(hass)
|
||||
await hass.config_entries.async_setup(mock_config_entry.entry_id)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
process_binary_sensor = hass.states.get(
|
||||
"binary_sensor.system_monitor_process_python3"
|
||||
)
|
||||
assert process_binary_sensor is not None
|
||||
|
||||
for entity in er.async_entries_for_config_entry(
|
||||
entity_registry, mock_config_entry.entry_id
|
||||
):
|
||||
if entity.domain == BINARY_SENSOR_DOMAIN:
|
||||
state = hass.states.get(entity.entity_id)
|
||||
assert state.state == snapshot(name=f"{state.name} - state")
|
||||
assert state.attributes == snapshot(name=f"{state.name} - attributes")
|
||||
|
||||
|
||||
async def test_binary_sensor_icon(
|
||||
hass: HomeAssistant,
|
||||
entity_registry_enabled_by_default: None,
|
||||
mock_util: Mock,
|
||||
mock_psutil: Mock,
|
||||
mock_os: Mock,
|
||||
mock_config_entry: MockConfigEntry,
|
||||
) -> None:
|
||||
"""Test the sensor icon for 32bit/64bit system."""
|
||||
|
||||
get_cpu_icon.cache_clear()
|
||||
with patch("sys.maxsize", 2**32):
|
||||
assert get_cpu_icon() == "mdi:cpu-32-bit"
|
||||
get_cpu_icon.cache_clear()
|
||||
with patch("sys.maxsize", 2**64):
|
||||
assert get_cpu_icon() == "mdi:cpu-64-bit"
|
||||
|
||||
|
||||
async def test_sensor_process_fails(
|
||||
hass: HomeAssistant,
|
||||
mock_added_config_entry: ConfigEntry,
|
||||
mock_psutil: Mock,
|
||||
freezer: FrozenDateTimeFactory,
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
) -> None:
|
||||
"""Test process not exist failure."""
|
||||
process_sensor = hass.states.get("binary_sensor.system_monitor_process_python3")
|
||||
assert process_sensor is not None
|
||||
assert process_sensor.state == STATE_ON
|
||||
|
||||
_process = MockProcess("python3", True)
|
||||
|
||||
mock_psutil.process_iter.return_value = [_process]
|
||||
|
||||
freezer.tick(timedelta(minutes=1))
|
||||
async_fire_time_changed(hass)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
process_sensor = hass.states.get("binary_sensor.system_monitor_process_python3")
|
||||
assert process_sensor is not None
|
||||
assert process_sensor.state == STATE_OFF
|
||||
|
||||
assert "Failed to load process with ID: 1, old name: python3" in caplog.text
|
@ -6,11 +6,13 @@ from unittest.mock import AsyncMock
|
||||
from homeassistant import config_entries
|
||||
from homeassistant.components.homeassistant import DOMAIN as HOMEASSISTANT_DOMAIN
|
||||
from homeassistant.components.systemmonitor.const import CONF_PROCESS, DOMAIN
|
||||
from homeassistant.const import Platform
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.data_entry_flow import FlowResultType
|
||||
from homeassistant.helpers import entity_registry as er, issue_registry as ir
|
||||
from homeassistant.util import slugify
|
||||
from homeassistant.helpers import (
|
||||
device_registry as dr,
|
||||
entity_registry as er,
|
||||
issue_registry as ir,
|
||||
)
|
||||
|
||||
from tests.common import MockConfigEntry
|
||||
|
||||
@ -59,7 +61,7 @@ async def test_import(
|
||||
|
||||
assert result["type"] == FlowResultType.CREATE_ENTRY
|
||||
assert result["options"] == {
|
||||
"sensor": {"process": ["systemd", "octave-cli"]},
|
||||
"binary_sensor": {"process": ["systemd", "octave-cli"]},
|
||||
"resources": [
|
||||
"disk_use_percent_/",
|
||||
"memory_free_",
|
||||
@ -116,7 +118,7 @@ async def test_import_already_configured(
|
||||
domain=DOMAIN,
|
||||
source=config_entries.SOURCE_USER,
|
||||
options={
|
||||
"sensor": [{CONF_PROCESS: "systemd"}],
|
||||
"binary_sensor": [{CONF_PROCESS: "systemd"}],
|
||||
"resources": [
|
||||
"disk_use_percent_/",
|
||||
"memory_free_",
|
||||
@ -158,16 +160,21 @@ async def test_import_already_configured(
|
||||
|
||||
|
||||
async def test_add_and_remove_processes(
|
||||
hass: HomeAssistant, mock_setup_entry: AsyncMock
|
||||
hass: HomeAssistant,
|
||||
device_registry: dr.DeviceRegistry,
|
||||
entity_registry: er.EntityRegistry,
|
||||
) -> None:
|
||||
"""Test adding and removing process sensors."""
|
||||
config_entry = MockConfigEntry(
|
||||
domain=DOMAIN,
|
||||
source=config_entries.SOURCE_USER,
|
||||
data={},
|
||||
options={},
|
||||
entry_id="1",
|
||||
)
|
||||
config_entry.add_to_hass(hass)
|
||||
await hass.config_entries.async_setup(config_entry.entry_id)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
result = await hass.config_entries.options.async_init(config_entry.entry_id)
|
||||
|
||||
@ -184,7 +191,7 @@ async def test_add_and_remove_processes(
|
||||
|
||||
assert result["type"] == FlowResultType.CREATE_ENTRY
|
||||
assert result["data"] == {
|
||||
"sensor": {
|
||||
"binary_sensor": {
|
||||
CONF_PROCESS: ["systemd"],
|
||||
}
|
||||
}
|
||||
@ -205,26 +212,19 @@ async def test_add_and_remove_processes(
|
||||
|
||||
assert result["type"] == FlowResultType.CREATE_ENTRY
|
||||
assert result["data"] == {
|
||||
"sensor": {
|
||||
"binary_sensor": {
|
||||
CONF_PROCESS: ["systemd", "octave-cli"],
|
||||
},
|
||||
}
|
||||
|
||||
entity_reg = er.async_get(hass)
|
||||
entity_reg.async_get_or_create(
|
||||
domain=Platform.SENSOR,
|
||||
platform=DOMAIN,
|
||||
unique_id=slugify("process_systemd"),
|
||||
config_entry=config_entry,
|
||||
assert (
|
||||
entity_registry.async_get("binary_sensor.system_monitor_process_systemd")
|
||||
is not None
|
||||
)
|
||||
entity_reg.async_get_or_create(
|
||||
domain=Platform.SENSOR,
|
||||
platform=DOMAIN,
|
||||
unique_id=slugify("process_octave-cli"),
|
||||
config_entry=config_entry,
|
||||
assert (
|
||||
entity_registry.async_get("binary_sensor.system_monitor_process_octave_cli")
|
||||
is not None
|
||||
)
|
||||
assert entity_reg.async_get("sensor.systemmonitor_process_systemd") is not None
|
||||
assert entity_reg.async_get("sensor.systemmonitor_process_octave_cli") is not None
|
||||
|
||||
# Remove one
|
||||
result = await hass.config_entries.options.async_init(config_entry.entry_id)
|
||||
@ -242,7 +242,7 @@ async def test_add_and_remove_processes(
|
||||
|
||||
assert result["type"] == FlowResultType.CREATE_ENTRY
|
||||
assert result["data"] == {
|
||||
"sensor": {
|
||||
"binary_sensor": {
|
||||
CONF_PROCESS: ["systemd"],
|
||||
},
|
||||
}
|
||||
@ -263,8 +263,13 @@ async def test_add_and_remove_processes(
|
||||
|
||||
assert result["type"] == FlowResultType.CREATE_ENTRY
|
||||
assert result["data"] == {
|
||||
"sensor": {CONF_PROCESS: []},
|
||||
"binary_sensor": {CONF_PROCESS: []},
|
||||
}
|
||||
|
||||
assert entity_reg.async_get("sensor.systemmonitor_process_systemd") is None
|
||||
assert entity_reg.async_get("sensor.systemmonitor_process_octave_cli") is None
|
||||
assert (
|
||||
entity_registry.async_get("binary_sensor.systemmonitor_process_systemd") is None
|
||||
)
|
||||
assert (
|
||||
entity_registry.async_get("binary_sensor.systemmonitor_process_octave_cli")
|
||||
is None
|
||||
)
|
||||
|
@ -1,12 +1,19 @@
|
||||
"""Test for System Monitor init."""
|
||||
from __future__ import annotations
|
||||
|
||||
from homeassistant.components.systemmonitor.const import CONF_PROCESS
|
||||
from unittest.mock import Mock
|
||||
|
||||
from freezegun.api import FrozenDateTimeFactory
|
||||
import pytest
|
||||
|
||||
from homeassistant.components.systemmonitor.const import CONF_PROCESS, DOMAIN
|
||||
from homeassistant.config_entries import ConfigEntry, ConfigEntryState
|
||||
from homeassistant.const import STATE_OFF
|
||||
from homeassistant.const import STATE_OFF, STATE_ON
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.data_entry_flow import FlowResultType
|
||||
|
||||
from tests.common import MockConfigEntry
|
||||
|
||||
|
||||
async def test_load_unload_entry(
|
||||
hass: HomeAssistant, mock_added_config_entry: ConfigEntry
|
||||
@ -23,7 +30,7 @@ async def test_adding_processor_to_options(
|
||||
hass: HomeAssistant, mock_added_config_entry: ConfigEntry
|
||||
) -> None:
|
||||
"""Test options listener."""
|
||||
process_sensor = hass.states.get("sensor.system_monitor_process_systemd")
|
||||
process_sensor = hass.states.get("binary_sensor.system_monitor_process_systemd")
|
||||
assert process_sensor is None
|
||||
|
||||
result = await hass.config_entries.options.async_init(
|
||||
@ -43,7 +50,7 @@ async def test_adding_processor_to_options(
|
||||
|
||||
assert result["type"] == FlowResultType.CREATE_ENTRY
|
||||
assert result["data"] == {
|
||||
"sensor": {
|
||||
"binary_sensor": {
|
||||
CONF_PROCESS: ["python3", "pip", "systemd"],
|
||||
},
|
||||
"resources": [
|
||||
@ -55,6 +62,42 @@ async def test_adding_processor_to_options(
|
||||
],
|
||||
}
|
||||
|
||||
process_sensor = hass.states.get("sensor.system_monitor_process_systemd")
|
||||
process_sensor = hass.states.get("binary_sensor.system_monitor_process_systemd")
|
||||
assert process_sensor is not None
|
||||
assert process_sensor.state == STATE_OFF
|
||||
|
||||
|
||||
async def test_migrate_process_sensor_to_binary_sensors(
|
||||
hass: HomeAssistant,
|
||||
mock_psutil: Mock,
|
||||
mock_os: Mock,
|
||||
mock_util: Mock,
|
||||
freezer: FrozenDateTimeFactory,
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
) -> None:
|
||||
"""Test process not exist failure."""
|
||||
mock_config_entry = MockConfigEntry(
|
||||
title="System Monitor",
|
||||
domain=DOMAIN,
|
||||
data={},
|
||||
options={
|
||||
"sensor": {"process": ["python3", "pip"]},
|
||||
"resources": [
|
||||
"disk_use_percent_/",
|
||||
"disk_use_percent_/home/notexist/",
|
||||
"memory_free_",
|
||||
"network_out_eth0",
|
||||
"process_python3",
|
||||
],
|
||||
},
|
||||
)
|
||||
mock_config_entry.add_to_hass(hass)
|
||||
await hass.config_entries.async_setup(mock_config_entry.entry_id)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
process_sensor = hass.states.get("sensor.system_monitor_process_python3")
|
||||
assert process_sensor is not None
|
||||
assert process_sensor.state == STATE_ON
|
||||
process_sensor = hass.states.get("binary_sensor.system_monitor_process_python3")
|
||||
assert process_sensor is not None
|
||||
assert process_sensor.state == STATE_ON
|
||||
|
197
tests/components/systemmonitor/test_repairs.py
Normal file
197
tests/components/systemmonitor/test_repairs.py
Normal file
@ -0,0 +1,197 @@
|
||||
"""Test repairs for System Monitor."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from http import HTTPStatus
|
||||
from unittest.mock import Mock
|
||||
|
||||
from syrupy.assertion import SnapshotAssertion
|
||||
|
||||
from homeassistant.components.repairs.websocket_api import (
|
||||
RepairsFlowIndexView,
|
||||
RepairsFlowResourceView,
|
||||
)
|
||||
from homeassistant.components.systemmonitor.const import DOMAIN
|
||||
from homeassistant.config_entries import ConfigEntry
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.data_entry_flow import FlowResultType
|
||||
from homeassistant.helpers import entity_registry as er, issue_registry as ir
|
||||
from homeassistant.setup import async_setup_component
|
||||
|
||||
from tests.common import ANY, MockConfigEntry
|
||||
from tests.typing import ClientSessionGenerator, WebSocketGenerator
|
||||
|
||||
|
||||
async def test_migrate_process_sensor(
|
||||
hass: HomeAssistant,
|
||||
entity_registry: er.EntityRegistry,
|
||||
entity_registry_enabled_by_default: None,
|
||||
mock_psutil: Mock,
|
||||
mock_os: Mock,
|
||||
mock_util: Mock,
|
||||
hass_client: ClientSessionGenerator,
|
||||
hass_ws_client: WebSocketGenerator,
|
||||
snapshot: SnapshotAssertion,
|
||||
) -> None:
|
||||
"""Test migrating process sensor to binary sensor."""
|
||||
mock_config_entry = MockConfigEntry(
|
||||
title="System Monitor",
|
||||
domain=DOMAIN,
|
||||
data={},
|
||||
options={
|
||||
"binary_sensor": {"process": ["python3", "pip"]},
|
||||
"sensor": {"process": ["python3", "pip"]},
|
||||
"resources": [
|
||||
"disk_use_percent_/",
|
||||
"disk_use_percent_/home/notexist/",
|
||||
"memory_free_",
|
||||
"network_out_eth0",
|
||||
"process_python3",
|
||||
],
|
||||
},
|
||||
)
|
||||
mock_config_entry.add_to_hass(hass)
|
||||
await hass.config_entries.async_setup(mock_config_entry.entry_id)
|
||||
await hass.async_block_till_done()
|
||||
assert hass.config_entries.async_entries(DOMAIN) == snapshot(
|
||||
name="before_migration"
|
||||
)
|
||||
|
||||
assert await async_setup_component(hass, "repairs", {})
|
||||
await hass.async_block_till_done()
|
||||
|
||||
entity = "sensor.system_monitor_process_python3"
|
||||
state = hass.states.get(entity)
|
||||
assert state
|
||||
|
||||
assert entity_registry.async_get(entity)
|
||||
|
||||
ws_client = await hass_ws_client(hass)
|
||||
client = await hass_client()
|
||||
|
||||
await ws_client.send_json({"id": 1, "type": "repairs/list_issues"})
|
||||
msg = await ws_client.receive_json()
|
||||
|
||||
assert msg["success"]
|
||||
assert len(msg["result"]["issues"]) > 0
|
||||
issue = None
|
||||
for i in msg["result"]["issues"]:
|
||||
if i["issue_id"] == "process_sensor":
|
||||
issue = i
|
||||
assert issue is not None
|
||||
|
||||
url = RepairsFlowIndexView.url
|
||||
resp = await client.post(
|
||||
url, json={"handler": DOMAIN, "issue_id": "process_sensor"}
|
||||
)
|
||||
assert resp.status == HTTPStatus.OK
|
||||
data = await resp.json()
|
||||
|
||||
flow_id = data["flow_id"]
|
||||
assert data["step_id"] == "migrate_process_sensor"
|
||||
|
||||
url = RepairsFlowResourceView.url.format(flow_id=flow_id)
|
||||
resp = await client.post(url, json={})
|
||||
assert resp.status == HTTPStatus.OK
|
||||
data = await resp.json()
|
||||
|
||||
assert data["type"] == FlowResultType.CREATE_ENTRY
|
||||
await hass.async_block_till_done()
|
||||
|
||||
state = hass.states.get("binary_sensor.system_monitor_process_python3")
|
||||
assert state
|
||||
|
||||
await ws_client.send_json({"id": 2, "type": "repairs/list_issues"})
|
||||
msg = await ws_client.receive_json()
|
||||
|
||||
assert msg["success"]
|
||||
issue = None
|
||||
for i in msg["result"]["issues"]:
|
||||
if i["issue_id"] == "migrate_process_sensor":
|
||||
issue = i
|
||||
assert not issue
|
||||
|
||||
entity = "sensor.system_monitor_process_python3"
|
||||
state = hass.states.get(entity)
|
||||
assert not state
|
||||
|
||||
assert not entity_registry.async_get(entity)
|
||||
|
||||
assert hass.config_entries.async_entries(DOMAIN) == snapshot(name="after_migration")
|
||||
|
||||
|
||||
async def test_other_fixable_issues(
|
||||
hass: HomeAssistant,
|
||||
hass_client: ClientSessionGenerator,
|
||||
hass_ws_client: WebSocketGenerator,
|
||||
entity_registry_enabled_by_default: None,
|
||||
mock_added_config_entry: ConfigEntry,
|
||||
) -> None:
|
||||
"""Test fixing other issues."""
|
||||
assert await async_setup_component(hass, "repairs", {})
|
||||
await hass.async_block_till_done()
|
||||
|
||||
ws_client = await hass_ws_client(hass)
|
||||
client = await hass_client()
|
||||
|
||||
await ws_client.send_json({"id": 1, "type": "repairs/list_issues"})
|
||||
msg = await ws_client.receive_json()
|
||||
|
||||
assert msg["success"]
|
||||
|
||||
issue = {
|
||||
"breaks_in_ha_version": "2022.9.0dev0",
|
||||
"domain": DOMAIN,
|
||||
"issue_id": "issue_1",
|
||||
"is_fixable": True,
|
||||
"learn_more_url": "",
|
||||
"severity": "error",
|
||||
"translation_key": "issue_1",
|
||||
}
|
||||
ir.async_create_issue(
|
||||
hass,
|
||||
issue["domain"],
|
||||
issue["issue_id"],
|
||||
breaks_in_ha_version=issue["breaks_in_ha_version"],
|
||||
is_fixable=issue["is_fixable"],
|
||||
is_persistent=False,
|
||||
learn_more_url=None,
|
||||
severity=issue["severity"],
|
||||
translation_key=issue["translation_key"],
|
||||
)
|
||||
|
||||
await ws_client.send_json({"id": 2, "type": "repairs/list_issues"})
|
||||
msg = await ws_client.receive_json()
|
||||
|
||||
assert msg["success"]
|
||||
results = msg["result"]["issues"]
|
||||
assert {
|
||||
"breaks_in_ha_version": "2022.9.0dev0",
|
||||
"created": ANY,
|
||||
"dismissed_version": None,
|
||||
"domain": DOMAIN,
|
||||
"is_fixable": True,
|
||||
"issue_domain": None,
|
||||
"issue_id": "issue_1",
|
||||
"learn_more_url": None,
|
||||
"severity": "error",
|
||||
"translation_key": "issue_1",
|
||||
"translation_placeholders": None,
|
||||
"ignored": False,
|
||||
} in results
|
||||
|
||||
url = RepairsFlowIndexView.url
|
||||
resp = await client.post(url, json={"handler": DOMAIN, "issue_id": "issue_1"})
|
||||
assert resp.status == HTTPStatus.OK
|
||||
data = await resp.json()
|
||||
|
||||
flow_id = data["flow_id"]
|
||||
assert data["step_id"] == "confirm"
|
||||
|
||||
url = RepairsFlowResourceView.url.format(flow_id=flow_id)
|
||||
resp = await client.post(url)
|
||||
assert resp.status == HTTPStatus.OK
|
||||
data = await resp.json()
|
||||
|
||||
assert data["type"] == FlowResultType.CREATE_ENTRY
|
||||
await hass.async_block_till_done()
|
@ -8,6 +8,8 @@ from psutil._common import sdiskusage, shwtemp, snetio, snicaddr
|
||||
import pytest
|
||||
from syrupy.assertion import SnapshotAssertion
|
||||
|
||||
from homeassistant.components.sensor import DOMAIN as SENSOR_DOMAIN
|
||||
from homeassistant.components.systemmonitor.const import DOMAIN
|
||||
from homeassistant.components.systemmonitor.sensor import get_cpu_icon
|
||||
from homeassistant.config_entries import ConfigEntry
|
||||
from homeassistant.const import STATE_OFF, STATE_ON, STATE_UNAVAILABLE, STATE_UNKNOWN
|
||||
@ -23,11 +25,33 @@ from tests.common import MockConfigEntry, async_fire_time_changed
|
||||
async def test_sensor(
|
||||
hass: HomeAssistant,
|
||||
entity_registry_enabled_by_default: None,
|
||||
mock_added_config_entry: ConfigEntry,
|
||||
mock_psutil: Mock,
|
||||
mock_os: Mock,
|
||||
mock_util: Mock,
|
||||
entity_registry: er.EntityRegistry,
|
||||
snapshot: SnapshotAssertion,
|
||||
) -> None:
|
||||
"""Test the sensor."""
|
||||
mock_config_entry = MockConfigEntry(
|
||||
title="System Monitor",
|
||||
domain=DOMAIN,
|
||||
data={},
|
||||
options={
|
||||
"binary_sensor": {"process": ["python3", "pip"]},
|
||||
"sensor": {"process": ["python3", "pip"]},
|
||||
"resources": [
|
||||
"disk_use_percent_/",
|
||||
"disk_use_percent_/home/notexist/",
|
||||
"memory_free_",
|
||||
"network_out_eth0",
|
||||
"process_python3",
|
||||
],
|
||||
},
|
||||
)
|
||||
mock_config_entry.add_to_hass(hass)
|
||||
await hass.config_entries.async_setup(mock_config_entry.entry_id)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
memory_sensor = hass.states.get("sensor.system_monitor_memory_free")
|
||||
assert memory_sensor is not None
|
||||
assert memory_sensor.state == "40.0"
|
||||
@ -44,11 +68,45 @@ async def test_sensor(
|
||||
assert process_sensor.state == STATE_ON
|
||||
|
||||
for entity in er.async_entries_for_config_entry(
|
||||
entity_registry, mock_added_config_entry.entry_id
|
||||
entity_registry, mock_config_entry.entry_id
|
||||
):
|
||||
state = hass.states.get(entity.entity_id)
|
||||
assert state.state == snapshot(name=f"{state.name} - state")
|
||||
assert state.attributes == snapshot(name=f"{state.name} - attributes")
|
||||
if entity.domain == SENSOR_DOMAIN:
|
||||
state = hass.states.get(entity.entity_id)
|
||||
assert state.state == snapshot(name=f"{state.name} - state")
|
||||
assert state.attributes == snapshot(name=f"{state.name} - attributes")
|
||||
|
||||
|
||||
async def test_process_sensor_not_loaded(
|
||||
hass: HomeAssistant,
|
||||
entity_registry_enabled_by_default: None,
|
||||
mock_psutil: Mock,
|
||||
mock_os: Mock,
|
||||
mock_util: Mock,
|
||||
entity_registry: er.EntityRegistry,
|
||||
snapshot: SnapshotAssertion,
|
||||
) -> None:
|
||||
"""Test the process sensor is not loaded once migrated."""
|
||||
mock_config_entry = MockConfigEntry(
|
||||
title="System Monitor",
|
||||
domain=DOMAIN,
|
||||
data={},
|
||||
options={
|
||||
"binary_sensor": {"process": ["python3", "pip"]},
|
||||
"resources": [
|
||||
"disk_use_percent_/",
|
||||
"disk_use_percent_/home/notexist/",
|
||||
"memory_free_",
|
||||
"network_out_eth0",
|
||||
"process_python3",
|
||||
],
|
||||
},
|
||||
)
|
||||
mock_config_entry.add_to_hass(hass)
|
||||
await hass.config_entries.async_setup(mock_config_entry.entry_id)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
process_sensor = hass.states.get("sensor.system_monitor_process_python3")
|
||||
assert process_sensor is None
|
||||
|
||||
|
||||
async def test_sensor_not_loading_veth_networks(
|
||||
@ -112,7 +170,7 @@ async def test_sensor_yaml(
|
||||
assert memory_sensor is not None
|
||||
assert memory_sensor.state == "40.0"
|
||||
|
||||
process_sensor = hass.states.get("sensor.system_monitor_process_python3")
|
||||
process_sensor = hass.states.get("binary_sensor.system_monitor_process_python3")
|
||||
assert process_sensor is not None
|
||||
assert process_sensor.state == STATE_ON
|
||||
|
||||
@ -142,11 +200,32 @@ async def test_sensor_yaml_fails_missing_argument(
|
||||
|
||||
async def test_sensor_updating(
|
||||
hass: HomeAssistant,
|
||||
mock_added_config_entry: ConfigEntry,
|
||||
mock_psutil: Mock,
|
||||
mock_os: Mock,
|
||||
mock_util: Mock,
|
||||
freezer: FrozenDateTimeFactory,
|
||||
) -> None:
|
||||
"""Test the sensor."""
|
||||
mock_config_entry = MockConfigEntry(
|
||||
title="System Monitor",
|
||||
domain=DOMAIN,
|
||||
data={},
|
||||
options={
|
||||
"binary_sensor": {"process": ["python3", "pip"]},
|
||||
"sensor": {"process": ["python3", "pip"]},
|
||||
"resources": [
|
||||
"disk_use_percent_/",
|
||||
"disk_use_percent_/home/notexist/",
|
||||
"memory_free_",
|
||||
"network_out_eth0",
|
||||
"process_python3",
|
||||
],
|
||||
},
|
||||
)
|
||||
mock_config_entry.add_to_hass(hass)
|
||||
await hass.config_entries.async_setup(mock_config_entry.entry_id)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
memory_sensor = hass.states.get("sensor.system_monitor_memory_free")
|
||||
assert memory_sensor is not None
|
||||
assert memory_sensor.state == "40.0"
|
||||
@ -189,12 +268,33 @@ async def test_sensor_updating(
|
||||
|
||||
async def test_sensor_process_fails(
|
||||
hass: HomeAssistant,
|
||||
mock_added_config_entry: ConfigEntry,
|
||||
mock_psutil: Mock,
|
||||
mock_os: Mock,
|
||||
mock_util: Mock,
|
||||
freezer: FrozenDateTimeFactory,
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
) -> None:
|
||||
"""Test process not exist failure."""
|
||||
mock_config_entry = MockConfigEntry(
|
||||
title="System Monitor",
|
||||
domain=DOMAIN,
|
||||
data={},
|
||||
options={
|
||||
"binary_sensor": {"process": ["python3", "pip"]},
|
||||
"sensor": {"process": ["python3", "pip"]},
|
||||
"resources": [
|
||||
"disk_use_percent_/",
|
||||
"disk_use_percent_/home/notexist/",
|
||||
"memory_free_",
|
||||
"network_out_eth0",
|
||||
"process_python3",
|
||||
],
|
||||
},
|
||||
)
|
||||
mock_config_entry.add_to_hass(hass)
|
||||
await hass.config_entries.async_setup(mock_config_entry.entry_id)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
process_sensor = hass.states.get("sensor.system_monitor_process_python3")
|
||||
assert process_sensor is not None
|
||||
assert process_sensor.state == STATE_ON
|
||||
|
Loading…
x
Reference in New Issue
Block a user