# Mirror of https://github.com/home-assistant/core.git
# (synced 2025-11-04 08:29:37 +00:00) - 169 lines, 5.8 KiB, Python.
"""Support for RFXtrx covers."""

from __future__ import annotations

import logging
from typing import Any

import RFXtrx as rfxtrxmod

from homeassistant.components.cover import CoverEntity, CoverEntityFeature
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import STATE_OPEN
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.entity import Entity
from homeassistant.helpers.entity_platform import AddEntitiesCallback

from . import DeviceTuple, RfxtrxCommandEntity, async_setup_platform_entry
from .const import (
    COMMAND_OFF_LIST,
    COMMAND_ON_LIST,
    CONF_VENETIAN_BLIND_MODE,
    CONST_VENETIAN_BLIND_MODE_EU,
    CONST_VENETIAN_BLIND_MODE_US,
)

# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)


def supported(event: rfxtrxmod.RFXtrxEvent) -> bool:
    """Return whether an event supports cover.

    Args:
        event: Event whose ``device`` attribute is inspected.

    Returns:
        The truthiness of ``event.device.known_to_be_rollershutter``.
    """
    return bool(event.device.known_to_be_rollershutter)


async def async_setup_entry(
    hass: HomeAssistant,
    config_entry: ConfigEntry,
    async_add_entities: AddEntitiesCallback,
) -> None:
    """Set up config entry.

    Passes the ``supported`` filter and a cover entity factory on to
    ``async_setup_platform_entry`` together with the arguments received here.
    """

    def _constructor(
        event: rfxtrxmod.RFXtrxEvent,
        auto: rfxtrxmod.RFXtrxEvent | None,
        device_id: DeviceTuple,
        entity_info: dict[str, Any],
    ) -> list[Entity]:
        """Build the single cover entity for the device behind ``event``."""
        return [
            RfxtrxCover(
                event.device,
                device_id,
                # ``.get`` yields None when no venetian blind mode is configured.
                venetian_blind_mode=entity_info.get(CONF_VENETIAN_BLIND_MODE),
                # The entity only receives the event when ``auto`` is truthy;
                # otherwise it is created without one.
                event=event if auto else None,
            )
        ]

    await async_setup_platform_entry(
        hass, config_entry, async_add_entities, supported, _constructor
    )


