From a0f35a84ae1281147c454c5071b474562f0e8841 Mon Sep 17 00:00:00 2001 From: Andre Lengwenus Date: Wed, 14 May 2025 16:49:30 +0200 Subject: [PATCH] Positioning for LCN covers (#143588) * Fix motor control function names * Add position logic for BS4 * Use helper methods from pypck * Add motor positioning to domain_data schema * Fix tests * Add motor positioning via module * Invert motor cover positions * Merge relay cover classes back into one class * Update snapshot for covers * Revert bump lcn-frontend to 0.2.4 --- homeassistant/components/lcn/const.py | 5 +- homeassistant/components/lcn/cover.py | 107 ++++++--- homeassistant/components/lcn/manifest.json | 2 +- homeassistant/components/lcn/schemas.py | 9 +- requirements_all.txt | 2 +- requirements_test_all.txt | 2 +- .../lcn/fixtures/config_entry_pchk.json | 25 ++- .../components/lcn/snapshots/test_cover.ambr | 98 ++++++++ tests/components/lcn/test_cover.py | 209 +++++++++++++----- 9 files changed, 370 insertions(+), 89 deletions(-) diff --git a/homeassistant/components/lcn/const.py b/homeassistant/components/lcn/const.py index b443e05def7..d67c02ed56a 100644 --- a/homeassistant/components/lcn/const.py +++ b/homeassistant/components/lcn/const.py @@ -56,6 +56,7 @@ CONF_SCENES = "scenes" CONF_REGISTER = "register" CONF_OUTPUTS = "outputs" CONF_REVERSE_TIME = "reverse_time" +CONF_POSITIONING_MODE = "positioning_mode" DIM_MODES = ["STEPS50", "STEPS200"] @@ -235,4 +236,6 @@ TIME_UNITS = [ "D", ] -MOTOR_REVERSE_TIME = ["RT70", "RT600", "RT1200"] +MOTOR_REVERSE_TIMES = ["RT70", "RT600", "RT1200"] + +MOTOR_POSITIONING_MODES = ["NONE", "BS4", "MODULE"] diff --git a/homeassistant/components/lcn/cover.py b/homeassistant/components/lcn/cover.py index be713871aae..068d8f5ba11 100644 --- a/homeassistant/components/lcn/cover.py +++ b/homeassistant/components/lcn/cover.py @@ -6,7 +6,12 @@ from typing import Any import pypck -from homeassistant.components.cover import DOMAIN as DOMAIN_COVER, CoverEntity +from homeassistant.components.cover import ( + ATTR_POSITION, + DOMAIN as DOMAIN_COVER, + CoverEntity, + CoverEntityFeature, +) from homeassistant.config_entries import ConfigEntry from homeassistant.const import CONF_DOMAIN, CONF_ENTITIES from homeassistant.core import HomeAssistant @@ -17,6 +22,7 @@ from .const import ( ADD_ENTITIES_CALLBACKS, CONF_DOMAIN_DATA, CONF_MOTOR, + CONF_POSITIONING_MODE, CONF_REVERSE_TIME, DOMAIN, ) @@ -115,7 +121,7 @@ class LcnOutputsCover(LcnEntity, CoverEntity): async def async_close_cover(self, **kwargs: Any) -> None: """Close the cover.""" state = pypck.lcn_defs.MotorStateModifier.DOWN - if not await self.device_connection.control_motors_outputs( + if not await self.device_connection.control_motor_outputs( state, self.reverse_time ): return @@ -126,7 +132,7 @@ class LcnOutputsCover(LcnEntity, CoverEntity): async def async_open_cover(self, **kwargs: Any) -> None: """Open the cover.""" state = pypck.lcn_defs.MotorStateModifier.UP - if not await self.device_connection.control_motors_outputs( + if not await self.device_connection.control_motor_outputs( state, self.reverse_time ): return @@ -138,7 +144,7 @@ class LcnOutputsCover(LcnEntity, CoverEntity): async def async_stop_cover(self, **kwargs: Any) -> None: """Stop the cover.""" state = pypck.lcn_defs.MotorStateModifier.STOP - if not await self.device_connection.control_motors_outputs(state): + if not await self.device_connection.control_motor_outputs(state): return self._attr_is_closing = False self._attr_is_opening = False @@ -176,11 +182,25 @@ class LcnRelayCover(LcnEntity, CoverEntity): _attr_is_closing = False _attr_is_opening = False _attr_assumed_state = True + _attr_supported_features = ( + CoverEntityFeature.OPEN | CoverEntityFeature.CLOSE | CoverEntityFeature.STOP + ) + + positioning_mode: pypck.lcn_defs.MotorPositioningMode def __init__(self, config: ConfigType, config_entry: ConfigEntry) -> None: """Initialize the LCN cover.""" super().__init__(config, config_entry) + self.positioning_mode = pypck.lcn_defs.MotorPositioningMode( + config[CONF_DOMAIN_DATA].get( + CONF_POSITIONING_MODE, pypck.lcn_defs.MotorPositioningMode.NONE.value + ) + ) + + if self.positioning_mode != pypck.lcn_defs.MotorPositioningMode.NONE: + self._attr_supported_features |= CoverEntityFeature.SET_POSITION + self.motor = pypck.lcn_defs.MotorPort[config[CONF_DOMAIN_DATA][CONF_MOTOR]] self.motor_port_onoff = self.motor.value * 2 self.motor_port_updown = self.motor_port_onoff + 1 @@ -193,7 +213,9 @@ class LcnRelayCover(LcnEntity, CoverEntity): """Run when entity about to be added to hass.""" await super().async_added_to_hass() if not self.device_connection.is_group: - await self.device_connection.activate_status_request_handler(self.motor) + await self.device_connection.activate_status_request_handler( + self.motor, self.positioning_mode + ) async def async_will_remove_from_hass(self) -> None: """Run when entity will be removed from hass.""" @@ -203,9 +225,11 @@ class LcnRelayCover(LcnEntity, CoverEntity): async def async_close_cover(self, **kwargs: Any) -> None: """Close the cover.""" - states = [pypck.lcn_defs.MotorStateModifier.NOCHANGE] * 4 - states[self.motor.value] = pypck.lcn_defs.MotorStateModifier.DOWN - if not await self.device_connection.control_motors_relays(states): + if not await self.device_connection.control_motor_relays( + self.motor.value, + pypck.lcn_defs.MotorStateModifier.DOWN, + self.positioning_mode, + ): return self._attr_is_opening = False self._attr_is_closing = True @@ -213,9 +237,11 @@ class LcnRelayCover(LcnEntity, CoverEntity): async def async_open_cover(self, **kwargs: Any) -> None: """Open the cover.""" - states = [pypck.lcn_defs.MotorStateModifier.NOCHANGE] * 4 - states[self.motor.value] = pypck.lcn_defs.MotorStateModifier.UP - if not await self.device_connection.control_motors_relays(states): + if not await self.device_connection.control_motor_relays( + self.motor.value, + pypck.lcn_defs.MotorStateModifier.UP, + self.positioning_mode, + ): return self._attr_is_closed = False self._attr_is_opening = True @@ -224,26 +250,55 @@ class LcnRelayCover(LcnEntity, CoverEntity): async def async_stop_cover(self, **kwargs: Any) -> None: """Stop the cover.""" - states = [pypck.lcn_defs.MotorStateModifier.NOCHANGE] * 4 - states[self.motor.value] = pypck.lcn_defs.MotorStateModifier.STOP - if not await self.device_connection.control_motors_relays(states): + if not await self.device_connection.control_motor_relays( + self.motor.value, + pypck.lcn_defs.MotorStateModifier.STOP, + self.positioning_mode, + ): return self._attr_is_closing = False self._attr_is_opening = False self.async_write_ha_state() - def input_received(self, input_obj: InputType) -> None: - """Set cover states when LCN input object (command) is received.""" - if not isinstance(input_obj, pypck.inputs.ModStatusRelays): + async def async_set_cover_position(self, **kwargs: Any) -> None: + """Move the cover to a specific position.""" + position = kwargs[ATTR_POSITION] + if not await self.device_connection.control_motor_relays_position( + self.motor.value, position, mode=self.positioning_mode + ): return - - states = input_obj.states # list of boolean values (relay on/off) - if states[self.motor_port_onoff]: # motor is on - self._attr_is_opening = not states[self.motor_port_updown] # set direction - self._attr_is_closing = states[self.motor_port_updown] # set direction - else: # motor is off - self._attr_is_opening = False - self._attr_is_closing = False - self._attr_is_closed = states[self.motor_port_updown] + self._attr_is_closed = (self._attr_current_cover_position == 0) & ( + position == 0 + ) + if self._attr_current_cover_position is not None: + self._attr_is_closing = self._attr_current_cover_position > position + self._attr_is_opening = self._attr_current_cover_position < position + self._attr_current_cover_position = position self.async_write_ha_state() + + def input_received(self, input_obj: InputType) -> None: + """Set cover states when LCN input object (command) is received.""" + if isinstance(input_obj, pypck.inputs.ModStatusRelays): + self._attr_is_opening = input_obj.is_opening(self.motor.value) + self._attr_is_closing = input_obj.is_closing(self.motor.value) + + if self.positioning_mode == pypck.lcn_defs.MotorPositioningMode.NONE: + self._attr_is_closed = input_obj.is_assumed_closed(self.motor.value) + self.async_write_ha_state() + elif ( + isinstance( + input_obj, + ( + pypck.inputs.ModStatusMotorPositionBS4, + pypck.inputs.ModStatusMotorPositionModule, + ), + ) + and input_obj.motor == self.motor.value + ): + self._attr_current_cover_position = input_obj.position + if self._attr_current_cover_position in [0, 100]: + self._attr_is_opening = False + self._attr_is_closing = False + self._attr_is_closed = self._attr_current_cover_position == 0 + self.async_write_ha_state() diff --git a/homeassistant/components/lcn/manifest.json b/homeassistant/components/lcn/manifest.json index e5313eee4f3..0031cbcc947 100644 --- a/homeassistant/components/lcn/manifest.json +++ b/homeassistant/components/lcn/manifest.json @@ -8,5 +8,5 @@ "documentation": "https://www.home-assistant.io/integrations/lcn", "iot_class": "local_push", "loggers": ["pypck"], - "requirements": ["pypck==0.8.5", "lcn-frontend==0.2.4"] + "requirements": ["pypck==0.8.6", "lcn-frontend==0.2.4"] } diff --git a/homeassistant/components/lcn/schemas.py b/homeassistant/components/lcn/schemas.py index d90e264692c..fcc6044dd77 100644 --- a/homeassistant/components/lcn/schemas.py +++ b/homeassistant/components/lcn/schemas.py @@ -21,6 +21,7 @@ from .const import ( CONF_MOTOR, CONF_OUTPUT, CONF_OUTPUTS, + CONF_POSITIONING_MODE, CONF_REGISTER, CONF_REVERSE_TIME, CONF_SETPOINT, @@ -30,7 +31,8 @@ from .const import ( LED_PORTS, LOGICOP_PORTS, MOTOR_PORTS, - MOTOR_REVERSE_TIME, + MOTOR_POSITIONING_MODES, + MOTOR_REVERSE_TIMES, OUTPUT_PORTS, RELAY_PORTS, S0_INPUTS, @@ -68,8 +70,11 @@ DOMAIN_DATA_CLIMATE: VolDictType = { DOMAIN_DATA_COVER: VolDictType = { vol.Required(CONF_MOTOR): vol.All(vol.Upper, vol.In(MOTOR_PORTS)), + vol.Optional(CONF_POSITIONING_MODE, default="none"): vol.All( + vol.Upper, vol.In(MOTOR_POSITIONING_MODES) + ), vol.Optional(CONF_REVERSE_TIME, default="rt1200"): vol.All( - vol.Upper, vol.In(MOTOR_REVERSE_TIME) + vol.Upper, vol.In(MOTOR_REVERSE_TIMES) ), } diff --git a/requirements_all.txt b/requirements_all.txt index 6eda282e955..787e1831914 100644 --- a/requirements_all.txt +++ b/requirements_all.txt @@ -2227,7 +2227,7 @@ pypalazzetti==0.1.19 pypca==0.0.7 # homeassistant.components.lcn -pypck==0.8.5 +pypck==0.8.6 # homeassistant.components.pglab pypglab==0.0.5 diff --git a/requirements_test_all.txt b/requirements_test_all.txt index cb47548ebba..2fe65d0a93d 100644 --- a/requirements_test_all.txt +++ b/requirements_test_all.txt @@ -1821,7 +1821,7 @@ pyownet==0.10.0.post1 pypalazzetti==0.1.19 # homeassistant.components.lcn -pypck==0.8.5 +pypck==0.8.6 # homeassistant.components.pglab pypglab==0.0.5 diff --git a/tests/components/lcn/fixtures/config_entry_pchk.json b/tests/components/lcn/fixtures/config_entry_pchk.json index f319e37b265..5ded11d619a 100644 --- a/tests/components/lcn/fixtures/config_entry_pchk.json +++ b/tests/components/lcn/fixtures/config_entry_pchk.json @@ -125,7 +125,30 @@ "domain": "cover", "domain_data": { "motor": "MOTOR1", - "reverse_time": "RT1200" + "reverse_time": "RT1200", + "positioning_mode": "NONE" + } + }, + { + "address": [0, 7, false], + "name": "Cover_Relays_BS4", + "resource": "motor2", + "domain": "cover", + "domain_data": { + "motor": "MOTOR2", + "reverse_time": "RT1200", + "positioning_mode": "BS4" + } + }, + { + "address": [0, 7, false], + "name": "Cover_Relays_Module", + "resource": "motor3", + "domain": "cover", + "domain_data": { + "motor": "MOTOR3", + "reverse_time": "RT1200", + "positioning_mode": "MODULE" } }, { diff --git a/tests/components/lcn/snapshots/test_cover.ambr b/tests/components/lcn/snapshots/test_cover.ambr index 3e9c4ee72eb..4d1356e3c92 100644 --- a/tests/components/lcn/snapshots/test_cover.ambr +++ b/tests/components/lcn/snapshots/test_cover.ambr @@ -97,3 +97,101 @@ 'state': 'open', }) # --- +# name: test_setup_lcn_cover[cover.cover_relays_bs4-entry] + EntityRegistryEntrySnapshot({ + 'aliases': set({ + }), + 'area_id': None, + 'capabilities': None, + 'config_entry_id': , + 'config_subentry_id': , + 'device_class': None, + 'device_id': , + 'disabled_by': None, + 'domain': 'cover', + 'entity_category': None, + 'entity_id': 'cover.cover_relays_bs4', + 'has_entity_name': False, + 'hidden_by': None, + 'icon': None, + 'id': , + 'labels': set({ + }), + 'name': None, + 'options': dict({ + }), + 'original_device_class': None, + 'original_icon': None, + 'original_name': 'Cover_Relays_BS4', + 'platform': 'lcn', + 'previous_unique_id': None, + 'supported_features': , + 'translation_key': None, + 'unique_id': 'lcn/config_entry_pchk_json-m000007-motor2', + 'unit_of_measurement': None, + }) +# --- +# name: test_setup_lcn_cover[cover.cover_relays_bs4-state] + StateSnapshot({ + 'attributes': ReadOnlyDict({ + 'assumed_state': True, + 'friendly_name': 'Cover_Relays_BS4', + 'supported_features': , + }), + 'context': , + 'entity_id': 'cover.cover_relays_bs4', + 'last_changed': , + 'last_reported': , + 'last_updated': , + 'state': 'open', + }) +# --- +# name: test_setup_lcn_cover[cover.cover_relays_module-entry] + EntityRegistryEntrySnapshot({ + 'aliases': set({ + }), + 'area_id': None, + 'capabilities': None, + 'config_entry_id': , + 'config_subentry_id': , + 'device_class': None, + 'device_id': , + 'disabled_by': None, + 'domain': 'cover', + 'entity_category': None, + 'entity_id': 'cover.cover_relays_module', + 'has_entity_name': False, + 'hidden_by': None, + 'icon': None, + 'id': , + 'labels': set({ + }), + 'name': None, + 'options': dict({ + }), + 'original_device_class': None, + 'original_icon': None, + 'original_name': 'Cover_Relays_Module', + 'platform': 'lcn', + 'previous_unique_id': None, + 'supported_features': , + 'translation_key': None, + 'unique_id': 'lcn/config_entry_pchk_json-m000007-motor3', + 'unit_of_measurement': None, + }) +# --- +# name: test_setup_lcn_cover[cover.cover_relays_module-state] + StateSnapshot({ + 'attributes': ReadOnlyDict({ + 'assumed_state': True, + 'friendly_name': 'Cover_Relays_Module', + 'supported_features': , + }), + 'context': , + 'entity_id': 'cover.cover_relays_module', + 'last_changed': , + 'last_reported': , + 'last_updated': , + 'state': 'open', + }) +# --- diff --git a/tests/components/lcn/test_cover.py b/tests/components/lcn/test_cover.py index ff4311b6687..f2dd71757c9 100644 --- a/tests/components/lcn/test_cover.py +++ b/tests/components/lcn/test_cover.py @@ -2,17 +2,29 @@ from unittest.mock import patch -from pypck.inputs import ModStatusOutput, ModStatusRelays +from pypck.inputs import ( + ModStatusMotorPositionBS4, + ModStatusMotorPositionModule, + ModStatusOutput, + ModStatusRelays, +) from pypck.lcn_addr import LcnAddr -from pypck.lcn_defs import MotorReverseTime, MotorStateModifier +from pypck.lcn_defs import MotorPositioningMode, MotorReverseTime, MotorStateModifier +import pytest from syrupy.assertion import SnapshotAssertion -from homeassistant.components.cover import DOMAIN as DOMAIN_COVER, CoverState +from homeassistant.components.cover import ( + ATTR_CURRENT_POSITION, + ATTR_POSITION, + DOMAIN as DOMAIN_COVER, + CoverState, +) from homeassistant.components.lcn.helpers import get_device_connection from homeassistant.const import ( ATTR_ENTITY_ID, SERVICE_CLOSE_COVER, SERVICE_OPEN_COVER, + SERVICE_SET_COVER_POSITION, SERVICE_STOP_COVER, STATE_UNAVAILABLE, Platform, @@ -26,6 +38,8 @@ from tests.common import snapshot_platform COVER_OUTPUTS = "cover.cover_outputs" COVER_RELAYS = "cover.cover_relays" +COVER_RELAYS_BS4 = "cover.cover_relays_bs4" +COVER_RELAYS_MODULE = "cover.cover_relays_MODULE" async def test_setup_lcn_cover( @@ -46,13 +60,13 @@ async def test_outputs_open(hass: HomeAssistant, entry: MockConfigEntry) -> None await init_integration(hass, entry) with patch.object( - MockModuleConnection, "control_motors_outputs" - ) as control_motors_outputs: + MockModuleConnection, "control_motor_outputs" + ) as control_motor_outputs: state = hass.states.get(COVER_OUTPUTS) state.state = CoverState.CLOSED # command failed - control_motors_outputs.return_value = False + control_motor_outputs.return_value = False await hass.services.async_call( DOMAIN_COVER, @@ -61,7 +75,7 @@ async def test_outputs_open(hass: HomeAssistant, entry: MockConfigEntry) -> None blocking=True, ) - control_motors_outputs.assert_awaited_with( + control_motor_outputs.assert_awaited_with( MotorStateModifier.UP, MotorReverseTime.RT1200 ) @@ -70,8 +84,8 @@ async def test_outputs_open(hass: HomeAssistant, entry: MockConfigEntry) -> None assert state.state != CoverState.OPENING # command success - control_motors_outputs.reset_mock(return_value=True) - control_motors_outputs.return_value = True + control_motor_outputs.reset_mock(return_value=True) + control_motor_outputs.return_value = True await hass.services.async_call( DOMAIN_COVER, @@ -80,7 +94,7 @@ async def test_outputs_open(hass: HomeAssistant, entry: MockConfigEntry) -> None blocking=True, ) - control_motors_outputs.assert_awaited_with( + control_motor_outputs.assert_awaited_with( MotorStateModifier.UP, MotorReverseTime.RT1200 ) @@ -94,13 +108,13 @@ async def test_outputs_close(hass: HomeAssistant, entry: MockConfigEntry) -> Non await init_integration(hass, entry) with patch.object( - MockModuleConnection, "control_motors_outputs" - ) as control_motors_outputs: + MockModuleConnection, "control_motor_outputs" + ) as control_motor_outputs: state = hass.states.get(COVER_OUTPUTS) state.state = CoverState.OPEN # command failed - control_motors_outputs.return_value = False + control_motor_outputs.return_value = False await hass.services.async_call( DOMAIN_COVER, @@ -109,7 +123,7 @@ async def test_outputs_close(hass: HomeAssistant, entry: MockConfigEntry) -> Non blocking=True, ) - control_motors_outputs.assert_awaited_with( + control_motor_outputs.assert_awaited_with( MotorStateModifier.DOWN, MotorReverseTime.RT1200 ) @@ -118,8 +132,8 @@ async def test_outputs_close(hass: HomeAssistant, entry: MockConfigEntry) -> Non assert state.state != CoverState.CLOSING # command success - control_motors_outputs.reset_mock(return_value=True) - control_motors_outputs.return_value = True + control_motor_outputs.reset_mock(return_value=True) + control_motor_outputs.return_value = True await hass.services.async_call( DOMAIN_COVER, @@ -128,7 +142,7 @@ async def test_outputs_close(hass: HomeAssistant, entry: MockConfigEntry) -> Non blocking=True, ) - control_motors_outputs.assert_awaited_with( + control_motor_outputs.assert_awaited_with( MotorStateModifier.DOWN, MotorReverseTime.RT1200 ) @@ -142,13 +156,13 @@ async def test_outputs_stop(hass: HomeAssistant, entry: MockConfigEntry) -> None await init_integration(hass, entry) with patch.object( - MockModuleConnection, "control_motors_outputs" - ) as control_motors_outputs: + MockModuleConnection, "control_motor_outputs" + ) as control_motor_outputs: state = hass.states.get(COVER_OUTPUTS) state.state = CoverState.CLOSING # command failed - control_motors_outputs.return_value = False + control_motor_outputs.return_value = False await hass.services.async_call( DOMAIN_COVER, @@ -157,15 +171,15 @@ async def test_outputs_stop(hass: HomeAssistant, entry: MockConfigEntry) -> None blocking=True, ) - control_motors_outputs.assert_awaited_with(MotorStateModifier.STOP) + control_motor_outputs.assert_awaited_with(MotorStateModifier.STOP) state = hass.states.get(COVER_OUTPUTS) assert state is not None assert state.state == CoverState.CLOSING # command success - control_motors_outputs.reset_mock(return_value=True) - control_motors_outputs.return_value = True + control_motor_outputs.reset_mock(return_value=True) + control_motor_outputs.return_value = True await hass.services.async_call( DOMAIN_COVER, @@ -174,7 +188,7 @@ async def test_outputs_stop(hass: HomeAssistant, entry: MockConfigEntry) -> None blocking=True, ) - control_motors_outputs.assert_awaited_with(MotorStateModifier.STOP) + control_motor_outputs.assert_awaited_with(MotorStateModifier.STOP) state = hass.states.get(COVER_OUTPUTS) assert state is not None @@ -186,16 +200,13 @@ async def test_relays_open(hass: HomeAssistant, entry: MockConfigEntry) -> None: await init_integration(hass, entry) with patch.object( - MockModuleConnection, "control_motors_relays" - ) as control_motors_relays: - states = [MotorStateModifier.NOCHANGE] * 4 - states[0] = MotorStateModifier.UP - + MockModuleConnection, "control_motor_relays" + ) as control_motor_relays: state = hass.states.get(COVER_RELAYS) state.state = CoverState.CLOSED # command failed - control_motors_relays.return_value = False + control_motor_relays.return_value = False await hass.services.async_call( DOMAIN_COVER, @@ -204,15 +215,17 @@ async def test_relays_open(hass: HomeAssistant, entry: MockConfigEntry) -> None: blocking=True, ) - control_motors_relays.assert_awaited_with(states) + control_motor_relays.assert_awaited_with( + 0, MotorStateModifier.UP, MotorPositioningMode.NONE + ) state = hass.states.get(COVER_RELAYS) assert state is not None assert state.state != CoverState.OPENING # command success - control_motors_relays.reset_mock(return_value=True) - control_motors_relays.return_value = True + control_motor_relays.reset_mock(return_value=True) + control_motor_relays.return_value = True await hass.services.async_call( DOMAIN_COVER, @@ -221,7 +234,9 @@ async def test_relays_open(hass: HomeAssistant, entry: MockConfigEntry) -> None: blocking=True, ) - control_motors_relays.assert_awaited_with(states) + control_motor_relays.assert_awaited_with( + 0, MotorStateModifier.UP, MotorPositioningMode.NONE + ) state = hass.states.get(COVER_RELAYS) assert state is not None @@ -233,16 +248,13 @@ async def test_relays_close(hass: HomeAssistant, entry: MockConfigEntry) -> None await init_integration(hass, entry) with patch.object( - MockModuleConnection, "control_motors_relays" - ) as control_motors_relays: - states = [MotorStateModifier.NOCHANGE] * 4 - states[0] = MotorStateModifier.DOWN - + MockModuleConnection, "control_motor_relays" + ) as control_motor_relays: state = hass.states.get(COVER_RELAYS) state.state = CoverState.OPEN # command failed - control_motors_relays.return_value = False + control_motor_relays.return_value = False await hass.services.async_call( DOMAIN_COVER, @@ -251,15 +263,17 @@ async def test_relays_close(hass: HomeAssistant, entry: MockConfigEntry) -> None blocking=True, ) - control_motors_relays.assert_awaited_with(states) + control_motor_relays.assert_awaited_with( + 0, MotorStateModifier.DOWN, MotorPositioningMode.NONE + ) state = hass.states.get(COVER_RELAYS) assert state is not None assert state.state != CoverState.CLOSING # command success - control_motors_relays.reset_mock(return_value=True) - control_motors_relays.return_value = True + control_motor_relays.reset_mock(return_value=True) + control_motor_relays.return_value = True await hass.services.async_call( DOMAIN_COVER, @@ -268,7 +282,9 @@ async def test_relays_close(hass: HomeAssistant, entry: MockConfigEntry) -> None blocking=True, ) - control_motors_relays.assert_awaited_with(states) + control_motor_relays.assert_awaited_with( + 0, MotorStateModifier.DOWN, MotorPositioningMode.NONE + ) state = hass.states.get(COVER_RELAYS) assert state is not None @@ -280,16 +296,13 @@ async def test_relays_stop(hass: HomeAssistant, entry: MockConfigEntry) -> None: await init_integration(hass, entry) with patch.object( - MockModuleConnection, "control_motors_relays" - ) as control_motors_relays: - states = [MotorStateModifier.NOCHANGE] * 4 - states[0] = MotorStateModifier.STOP - + MockModuleConnection, "control_motor_relays" + ) as control_motor_relays: state = hass.states.get(COVER_RELAYS) state.state = CoverState.CLOSING # command failed - control_motors_relays.return_value = False + control_motor_relays.return_value = False await hass.services.async_call( DOMAIN_COVER, @@ -298,15 +311,17 @@ async def test_relays_stop(hass: HomeAssistant, entry: MockConfigEntry) -> None: blocking=True, ) - control_motors_relays.assert_awaited_with(states) + control_motor_relays.assert_awaited_with( + 0, MotorStateModifier.STOP, MotorPositioningMode.NONE + ) state = hass.states.get(COVER_RELAYS) assert state is not None assert state.state == CoverState.CLOSING # command success - control_motors_relays.reset_mock(return_value=True) - control_motors_relays.return_value = True + control_motor_relays.reset_mock(return_value=True) + control_motor_relays.return_value = True await hass.services.async_call( DOMAIN_COVER, @@ -315,13 +330,74 @@ async def test_relays_stop(hass: HomeAssistant, entry: MockConfigEntry) -> None: blocking=True, ) - control_motors_relays.assert_awaited_with(states) + control_motor_relays.assert_awaited_with( + 0, MotorStateModifier.STOP, MotorPositioningMode.NONE + ) state = hass.states.get(COVER_RELAYS) assert state is not None assert state.state not in (CoverState.CLOSING, CoverState.OPENING) +@pytest.mark.parametrize( + ("entity_id", "motor", "positioning_mode"), + [ + (COVER_RELAYS_BS4, 1, MotorPositioningMode.BS4), + (COVER_RELAYS_MODULE, 2, MotorPositioningMode.MODULE), + ], +) +async def test_relays_set_position( + hass: HomeAssistant, + entry: MockConfigEntry, + entity_id: str, + motor: int, + positioning_mode: MotorPositioningMode, +) -> None: + """Test the relays cover moves to position.""" + await init_integration(hass, entry) + + with patch.object( + MockModuleConnection, "control_motor_relays_position" + ) as control_motor_relays_position: + state = hass.states.get(entity_id) + state.state = CoverState.CLOSED + + # command failed + control_motor_relays_position.return_value = False + + await hass.services.async_call( + DOMAIN_COVER, + SERVICE_SET_COVER_POSITION, + {ATTR_ENTITY_ID: entity_id, ATTR_POSITION: 50}, + blocking=True, + ) + + control_motor_relays_position.assert_awaited_with( + motor, 50, mode=positioning_mode + ) + + state = hass.states.get(entity_id) + assert state.state == CoverState.CLOSED + + # command success + control_motor_relays_position.reset_mock(return_value=True) + control_motor_relays_position.return_value = True + + await hass.services.async_call( + DOMAIN_COVER, + SERVICE_SET_COVER_POSITION, + {ATTR_ENTITY_ID: entity_id, ATTR_POSITION: 50}, + blocking=True, + ) + + control_motor_relays_position.assert_awaited_with( + motor, 50, mode=positioning_mode + ) + + state = hass.states.get(entity_id) + assert state.state == CoverState.OPEN + + async def test_pushed_outputs_status_change( hass: HomeAssistant, entry: MockConfigEntry ) -> None: @@ -372,8 +448,9 @@ async def test_pushed_relays_status_change( address = LcnAddr(0, 7, False) states = [False] * 8 - state = hass.states.get(COVER_RELAYS) - state.state = CoverState.CLOSED + for entity_id in (COVER_RELAYS, COVER_RELAYS_BS4, COVER_RELAYS_MODULE): + state = hass.states.get(entity_id) + state.state = CoverState.CLOSED # push status "open" states[0:2] = [True, False] @@ -405,6 +482,26 @@ async def test_pushed_relays_status_change( assert state is not None assert state.state == CoverState.CLOSING + # push status "set position" via BS4 + inp = ModStatusMotorPositionBS4(address, 1, 50) + await device_connection.async_process_input(inp) + await hass.async_block_till_done() + + state = hass.states.get(COVER_RELAYS_BS4) + assert state is not None + assert state.state == CoverState.OPEN + assert state.attributes[ATTR_CURRENT_POSITION] == 50 + + # push status "set position" via MODULE + inp = ModStatusMotorPositionModule(address, 2, 75) + await device_connection.async_process_input(inp) + await hass.async_block_till_done() + + state = hass.states.get(COVER_RELAYS_MODULE) + assert state is not None + assert state.state == CoverState.OPEN + assert state.attributes[ATTR_CURRENT_POSITION] == 75 + async def test_unload_config_entry(hass: HomeAssistant, entry: MockConfigEntry) -> None: """Test the cover is removed when the config entry is unloaded."""