Add switch groups (#68528)

This commit is contained in:
Franck Nijhof 2022-03-28 12:27:26 +02:00 committed by GitHub
parent a597c11ea2
commit 01980f0445
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 604 additions and 5 deletions

View File

@ -67,6 +67,7 @@ PLATFORMS = [
Platform.LIGHT,
Platform.MEDIA_PLAYER,
Platform.NOTIFY,
Platform.SWITCH,
]
REG_KEY = f"{DOMAIN}_registry"

View File

@ -55,11 +55,19 @@ LIGHT_OPTIONS_SCHEMA = basic_group_options_schema("light").extend(
}
)
SWITCH_OPTIONS_SCHEMA = basic_group_options_schema("switch").extend(
{
vol.Required(
CONF_ALL, default=False, description={"advanced": True}
): selector.selector({"boolean": {}}),
}
)
BINARY_SENSOR_CONFIG_SCHEMA = vol.Schema(
{vol.Required("name"): selector.selector({"text": {}})}
).extend(BINARY_SENSOR_OPTIONS_SCHEMA.schema)
GROUP_TYPES = ["binary_sensor", "cover", "fan", "light", "media_player"]
GROUP_TYPES = ["binary_sensor", "cover", "fan", "light", "media_player", "switch"]
@callback
@ -94,6 +102,9 @@ CONFIG_FLOW: dict[str, HelperFlowFormStep | HelperFlowMenuStep] = {
"media_player": HelperFlowFormStep(
basic_group_config_schema("media_player"), set_group_type("media_player")
),
"switch": HelperFlowFormStep(
basic_group_config_schema("switch"), set_group_type("switch")
),
}
@ -104,11 +115,12 @@ OPTIONS_FLOW: dict[str, HelperFlowFormStep | HelperFlowMenuStep] = {
"fan": HelperFlowFormStep(basic_group_options_schema("fan")),
"light": HelperFlowFormStep(LIGHT_OPTIONS_SCHEMA),
"media_player": HelperFlowFormStep(basic_group_options_schema("media_player")),
"switch": HelperFlowFormStep(SWITCH_OPTIONS_SCHEMA),
}
class GroupConfigFlowHandler(HelperConfigFlowHandler, domain=DOMAIN):
"""Handle a config or options flow for Switch Light."""
"""Handle a config or options flow for groups."""
config_flow = CONFIG_FLOW
options_flow = OPTIONS_FLOW

View File

@ -10,7 +10,8 @@
"cover": "Cover group",
"fan": "Fan group",
"light": "Light group",
"media_player": "Media player group"
"media_player": "Media player group",
"switch": "Switch group"
}
},
"binary_sensor": {
@ -54,6 +55,14 @@
"hide_members": "[%key:component::group::config::step::binary_sensor::data::hide_members%]",
"name": "[%key:component::group::config::step::binary_sensor::data::name%]"
}
},
"switch": {
"title": "[%key:component::group::config::step::user::title%]",
"data": {
"entities": "[%key:component::group::config::step::binary_sensor::data::entities%]",
"hide_members": "[%key:component::group::config::step::binary_sensor::data::hide_members%]",
"name": "[%key:component::group::config::step::binary_sensor::data::name%]"
}
}
}
},
@ -92,6 +101,14 @@
"entities": "[%key:component::group::config::step::binary_sensor::data::entities%]",
"hide_members": "[%key:component::group::config::step::binary_sensor::data::hide_members%]"
}
},
"switch": {
"description": "[%key:component::group::config::step::binary_sensor::description%]",
"data": {
"all": "[%key:component::group::config::step::binary_sensor::data::all%]",
"entities": "[%key:component::group::config::step::binary_sensor::data::entities%]",
"hide_members": "[%key:component::group::config::step::binary_sensor::data::hide_members%]"
}
}
}
},

View File

