Add Shelly tests coverage (#82642)

* Add Shelly tests coverage

* Review comments

* Remove leftovers
This commit is contained in:
Shay Levy 2022-11-24 20:07:19 +02:00 committed by GitHub
parent caa99ea9fb
commit 1e68e8c4b4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 818 additions and 32 deletions

View File

@ -1105,8 +1105,6 @@ omit =
homeassistant/components/sesame/lock.py
homeassistant/components/seven_segments/image_processing.py
homeassistant/components/seventeentrack/sensor.py
homeassistant/components/shelly/coordinator.py
homeassistant/components/shelly/utils.py
homeassistant/components/shiftr/*
homeassistant/components/shodan/sensor.py
homeassistant/components/sia/__init__.py

View File

@ -169,6 +169,7 @@ class ShellyBlockCoordinator(DataUpdateCoordinator):
"inputEvent" not in block.sensor_ids
or "inputEventCnt" not in block.sensor_ids
):
LOGGER.debug("Skipping non-input event block %s", block.description)
continue
channel = int(block.channel or 0) + 1
@ -181,6 +182,7 @@ class ShellyBlockCoordinator(DataUpdateCoordinator):
or last_event_count == block.inputEventCnt
or event_type == ""
):
LOGGER.debug("Skipping block event %s", event_type)
continue
if event_type in INPUTS_EVENTS_DICT:
@ -194,12 +196,6 @@ class ShellyBlockCoordinator(DataUpdateCoordinator):
ATTR_GENERATION: 1,
},
)
else:
LOGGER.warning(
"Shelly input event %s for device %s is not supported, please open issue",
event_type,
self.name,
)
if self._last_cfg_changed is not None and cfg_changed > self._last_cfg_changed:
LOGGER.info(
@ -610,11 +606,6 @@ class ShellyRpcPollingCoordinator(DataUpdateCoordinator):
except InvalidAuthError:
self.entry.async_start_reauth(self.hass)
@property
def model(self) -> str:
"""Model of the device."""
return cast(str, self.entry.data["model"])
@property
def mac(self) -> str:
"""Mac address of the device."""
@ -625,9 +616,6 @@ def get_block_coordinator_by_device_id(
hass: HomeAssistant, device_id: str
) -> ShellyBlockCoordinator | None:
"""Get a Shelly block device coordinator for the given device id."""
if not hass.data.get(DOMAIN):
return None
dev_reg = device_registry.async_get(hass)
if device := dev_reg.async_get(device_id):
for config_entry in device.config_entries:
@ -644,9 +632,6 @@ def get_rpc_coordinator_by_device_id(
hass: HomeAssistant, device_id: str
) -> ShellyRpcCoordinator | None:
"""Get a Shelly RPC device coordinator for the given device id."""
if not hass.data.get(DOMAIN):
return None
dev_reg = device_registry.async_get(hass)
if device := dev_reg.async_get(device_id):
for config_entry in device.config_entries:

View File

@ -78,11 +78,9 @@ def inject_rpc_device_event(
mock_rpc_device.mock_event()
async def mock_rest_update(hass: HomeAssistant):
async def mock_rest_update(hass: HomeAssistant, seconds=REST_SENSORS_UPDATE_INTERVAL):
"""Move time to create REST sensors update event."""
async_fire_time_changed(
hass, dt.utcnow() + timedelta(seconds=REST_SENSORS_UPDATE_INTERVAL)
)
async_fire_time_changed(hass, dt.utcnow() + timedelta(seconds=seconds))
await hass.async_block_till_done()

View File

@ -104,6 +104,19 @@ async def test_scanner_ignores_wrong_version_and_logs(
assert "Unsupported BLE scan result version: 0" in caplog.text
async def test_scanner_minimum_firmware_log_error(
hass, mock_rpc_device, monkeypatch, caplog
):
"""Test scanner log error if device firmware incompatible."""
monkeypatch.setattr(mock_rpc_device, "version", "0.11.0")
await init_integration(
hass, 2, options={CONF_BLE_SCANNER_MODE: BLEScannerMode.ACTIVE}
)
assert mock_rpc_device.initialized is True
assert "BLE not supported on device" in caplog.text
async def test_scanner_warns_on_corrupt_event(
hass, mock_rpc_device, monkeypatch, caplog
):

View File

@ -90,7 +90,7 @@ MOCK_BLOCKS = [
),
),
Mock(
sensor_ids={},
sensor_ids={"mode": "color", "effect": 0},
channel="0",
output=mock_light_set_state()["ison"],
colorTemp=mock_light_set_state()["temp"],
@ -115,6 +115,8 @@ MOCK_BLOCKS = [
cfgChanged=0,
mode=0,
valvePos=50,
inputEvent="S",
wakeupEvent=["button"],
description="device_0",
type="device",
),
@ -127,6 +129,7 @@ MOCK_CONFIG = {
"sys": {
"ui_data": {},
"device": {"name": "Test name"},
"wakeup_period": 0,
},
}
@ -255,6 +258,7 @@ def _mock_rpc_device(version: str | None = None):
event={},
shelly=MOCK_SHELLY_RPC,
version=version or "0.12.0",
hostname="test-host",
status=MOCK_STATUS_RPC,
firmware_version="some fw string",
initialized=True,

View File

@ -2,6 +2,7 @@
from homeassistant.components.binary_sensor import DOMAIN as BINARY_SENSOR_DOMAIN
from homeassistant.components.shelly.const import SLEEP_PERIOD_MULTIPLIER
from homeassistant.const import STATE_OFF, STATE_ON
from homeassistant.core import State
from homeassistant.helpers.entity_registry import async_get
@ -66,6 +67,29 @@ async def test_block_rest_binary_sensor(hass, mock_block_device, monkeypatch):
assert hass.states.get(entity_id).state == STATE_ON
async def test_block_rest_binary_sensor_connected_battery_devices(
hass, mock_block_device, monkeypatch
):
"""Test block REST binary sensor for connected battery devices."""
entity_id = register_entity(hass, BINARY_SENSOR_DOMAIN, "test_name_cloud", "cloud")
monkeypatch.setitem(mock_block_device.status, "cloud", {"connected": False})
monkeypatch.setitem(mock_block_device.settings["device"], "type", "SHMOS-01")
monkeypatch.setitem(mock_block_device.settings["coiot"], "update_period", 3600)
await init_integration(hass, 1, model="SHMOS-01")
assert hass.states.get(entity_id).state == STATE_OFF
monkeypatch.setitem(mock_block_device.status["cloud"], "connected", True)
# Verify no update on fast intervals
await mock_rest_update(hass)
assert hass.states.get(entity_id).state == STATE_OFF
# Verify update on slow intervals
await mock_rest_update(hass, seconds=SLEEP_PERIOD_MULTIPLIER * 3600)
assert hass.states.get(entity_id).state == STATE_ON
async def test_block_sleeping_binary_sensor(hass, mock_block_device, monkeypatch):
"""Test block sleeping binary sensor."""
entity_id = f"{BINARY_SENSOR_DOMAIN}.test_name_motion"

View File

@ -522,18 +522,26 @@ async def test_form_auth_errors_test_connection_gen2(hass, error):
assert result3["errors"] == {"base": base_error}
async def test_zeroconf(hass):
@pytest.mark.parametrize(
"gen, get_info",
[
(1, {"mac": "test-mac", "type": "SHSW-1", "auth": False, "gen": 1}),
(2, {"mac": "test-mac", "model": "SHSW-1", "auth": False, "gen": 2}),
],
)
async def test_zeroconf(hass, gen, get_info):
"""Test we get the form."""
with patch(
"aioshelly.common.get_info",
return_value={"mac": "test-mac", "type": "SHSW-1", "auth": False},
), patch(
with patch("aioshelly.common.get_info", return_value=get_info), patch(
"aioshelly.block_device.BlockDevice.create",
new=AsyncMock(return_value=Mock(model="SHSW-1", settings=MOCK_SETTINGS)),
), patch(
"aioshelly.rpc_device.RpcDevice.create",
new=AsyncMock(
return_value=Mock(
model="SHSW-1",
settings=MOCK_SETTINGS,
shelly={"model": "SHSW-1", "gen": gen},
config=MOCK_CONFIG,
shutdown=AsyncMock(),
)
),
):
@ -569,7 +577,7 @@ async def test_zeroconf(hass):
"host": "1.1.1.1",
"model": "SHSW-1",
"sleep_period": 0,
"gen": 1,
"gen": gen,
}
assert len(mock_setup.mock_calls) == 1
assert len(mock_setup_entry.mock_calls) == 1

View File

@ -0,0 +1,531 @@
"""Tests for Shelly coordinator."""
from datetime import timedelta
from unittest.mock import AsyncMock
from aioshelly.exceptions import DeviceConnectionError, InvalidAuthError
from homeassistant.components.binary_sensor import DOMAIN as BINARY_SENSOR_DOMAIN
from homeassistant.components.sensor import DOMAIN as SENSOR_DOMAIN
from homeassistant.components.shelly.const import (
ATTR_CHANNEL,
ATTR_CLICK_TYPE,
ATTR_DEVICE,
ATTR_GENERATION,
DOMAIN,
ENTRY_RELOAD_COOLDOWN,
RPC_RECONNECT_INTERVAL,
SLEEP_PERIOD_MULTIPLIER,
UPDATE_PERIOD_MULTIPLIER,
)
from homeassistant.config_entries import SOURCE_REAUTH, ConfigEntryState
from homeassistant.const import ATTR_DEVICE_ID, STATE_ON, STATE_UNAVAILABLE
from homeassistant.helpers.device_registry import (
async_entries_for_config_entry,
async_get as async_get_dev_reg,
)
from homeassistant.util import dt
from . import (
init_integration,
inject_rpc_device_event,
mock_polling_rpc_update,
mock_rest_update,
register_entity,
)
from tests.common import async_fire_time_changed
RELAY_BLOCK_ID = 0
LIGHT_BLOCK_ID = 2
SENSOR_BLOCK_ID = 3
DEVICE_BLOCK_ID = 4
async def test_block_reload_on_cfg_change(hass, mock_block_device, monkeypatch):
"""Test block reload on config change."""
await init_integration(hass, 1)
monkeypatch.setattr(mock_block_device.blocks[DEVICE_BLOCK_ID], "cfgChanged", 1)
mock_block_device.mock_update()
await hass.async_block_till_done()
# Generate config change from switch to light
monkeypatch.setitem(
mock_block_device.settings["relays"][RELAY_BLOCK_ID], "appliance_type", "light"
)
monkeypatch.setattr(mock_block_device.blocks[DEVICE_BLOCK_ID], "cfgChanged", 2)
mock_block_device.mock_update()
await hass.async_block_till_done()
assert hass.states.get("switch.test_name_channel_1") is not None
# Wait for debouncer
async_fire_time_changed(
hass, dt.utcnow() + timedelta(seconds=ENTRY_RELOAD_COOLDOWN)
)
await hass.async_block_till_done()
assert hass.states.get("switch.test_name_channel_1") is None
async def test_block_no_reload_on_bulb_changes(hass, mock_block_device, monkeypatch):
"""Test block no reload on bulb mode/effect change."""
await init_integration(hass, 1, model="SHBLB-1")
monkeypatch.setattr(mock_block_device.blocks[DEVICE_BLOCK_ID], "cfgChanged", 1)
mock_block_device.mock_update()
await hass.async_block_till_done()
# Test no reload on mode change
monkeypatch.setitem(
mock_block_device.settings["relays"][RELAY_BLOCK_ID], "appliance_type", "light"
)
monkeypatch.setattr(mock_block_device.blocks[LIGHT_BLOCK_ID], "mode", "white")
monkeypatch.setattr(mock_block_device.blocks[DEVICE_BLOCK_ID], "cfgChanged", 2)
mock_block_device.mock_update()
await hass.async_block_till_done()
assert hass.states.get("switch.test_name_channel_1") is not None
# Wait for debouncer
async_fire_time_changed(
hass, dt.utcnow() + timedelta(seconds=ENTRY_RELOAD_COOLDOWN)
)
await hass.async_block_till_done()
assert hass.states.get("switch.test_name_channel_1") is not None
# Test no reload on effect change
monkeypatch.setattr(mock_block_device.blocks[LIGHT_BLOCK_ID], "effect", 1)
monkeypatch.setattr(mock_block_device.blocks[DEVICE_BLOCK_ID], "cfgChanged", 3)
mock_block_device.mock_update()
await hass.async_block_till_done()
assert hass.states.get("switch.test_name_channel_1") is not None
# Wait for debouncer
async_fire_time_changed(
hass, dt.utcnow() + timedelta(seconds=ENTRY_RELOAD_COOLDOWN)
)
await hass.async_block_till_done()
assert hass.states.get("switch.test_name_channel_1") is not None
async def test_block_polling_auth_error(hass, mock_block_device, monkeypatch):
"""Test block device polling authentication error."""
monkeypatch.setattr(
mock_block_device,
"update",
AsyncMock(side_effect=InvalidAuthError),
)
entry = await init_integration(hass, 1)
assert entry.state == ConfigEntryState.LOADED
# Move time to generate polling
async_fire_time_changed(
hass, dt.utcnow() + timedelta(seconds=UPDATE_PERIOD_MULTIPLIER * 15)
)
await hass.async_block_till_done()
assert entry.state == ConfigEntryState.LOADED
flows = hass.config_entries.flow.async_progress()
assert len(flows) == 1
flow = flows[0]
assert flow.get("step_id") == "reauth_confirm"
assert flow.get("handler") == DOMAIN
assert "context" in flow
assert flow["context"].get("source") == SOURCE_REAUTH
assert flow["context"].get("entry_id") == entry.entry_id
async def test_block_rest_update_auth_error(hass, mock_block_device, monkeypatch):
"""Test block REST update authentication error."""
register_entity(hass, BINARY_SENSOR_DOMAIN, "test_name_cloud", "cloud")
monkeypatch.setitem(mock_block_device.status, "cloud", {"connected": False})
monkeypatch.setitem(mock_block_device.status, "uptime", 1)
entry = await init_integration(hass, 1)
monkeypatch.setattr(
mock_block_device,
"update_shelly",
AsyncMock(side_effect=InvalidAuthError),
)
assert entry.state == ConfigEntryState.LOADED
await mock_rest_update(hass)
assert entry.state == ConfigEntryState.LOADED
flows = hass.config_entries.flow.async_progress()
assert len(flows) == 1
flow = flows[0]
assert flow.get("step_id") == "reauth_confirm"
assert flow.get("handler") == DOMAIN
assert "context" in flow
assert flow["context"].get("source") == SOURCE_REAUTH
assert flow["context"].get("entry_id") == entry.entry_id
async def test_block_polling_connection_error(hass, mock_block_device, monkeypatch):
"""Test block device polling connection error."""
monkeypatch.setattr(
mock_block_device,
"update",
AsyncMock(side_effect=DeviceConnectionError),
)
await init_integration(hass, 1)
assert hass.states.get("switch.test_name_channel_1").state == STATE_ON
# Move time to generate polling
async_fire_time_changed(
hass, dt.utcnow() + timedelta(seconds=UPDATE_PERIOD_MULTIPLIER * 15)
)
await hass.async_block_till_done()
assert hass.states.get("switch.test_name_channel_1").state == STATE_UNAVAILABLE
async def test_block_rest_update_connection_error(hass, mock_block_device, monkeypatch):
"""Test block REST update connection error."""
entity_id = register_entity(hass, BINARY_SENSOR_DOMAIN, "test_name_cloud", "cloud")
monkeypatch.setitem(mock_block_device.status, "cloud", {"connected": True})
monkeypatch.setitem(mock_block_device.status, "uptime", 1)
await init_integration(hass, 1)
await mock_rest_update(hass)
assert hass.states.get(entity_id).state == STATE_ON
monkeypatch.setattr(
mock_block_device,
"update_shelly",
AsyncMock(side_effect=DeviceConnectionError),
)
await mock_rest_update(hass)
assert hass.states.get(entity_id).state == STATE_UNAVAILABLE
async def test_block_sleeping_device_no_periodic_updates(hass, mock_block_device):
"""Test block sleeping device no periodic updates."""
entity_id = f"{SENSOR_DOMAIN}.test_name_temperature"
await init_integration(hass, 1, sleep_period=1000)
# Make device online
mock_block_device.mock_update()
await hass.async_block_till_done()
assert hass.states.get(entity_id).state == "22.1"
# Move time to generate polling
async_fire_time_changed(
hass, dt.utcnow() + timedelta(seconds=SLEEP_PERIOD_MULTIPLIER * 1000)
)
await hass.async_block_till_done()
assert hass.states.get(entity_id).state == STATE_UNAVAILABLE
async def test_block_button_click_event(hass, mock_block_device, events, monkeypatch):
"""Test block click event for Shelly button."""
monkeypatch.setattr(mock_block_device.blocks[RELAY_BLOCK_ID], "sensor_ids", {})
monkeypatch.setattr(
mock_block_device.blocks[DEVICE_BLOCK_ID],
"sensor_ids",
{"inputEvent": "S", "inputEventCnt": 0},
)
entry = await init_integration(hass, 1, model="SHBTN-1", sleep_period=1000)
# Make device online
mock_block_device.mock_update()
await hass.async_block_till_done()
dev_reg = async_get_dev_reg(hass)
device = async_entries_for_config_entry(dev_reg, entry.entry_id)[0]
# Generate button click event
mock_block_device.mock_update()
await hass.async_block_till_done()
assert len(events) == 1
assert events[0].data == {
ATTR_DEVICE_ID: device.id,
ATTR_DEVICE: "test-host",
ATTR_CHANNEL: 1,
ATTR_CLICK_TYPE: "single",
ATTR_GENERATION: 1,
}
# Test ignore empty event
monkeypatch.setattr(mock_block_device.blocks[DEVICE_BLOCK_ID], "inputEvent", "")
mock_block_device.mock_update()
await hass.async_block_till_done()
mock_block_device.mock_update()
await hass.async_block_till_done()
assert len(events) == 1
async def test_rpc_reload_on_cfg_change(hass, mock_rpc_device, monkeypatch):
"""Test RPC reload on config change."""
await init_integration(hass, 2)
# Generate config change from switch to light
monkeypatch.setitem(
mock_rpc_device.config["sys"]["ui_data"], "consumption_types", ["lights"]
)
inject_rpc_device_event(
monkeypatch,
mock_rpc_device,
{
"events": [
{
"data": [],
"event": "config_changed",
"id": 1,
"ts": 1668522399.2,
},
{
"data": [],
"id": 2,
"ts": 1668522399.2,
},
],
"ts": 1668522399.2,
},
)
await hass.async_block_till_done()
assert hass.states.get("switch.test_switch_0") is not None
# Wait for debouncer
async_fire_time_changed(
hass, dt.utcnow() + timedelta(seconds=ENTRY_RELOAD_COOLDOWN)
)
await hass.async_block_till_done()
assert hass.states.get("switch.test_switch_0") is None
async def test_rpc_click_event(hass, mock_rpc_device, events, monkeypatch):
"""Test RPC click event."""
entry = await init_integration(hass, 2)
dev_reg = async_get_dev_reg(hass)
device = async_entries_for_config_entry(dev_reg, entry.entry_id)[0]
# Generate config change from switch to light
inject_rpc_device_event(
monkeypatch,
mock_rpc_device,
{
"events": [
{
"data": [],
"event": "single_push",
"id": 0,
"ts": 1668522399.2,
}
],
"ts": 1668522399.2,
},
)
await hass.async_block_till_done()
assert len(events) == 1
assert events[0].data == {
ATTR_DEVICE_ID: device.id,
ATTR_DEVICE: "test-host",
ATTR_CHANNEL: 1,
ATTR_CLICK_TYPE: "single_push",
ATTR_GENERATION: 2,
}
async def test_rpc_update_entry_sleep_period(hass, mock_rpc_device, monkeypatch):
"""Test RPC update entry sleep period."""
entry = await init_integration(hass, 2, sleep_period=600)
register_entity(
hass,
SENSOR_DOMAIN,
"test_name_temperature",
"temperature:0-temperature_0",
entry,
)
# Make device online
mock_rpc_device.mock_update()
await hass.async_block_till_done()
assert entry.data["sleep_period"] == 600
# Move time to generate sleep period update
monkeypatch.setitem(mock_rpc_device.status["sys"], "wakeup_period", 3600)
async_fire_time_changed(
hass, dt.utcnow() + timedelta(seconds=600 * SLEEP_PERIOD_MULTIPLIER)
)
await hass.async_block_till_done()
assert entry.data["sleep_period"] == 3600
async def test_rpc_sleeping_device_no_periodic_updates(
hass, mock_rpc_device, monkeypatch
):
"""Test RPC sleeping device no periodic updates."""
entity_id = f"{SENSOR_DOMAIN}.test_name_temperature"
entry = await init_integration(hass, 2, sleep_period=1000)
register_entity(
hass,
SENSOR_DOMAIN,
"test_name_temperature",
"temperature:0-temperature_0",
entry,
)
# Make device online
mock_rpc_device.mock_update()
await hass.async_block_till_done()
assert hass.states.get(entity_id).state == "22.9"
# Move time to generate polling
async_fire_time_changed(
hass, dt.utcnow() + timedelta(seconds=SLEEP_PERIOD_MULTIPLIER * 1000)
)
await hass.async_block_till_done()
assert hass.states.get(entity_id).state == STATE_UNAVAILABLE
async def test_rpc_reconnect_auth_error(hass, mock_rpc_device, monkeypatch):
"""Test RPC reconnect authentication error."""
entry = await init_integration(hass, 2)
monkeypatch.setattr(mock_rpc_device, "connected", False)
monkeypatch.setattr(
mock_rpc_device,
"initialize",
AsyncMock(
side_effect=InvalidAuthError,
),
)
assert entry.state == ConfigEntryState.LOADED
# Move time to generate reconnect
async_fire_time_changed(
hass, dt.utcnow() + timedelta(seconds=RPC_RECONNECT_INTERVAL)
)
await hass.async_block_till_done()
assert entry.state == ConfigEntryState.LOADED
flows = hass.config_entries.flow.async_progress()
assert len(flows) == 1
flow = flows[0]
assert flow.get("step_id") == "reauth_confirm"
assert flow.get("handler") == DOMAIN
assert "context" in flow
assert flow["context"].get("source") == SOURCE_REAUTH
assert flow["context"].get("entry_id") == entry.entry_id
async def test_rpc_polling_auth_error(hass, mock_rpc_device, monkeypatch) -> None:
"""Test RPC polling authentication error."""
register_entity(hass, SENSOR_DOMAIN, "test_name_rssi", "wifi-rssi")
entry = await init_integration(hass, 2)
monkeypatch.setattr(
mock_rpc_device,
"update_status",
AsyncMock(
side_effect=InvalidAuthError,
),
)
assert entry.state == ConfigEntryState.LOADED
await mock_polling_rpc_update(hass)
assert entry.state == ConfigEntryState.LOADED
flows = hass.config_entries.flow.async_progress()
assert len(flows) == 1
flow = flows[0]
assert flow.get("step_id") == "reauth_confirm"
assert flow.get("handler") == DOMAIN
assert "context" in flow
assert flow["context"].get("source") == SOURCE_REAUTH
assert flow["context"].get("entry_id") == entry.entry_id
async def test_rpc_reconnect_error(hass, mock_rpc_device, monkeypatch):
"""Test RPC reconnect error."""
await init_integration(hass, 2)
assert hass.states.get("switch.test_switch_0").state == STATE_ON
monkeypatch.setattr(mock_rpc_device, "connected", False)
monkeypatch.setattr(
mock_rpc_device,
"initialize",
AsyncMock(
side_effect=DeviceConnectionError,
),
)
# Move time to generate reconnect
async_fire_time_changed(
hass, dt.utcnow() + timedelta(seconds=RPC_RECONNECT_INTERVAL)
)
await hass.async_block_till_done()
assert hass.states.get("switch.test_switch_0").state == STATE_UNAVAILABLE
async def test_rpc_polling_connection_error(hass, mock_rpc_device, monkeypatch) -> None:
"""Test RPC polling connection error."""
entity_id = register_entity(hass, SENSOR_DOMAIN, "test_name_rssi", "wifi-rssi")
await init_integration(hass, 2)
monkeypatch.setattr(
mock_rpc_device,
"update_status",
AsyncMock(
side_effect=DeviceConnectionError,
),
)
assert hass.states.get(entity_id).state == "-63"
await mock_polling_rpc_update(hass)
assert hass.states.get(entity_id).state == STATE_UNAVAILABLE
async def test_rpc_polling_disconnected(hass, mock_rpc_device, monkeypatch) -> None:
"""Test RPC polling device disconnected."""
entity_id = register_entity(hass, SENSOR_DOMAIN, "test_name_rssi", "wifi-rssi")
await init_integration(hass, 2)
monkeypatch.setattr(mock_rpc_device, "connected", False)
assert hass.states.get(entity_id).state == "-63"
await mock_polling_rpc_update(hass)
assert hass.states.get(entity_id).state == STATE_UNAVAILABLE

View File

@ -0,0 +1,225 @@
"""Tests for Shelly utils."""
from freezegun import freeze_time
import pytest
from homeassistant.components.shelly.utils import (
get_block_channel_name,
get_block_device_sleep_period,
get_block_input_triggers,
get_device_uptime,
get_number_of_channels,
get_rpc_channel_name,
get_rpc_input_triggers,
is_block_momentary_input,
)
from homeassistant.util import dt
DEVICE_BLOCK_ID = 4
async def test_block_get_number_of_channels(mock_block_device, monkeypatch):
"""Test block get number of channels."""
monkeypatch.setattr(mock_block_device.blocks[DEVICE_BLOCK_ID], "type", "emeter")
monkeypatch.setitem(mock_block_device.shelly, "num_emeters", 3)
assert (
get_number_of_channels(
mock_block_device,
mock_block_device.blocks[DEVICE_BLOCK_ID],
)
== 3
)
monkeypatch.setitem(mock_block_device.shelly, "num_inputs", 4)
monkeypatch.setattr(mock_block_device.blocks[DEVICE_BLOCK_ID], "type", "input")
assert (
get_number_of_channels(
mock_block_device,
mock_block_device.blocks[DEVICE_BLOCK_ID],
)
== 4
)
monkeypatch.setitem(mock_block_device.settings["device"], "type", "SHDM-2")
assert (
get_number_of_channels(
mock_block_device,
mock_block_device.blocks[DEVICE_BLOCK_ID],
)
== 2
)
async def test_block_get_block_channel_name(mock_block_device, monkeypatch):
"""Test block get block channel name."""
monkeypatch.setattr(mock_block_device.blocks[DEVICE_BLOCK_ID], "type", "relay")
assert (
get_block_channel_name(
mock_block_device,
mock_block_device.blocks[DEVICE_BLOCK_ID],
)
== "Test name channel 1"
)
monkeypatch.setitem(mock_block_device.settings["device"], "type", "SHEM-3")
assert (
get_block_channel_name(
mock_block_device,
mock_block_device.blocks[DEVICE_BLOCK_ID],
)
== "Test name channel A"
)
monkeypatch.setitem(
mock_block_device.settings, "relays", [{"name": "test-channel"}]
)
assert (
get_block_channel_name(
mock_block_device,
mock_block_device.blocks[DEVICE_BLOCK_ID],
)
== "test-channel"
)
async def test_is_block_momentary_input(mock_block_device, monkeypatch):
"""Test is block momentary input."""
monkeypatch.setattr(mock_block_device.blocks[DEVICE_BLOCK_ID], "type", "relay")
monkeypatch.setitem(mock_block_device.settings, "mode", "roller")
monkeypatch.setitem(
mock_block_device.settings, "rollers", [{"button_type": "detached"}]
)
assert (
is_block_momentary_input(
mock_block_device.settings,
mock_block_device.blocks[DEVICE_BLOCK_ID],
)
is False
)
assert (
is_block_momentary_input(
mock_block_device.settings, mock_block_device.blocks[DEVICE_BLOCK_ID], True
)
is True
)
monkeypatch.setitem(mock_block_device.settings, "mode", "relay")
monkeypatch.setitem(mock_block_device.settings["device"], "type", "SHSW-L")
assert (
is_block_momentary_input(
mock_block_device.settings, mock_block_device.blocks[DEVICE_BLOCK_ID], True
)
is False
)
monkeypatch.delitem(mock_block_device.settings, "relays")
monkeypatch.delitem(mock_block_device.settings, "rollers")
assert (
is_block_momentary_input(
mock_block_device.settings,
mock_block_device.blocks[DEVICE_BLOCK_ID],
)
is False
)
monkeypatch.setitem(mock_block_device.settings["device"], "type", "SHBTN-2")
assert (
is_block_momentary_input(
mock_block_device.settings,
mock_block_device.blocks[DEVICE_BLOCK_ID],
)
is True
)
@pytest.mark.parametrize(
"settings, sleep_period",
[
({}, 0),
({"sleep_mode": {"period": 1000, "unit": "m"}}, 1000 * 60),
({"sleep_mode": {"period": 5, "unit": "h"}}, 5 * 3600),
],
)
async def test_get_block_device_sleep_period(settings, sleep_period):
"""Test get block device sleep period."""
assert get_block_device_sleep_period(settings) == sleep_period
@freeze_time("2019-01-10 18:43:00+00:00")
async def test_get_device_uptime():
"""Test block test get device uptime."""
assert get_device_uptime(
55, dt.as_utc(dt.parse_datetime("2019-01-10 18:42:00+00:00"))
) == dt.as_utc(dt.parse_datetime("2019-01-10 18:42:00+00:00"))
assert get_device_uptime(
50, dt.as_utc(dt.parse_datetime("2019-01-10 18:42:00+00:00"))
) == dt.as_utc(dt.parse_datetime("2019-01-10 18:42:10+00:00"))
async def test_get_block_input_triggers(mock_block_device, monkeypatch):
"""Test get block input triggers."""
monkeypatch.setattr(
mock_block_device.blocks[DEVICE_BLOCK_ID],
"sensor_ids",
{"inputEvent": "S", "inputEventCnt": 0},
)
monkeypatch.setitem(
mock_block_device.settings, "rollers", [{"button_type": "detached"}]
)
assert set(
get_block_input_triggers(
mock_block_device, mock_block_device.blocks[DEVICE_BLOCK_ID]
)
) == {("long", "button"), ("single", "button")}
monkeypatch.setitem(mock_block_device.settings["device"], "type", "SHBTN-1")
assert set(
get_block_input_triggers(
mock_block_device, mock_block_device.blocks[DEVICE_BLOCK_ID]
)
) == {
("long", "button"),
("double", "button"),
("single", "button"),
("triple", "button"),
}
monkeypatch.setitem(mock_block_device.settings["device"], "type", "SHIX3-1")
assert set(
get_block_input_triggers(
mock_block_device, mock_block_device.blocks[DEVICE_BLOCK_ID]
)
) == {
("long_single", "button"),
("single_long", "button"),
("triple", "button"),
("long", "button"),
("single", "button"),
("double", "button"),
}
async def test_get_rpc_channel_name(mock_rpc_device):
"""Test get RPC channel name."""
assert get_rpc_channel_name(mock_rpc_device, "input:0") == "test switch_0"
async def test_get_rpc_input_triggers(mock_rpc_device, monkeypatch):
"""Test get RPC input triggers."""
monkeypatch.setattr(mock_rpc_device, "config", {"input:0": {"type": "button"}})
assert set(get_rpc_input_triggers(mock_rpc_device)) == {
("long_push", "button1"),
("single_push", "button1"),
("btn_down", "button1"),
("double_push", "button1"),
("btn_up", "button1"),
}
monkeypatch.setattr(mock_rpc_device, "config", {"input:0": {"type": "switch"}})
assert not get_rpc_input_triggers(mock_rpc_device)