class RfxtrxCover(RfxtrxCommandEntity, CoverEntity):
    """Representation of a RFXtrx cover.

    The open/closed state lives in ``_attr_is_closed``. It is set from the
    last command this entity sent or from the last event applied to it; no
    position is read back from the device in this class.
    """

    _device: rfxtrxmod.RollerTrolDevice | rfxtrxmod.RfyDevice | rfxtrxmod.LightingDevice

    def __init__(
        self,
        device: rfxtrxmod.RFXtrxDevice,
        device_id: DeviceTuple,
        event: rfxtrxmod.RFXtrxEvent | None = None,
        venetian_blind_mode: str | None = None,
    ) -> None:
        """Initialize the RFXtrx cover device.

        Args:
            device: Device object forwarded to the base class.
            device_id: Identifier forwarded to the base class.
            event: Optional initial event forwarded to the base class.
            venetian_blind_mode: Optional venetian blind mode; the US and EU
                modes additionally enable the tilt features.
        """
        super().__init__(device, device_id, event)
        self._venetian_blind_mode = venetian_blind_mode
        # The cover starts out as closed until a command, an event or a
        # restored state says otherwise.
        self._attr_is_closed: bool | None = True

        self._attr_supported_features = (
            CoverEntityFeature.OPEN | CoverEntityFeature.CLOSE | CoverEntityFeature.STOP
        )

        if venetian_blind_mode in (
            CONST_VENETIAN_BLIND_MODE_US,
            CONST_VENETIAN_BLIND_MODE_EU,
        ):
            self._attr_supported_features |= (
                CoverEntityFeature.OPEN_TILT
                | CoverEntityFeature.CLOSE_TILT
                | CoverEntityFeature.STOP_TILT
            )

    async def async_added_to_hass(self) -> None:
        """Restore device state."""
        await super().async_added_to_hass()

        # Only consult the stored state when ``self._event`` is None.
        if self._event is None:
            old_state = await self.async_get_last_state()
            if old_state is not None:
                # Any stored state other than "open" is treated as closed.
                self._attr_is_closed = old_state.state != STATE_OPEN

    async def async_open_cover(self, **kwargs: Any) -> None:
        """Move the cover up.

        US mode sends ``send_up05sec``, EU mode sends ``send_up2sec`` and any
        other mode sends ``send_open``. The cover is then marked as open.
        """
        if self._venetian_blind_mode == CONST_VENETIAN_BLIND_MODE_US:
            await self._async_send(self._device.send_up05sec)
        elif self._venetian_blind_mode == CONST_VENETIAN_BLIND_MODE_EU:
            await self._async_send(self._device.send_up2sec)
        else:
            await self._async_send(self._device.send_open)
        self._attr_is_closed = False
        self.async_write_ha_state()

    async def async_close_cover(self, **kwargs: Any) -> None:
        """Move the cover down.

        US mode sends ``send_down05sec``, EU mode sends ``send_down2sec`` and
        any other mode sends ``send_close``. The cover is then marked as closed.
        """
        if self._venetian_blind_mode == CONST_VENETIAN_BLIND_MODE_US:
            await self._async_send(self._device.send_down05sec)
        elif self._venetian_blind_mode == CONST_VENETIAN_BLIND_MODE_EU:
            await self._async_send(self._device.send_down2sec)
        else:
            await self._async_send(self._device.send_close)
        self._attr_is_closed = True
        self.async_write_ha_state()

    async def async_stop_cover(self, **kwargs: Any) -> None:
        """Stop the cover."""
        await self._async_send(self._device.send_stop)
        # A stopped cover is reported as not closed.
        self._attr_is_closed = False
        self.async_write_ha_state()

    async def async_open_cover_tilt(self, **kwargs: Any) -> None:
        """Tilt the cover up.

        US mode sends ``send_up2sec`` and EU mode sends ``send_up05sec``, the
        reverse of the pairing used by ``async_open_cover``. Nothing is sent
        for any other mode, and the stored open/closed state is not changed.
        """
        if self._venetian_blind_mode == CONST_VENETIAN_BLIND_MODE_US:
            await self._async_send(self._device.send_up2sec)
        elif self._venetian_blind_mode == CONST_VENETIAN_BLIND_MODE_EU:
            await self._async_send(self._device.send_up05sec)

    async def async_close_cover_tilt(self, **kwargs: Any) -> None:
        """Tilt the cover down.

        US mode sends ``send_down2sec`` and EU mode sends ``send_down05sec``,
        the reverse of the pairing used by ``async_close_cover``. Nothing is
        sent for any other mode, and the stored open/closed state is not
        changed.
        """
        if self._venetian_blind_mode == CONST_VENETIAN_BLIND_MODE_US:
            await self._async_send(self._device.send_down2sec)
        elif self._venetian_blind_mode == CONST_VENETIAN_BLIND_MODE_EU:
            await self._async_send(self._device.send_down05sec)

    async def async_stop_cover_tilt(self, **kwargs: Any) -> None:
        """Stop the cover tilt."""
        await self._async_send(self._device.send_stop)
        # Same bookkeeping as ``async_stop_cover``: reported as not closed.
        self._attr_is_closed = False
        self.async_write_ha_state()

    def _apply_event(self, event: rfxtrxmod.RFXtrxEvent) -> None:
        """Apply command from rfxtrx.

        A command in ``COMMAND_ON_LIST`` marks the cover as open and one in
        ``COMMAND_OFF_LIST`` marks it as closed. Any other command leaves the
        stored state untouched.
        """
        assert isinstance(event, rfxtrxmod.ControlEvent)
        super()._apply_event(event)
        if event.values["Command"] in COMMAND_ON_LIST:
            self._attr_is_closed = False
        elif event.values["Command"] in COMMAND_OFF_LIST:
            self._attr_is_closed = True

    @callback
    def _handle_event(
        self, event: rfxtrxmod.RFXtrxEvent, device_id: DeviceTuple
    ) -> None:
        """Check if event applies to me and update."""
        # Events addressed to a different device are ignored.
        if device_id != self._device_id:
            return

        self._apply_event(event)

        self.async_write_ha_state()