@ -0,0 +1,173 @@
"""This platform allows several switches to be grouped into one switch."""
from __future__ import annotations
import logging
from typing import Any
import voluptuous as vol
from homeassistant.components.switch import DOMAIN, PLATFORM_SCHEMA, SwitchEntity
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import (
ATTR_ENTITY_ID,
CONF_ENTITIES,
CONF_NAME,
CONF_UNIQUE_ID,
SERVICE_TURN_OFF,
SERVICE_TURN_ON,
STATE_ON,
STATE_UNAVAILABLE,
STATE_UNKNOWN,
)
from homeassistant.core import Event, HomeAssistant, callback
from homeassistant.helpers import config_validation as cv, entity_registry as er
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.event import async_track_state_change_event
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
from . import GroupEntity
DEFAULT_NAME = "Switch Group"
CONF_ALL = "all"
# No limit on parallel updates to enable a group calling another group
PARALLEL_UPDATES = 0
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
{
vol.Required(CONF_ENTITIES): cv.entities_domain(DOMAIN),
vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
vol.Optional(CONF_UNIQUE_ID): cv.string,
vol.Optional(CONF_ALL, default=False): cv.boolean,
}
)
_LOGGER = logging.getLogger(__name__)
async def async_setup_platform(
hass: HomeAssistant,
config: ConfigType,
async_add_entities: AddEntitiesCallback,
discovery_info: DiscoveryInfoType | None = None,
) -> None:
"""Set up the Switch Group platform."""
async_add_entities(
[
SwitchGroup(
config.get(CONF_UNIQUE_ID),
config[CONF_NAME],
config[CONF_ENTITIES],
config.get(CONF_ALL, False),
)
]
)
async def async_setup_entry(
hass: HomeAssistant,
config_entry: ConfigEntry,
async_add_entities: AddEntitiesCallback,
) -> None:
"""Initialize Switch Group config entry."""
registry = er.async_get(hass)
entities = er.async_validate_entity_ids(
registry, config_entry.options[CONF_ENTITIES]
)
async_add_entities(
[
SwitchGroup(
config_entry.entry_id,
config_entry.title,
entities,
config_entry.options.get(CONF_ALL),
)
]
)
class SwitchGroup(GroupEntity, SwitchEntity):
"""Representation of a switch group."""
_attr_available = False
_attr_should_poll = False
def __init__(
self,
unique_id: str | None,
name: str,
entity_ids: list[str],
mode: bool | None,
) -> None:
"""Initialize a switch group."""
self._entity_ids = entity_ids
self._attr_name = name
self._attr_extra_state_attributes = {ATTR_ENTITY_ID: entity_ids}
self._attr_unique_id = unique_id
self.mode = any
if mode:
self.mode = all
async def async_added_to_hass(self) -> None:
"""Register callbacks."""
@callback
def async_state_changed_listener(event: Event) -> None:
"""Handle child updates."""
self.async_set_context(event.context)
self.async_defer_or_update_ha_state()
self.async_on_remove(
async_track_state_change_event(
self.hass, self._entity_ids, async_state_changed_listener
)
)
await super().async_added_to_hass()
async def async_turn_on(self, **kwargs: Any) -> None:
"""Forward the turn_on command to all switches in the group."""
data = {ATTR_ENTITY_ID: self._entity_ids}
_LOGGER.debug("Forwarded turn_on command: %s", data)
await self.hass.services.async_call(
DOMAIN,
SERVICE_TURN_ON,
data,
blocking=True,
context=self._context,
)
async def async_turn_off(self, **kwargs: Any) -> None:
"""Forward the turn_off command to all switches in the group."""
data = {ATTR_ENTITY_ID: self._entity_ids}
await self.hass.services.async_call(
DOMAIN,
SERVICE_TURN_OFF,
data,
blocking=True,
context=self._context,
)
@callback
def async_update_group_state(self) -> None:
"""Query all members and determine the switch group state."""
states = [
state.state
for entity_id in self._entity_ids
if (state := self.hass.states.get(entity_id)) is not None
]
valid_state = self.mode(
state not in (STATE_UNKNOWN, STATE_UNAVAILABLE) for state in states
)
if not valid_state:
# Set as unknown if any / all member is unknown or unavailable
self._attr_is_on = None
else:
# Set as ON if any / all member is ON
self._attr_is_on = self.mode(list(map(lambda x: x == STATE_ON, states)))
self._attr_available = any(state != STATE_UNAVAILABLE for state in states)

View File

@ -43,6 +43,14 @@
},
"title": "New Group"
},
"switch": {
"data": {
"entities": "Members",
"hide_members": "Hide members",
"name": "Name"
},
"title": "New Group"
},
"user": {
"description": "Select group type",
"menu_options": {
@ -50,7 +58,8 @@
"cover": "Cover group",
"fan": "Fan group",
"light": "Light group",
"media_player": "Media player group"
"media_player": "Media player group",
"switch": "Switch group"
},
"title": "New Group"
}
@ -91,6 +100,14 @@
"entities": "Members",
"hide_members": "Hide members"
}
},
"switch": {
"data": {
"all": "All entities",
"entities": "Members",
"hide_members": "Hide members"
},
"description": "If \"all entities\" is enabled, the group's state is on only if all members are on. If \"all entities\" is disabled, the group's state is on if any member is on."
}
}
},

View File

@ -10,6 +10,18 @@ light:
- light.outside_patio_lights
- light.outside_patio_lights_2
switch:
- platform: group
name: Master Switches G
entities:
- switch.master_switch
- switch.master_switch_2
- platform: group
name: Outside Switches G
entities:
- switch.outside_switch
- switch.outside_switch_2
notify:
- platform: group
name: new_group_notify

View File

@ -25,6 +25,7 @@ from tests.common import MockConfigEntry
("fan", "on", "on", {}, {}, {}, {}),
("light", "on", "on", {}, {}, {}, {}),
("media_player", "on", "on", {}, {}, {}, {}),
("switch", "on", "on", {}, {}, {}, {}),
),
)
async def test_config_flow(
@ -108,6 +109,7 @@ async def test_config_flow(
("fan", {}),
("light", {}),
("media_player", {}),
("switch", {}),
),
)
async def test_config_flow_hides_members(
@ -178,6 +180,7 @@ def get_suggested(schema, key):
("fan", "on", {}),
("light", "on", {"all": False}),
("media_player", "on", {}),
("switch", "on", {"all": False}),
),
)
async def test_options(
@ -273,9 +276,13 @@ async def test_options(
("light", {"all": True}, {"all": True}, False),
("light", {"all": False}, {"all": False}, True),
("light", {"all": True}, {"all": False}, True),
("switch", {"all": False}, {"all": False}, False),
("switch", {"all": True}, {"all": True}, False),
("switch", {"all": False}, {"all": False}, True),
("switch", {"all": True}, {"all": False}, True),
),
)
async def test_light_all_options(
async def test_all_options(
hass: HomeAssistant, group_type, extra_options, extra_options_after, advanced
) -> None:
"""Test reconfiguring."""
@ -345,6 +352,7 @@ async def test_light_all_options(
("fan", {}),
("light", {}),
("media_player", {}),
("switch", {}),
),
)
async def test_options_flow_hides_members(

View File

@ -0,0 +1,359 @@
"""The tests for the Group Switch platform."""
from unittest.mock import patch
import async_timeout
from homeassistant import config as hass_config
from homeassistant.components.group import DOMAIN, SERVICE_RELOAD
from homeassistant.components.switch import (
DOMAIN as SWITCH_DOMAIN,
SERVICE_TOGGLE,
SERVICE_TURN_OFF,
SERVICE_TURN_ON,
)
from homeassistant.const import (
ATTR_ENTITY_ID,
STATE_OFF,
STATE_ON,
STATE_UNAVAILABLE,
STATE_UNKNOWN,
)
from homeassistant.helpers import entity_registry as er
from homeassistant.setup import async_setup_component
from tests.common import get_fixture_path
async def test_default_state(hass):
"""Test switch group default state."""
hass.states.async_set("switch.tv", "on")
await async_setup_component(
hass,
SWITCH_DOMAIN,
{
SWITCH_DOMAIN: {
"platform": DOMAIN,
"entities": ["switch.tv", "switch.soundbar"],
"name": "Multimedia Group",
"unique_id": "unique_identifier",
"all": "false",
}
},
)
await hass.async_block_till_done()
await hass.async_start()
await hass.async_block_till_done()
state = hass.states.get("switch.multimedia_group")
assert state is not None
assert state.state == STATE_ON
assert state.attributes.get(ATTR_ENTITY_ID) == ["switch.tv", "switch.soundbar"]
entity_registry = er.async_get(hass)
entry = entity_registry.async_get("switch.multimedia_group")
assert entry
assert entry.unique_id == "unique_identifier"
async def test_state_reporting(hass):
"""Test the state reporting."""
await async_setup_component(
hass,
SWITCH_DOMAIN,
{
SWITCH_DOMAIN: {
"platform": DOMAIN,
"entities": ["switch.test1", "switch.test2"],
"all": "false",
}
},
)
await hass.async_block_till_done()
await hass.async_start()
await hass.async_block_till_done()
hass.states.async_set("switch.test1", STATE_ON)
hass.states.async_set("switch.test2", STATE_UNAVAILABLE)
await hass.async_block_till_done()
assert hass.states.get("switch.switch_group").state == STATE_ON
hass.states.async_set("switch.test1", STATE_ON)
hass.states.async_set("switch.test2", STATE_OFF)
await hass.async_block_till_done()
assert hass.states.get("switch.switch_group").state == STATE_ON
hass.states.async_set("switch.test1", STATE_OFF)
hass.states.async_set("switch.test2", STATE_OFF)
await hass.async_block_till_done()
assert hass.states.get("switch.switch_group").state == STATE_OFF
hass.states.async_set("switch.test1", STATE_UNAVAILABLE)
hass.states.async_set("switch.test2", STATE_UNAVAILABLE)
await hass.async_block_till_done()
assert hass.states.get("switch.switch_group").state == STATE_UNAVAILABLE
async def test_state_reporting_all(hass):
"""Test the state reporting."""
await async_setup_component(
hass,
SWITCH_DOMAIN,
{
SWITCH_DOMAIN: {
"platform": DOMAIN,
"entities": ["switch.test1", "switch.test2"],
"all": "true",
}
},
)
await hass.async_block_till_done()
await hass.async_start()
await hass.async_block_till_done()
hass.states.async_set("switch.test1", STATE_ON)
hass.states.async_set("switch.test2", STATE_UNAVAILABLE)
await hass.async_block_till_done()
assert hass.states.get("switch.switch_group").state == STATE_UNKNOWN
hass.states.async_set("switch.test1", STATE_ON)
hass.states.async_set("switch.test2", STATE_OFF)
await hass.async_block_till_done()
assert hass.states.get("switch.switch_group").state == STATE_OFF
hass.states.async_set("switch.test1", STATE_OFF)
hass.states.async_set("switch.test2", STATE_OFF)
await hass.async_block_till_done()
assert hass.states.get("switch.switch_group").state == STATE_OFF
hass.states.async_set("switch.test1", STATE_ON)
hass.states.async_set("switch.test2", STATE_ON)
await hass.async_block_till_done()
assert hass.states.get("switch.switch_group").state == STATE_ON
hass.states.async_set("switch.test1", STATE_UNAVAILABLE)
hass.states.async_set("switch.test2", STATE_UNAVAILABLE)
await hass.async_block_till_done()
assert hass.states.get("switch.switch_group").state == STATE_UNAVAILABLE
async def test_service_calls(hass, enable_custom_integrations):
"""Test service calls."""
await async_setup_component(
hass,
SWITCH_DOMAIN,
{
SWITCH_DOMAIN: [
{"platform": "demo"},
{
"platform": DOMAIN,
"entities": [
"switch.ac",
"switch.decorative_lights",
],
"all": "false",
},
]
},
)
await hass.async_block_till_done()
group_state = hass.states.get("switch.switch_group")
assert group_state.state == STATE_ON
await hass.services.async_call(
SWITCH_DOMAIN,
SERVICE_TOGGLE,
{ATTR_ENTITY_ID: "switch.switch_group"},
blocking=True,
)
assert hass.states.get("switch.ac").state == STATE_OFF
assert hass.states.get("switch.decorative_lights").state == STATE_OFF
await hass.services.async_call(
SWITCH_DOMAIN,
SERVICE_TURN_ON,
{ATTR_ENTITY_ID: "switch.switch_group"},
blocking=True,
)
assert hass.states.get("switch.ac").state == STATE_ON
assert hass.states.get("switch.decorative_lights").state == STATE_ON
await hass.services.async_call(
SWITCH_DOMAIN,
SERVICE_TURN_OFF,
{ATTR_ENTITY_ID: "switch.switch_group"},
blocking=True,
)
assert hass.states.get("switch.ac").state == STATE_OFF
assert hass.states.get("switch.decorative_lights").state == STATE_OFF
async def test_reload(hass):
"""Test the ability to reload switches."""
await async_setup_component(
hass,
SWITCH_DOMAIN,
{
SWITCH_DOMAIN: [
{"platform": "demo"},
{
"platform": DOMAIN,
"entities": [
"switch.ac",
"switch.decorative_lights",
],
"all": "false",
},
]
},
)
await hass.async_block_till_done()
await hass.async_block_till_done()
await hass.async_start()
await hass.async_block_till_done()
assert hass.states.get("switch.switch_group").state == STATE_ON
yaml_path = get_fixture_path("configuration.yaml", "group")
with patch.object(hass_config, "YAML_CONFIG_FILE", yaml_path):
await hass.services.async_call(
DOMAIN,
SERVICE_RELOAD,
{},
blocking=True,
)
await hass.async_block_till_done()
assert hass.states.get("switch.switch_group") is None
assert hass.states.get("switch.master_switches_g") is not None
assert hass.states.get("switch.outside_switches_g") is not None
async def test_reload_with_platform_not_setup(hass):
"""Test the ability to reload switches."""
hass.states.async_set("switch.something", STATE_ON)
await async_setup_component(
hass,
SWITCH_DOMAIN,
{
SWITCH_DOMAIN: [
{"platform": "demo"},
]
},
)
assert await async_setup_component(
hass,
"group",
{
"group": {
"group_zero": {"entities": "switch.something", "icon": "mdi:work"},
}
},
)
await hass.async_block_till_done()
yaml_path = get_fixture_path("configuration.yaml", "group")
with patch.object(hass_config, "YAML_CONFIG_FILE", yaml_path):
await hass.services.async_call(
DOMAIN,
SERVICE_RELOAD,
{},
blocking=True,
)
await hass.async_block_till_done()
assert hass.states.get("switch.switch_group") is None
assert hass.states.get("switch.master_switches_g") is not None
assert hass.states.get("switch.outside_switches_g") is not None
async def test_reload_with_base_integration_platform_not_setup(hass):
"""Test the ability to reload switches."""
assert await async_setup_component(
hass,
"group",
{
"group": {
"group_zero": {"entities": "switch.something", "icon": "mdi:work"},
}
},
)
await hass.async_block_till_done()
hass.states.async_set("switch.master_switch", STATE_ON)
hass.states.async_set("switch.master_switch_2", STATE_OFF)
hass.states.async_set("switch.outside_switch", STATE_OFF)
hass.states.async_set("switch.outside_switch_2", STATE_OFF)
yaml_path = get_fixture_path("configuration.yaml", "group")
with patch.object(hass_config, "YAML_CONFIG_FILE", yaml_path):
await hass.services.async_call(
DOMAIN,
SERVICE_RELOAD,
{},
blocking=True,
)
await hass.async_block_till_done()
assert hass.states.get("switch.switch_group") is None
assert hass.states.get("switch.master_switches_g") is not None
assert hass.states.get("switch.outside_switches_g") is not None
assert hass.states.get("switch.master_switches_g").state == STATE_ON
assert hass.states.get("switch.outside_switches_g").state == STATE_OFF
async def test_nested_group(hass):
"""Test nested switch group."""
await async_setup_component(
hass,
SWITCH_DOMAIN,
{
SWITCH_DOMAIN: [
{"platform": "demo"},
{
"platform": DOMAIN,
"entities": ["switch.some_group"],
"name": "Nested Group",
"all": "false",
},
{
"platform": DOMAIN,
"entities": ["switch.ac", "switch.decorative_lights"],
"name": "Some Group",
"all": "false",
},
]
},
)
await hass.async_block_till_done()
await hass.async_start()
await hass.async_block_till_done()
state = hass.states.get("switch.some_group")
assert state is not None
assert state.state == STATE_ON
assert state.attributes.get(ATTR_ENTITY_ID) == [
"switch.ac",
"switch.decorative_lights",
]
state = hass.states.get("switch.nested_group")
assert state is not None
assert state.state == STATE_ON
assert state.attributes.get(ATTR_ENTITY_ID) == ["switch.some_group"]
# Test controlling the nested group
async with async_timeout.timeout(0.5):
await hass.services.async_call(
SWITCH_DOMAIN,
SERVICE_TOGGLE,
{ATTR_ENTITY_ID: "switch.nested_group"},
blocking=True,
)
assert hass.states.get("switch.ac").state == STATE_OFF
assert hass.states.get("switch.decorative_lights").state == STATE_OFF
assert hass.states.get("switch.some_group").state == STATE_OFF
assert hass.states.get("switch.nested_group").state == STATE_OFF