Compare commits

..

22 Commits

Author SHA1 Message Date
Erik Montnemery
3dabfeb329 Add more test helpers for trigger tests (#156464) 2025-11-12 19:33:59 +01:00
Erik Montnemery
8e7d2d7108 Add test helper for trigger tests (#156462) 2025-11-12 17:12:32 +01:00
Erik Montnemery
2fe4a1164b Add climate turned off trigger (#156453) 2025-11-12 16:48:10 +01:00
Erik Montnemery
05175294f6 Check from state in light triggers (#156443) 2025-11-12 13:15:30 +01:00
Erik Montnemery
e2ddfb8782 Add light trigger tests where initial state is none, unknown or unavailable (#156430) 2025-11-12 11:54:15 +01:00
Erik Montnemery
f1cc133ff6 Improve light trigger tests (#156424) 2025-11-12 09:12:41 +01:00
Erik Montnemery
0cf97cf577 Improve light trigger tests (#156378) 2025-11-11 21:15:26 +01:00
Franck Nijhof
38cea2e5f0 Clean up CONF_OPTIONS constant from light triggers (#156375) 2025-11-11 17:49:45 +01:00
Erik Montnemery
71876d5b34 Fix light trigger descriptions (#156380) 2025-11-11 17:47:48 +01:00
Abílio Costa
0f780254e1 Add light state condition (#149830)
Co-authored-by: Artur Pragacz <49985303+arturpragacz@users.noreply.github.com>
2025-11-11 13:46:43 +01:00
Abílio Costa
9e40972b11 Split light state trigger (#156272)
Co-authored-by: Erik <erik@montnemery.com>
2025-11-11 13:42:40 +01:00
abmantis
07ef61dd8d Merge branch 'dev' of github.com:home-assistant/core into dev_target_triggers_conditions 2025-11-10 14:19:18 +00:00
abmantis
1bf6771a54 Merge branch 'dev' of github.com:home-assistant/core into dev_target_triggers_conditions 2025-11-06 19:57:40 +00:00
abmantis
e7a7cb829e Merge branch 'dev' of github.com:home-assistant/core into dev_target_triggers_conditions 2025-11-04 12:28:39 +00:00
abmantis
6f6b2f1ad3 Merge branch 'dev_target_triggers_conditions' of github.com:home-assistant/core into dev_target_triggers_conditions 2025-10-15 17:03:28 +01:00
abmantis
1cc4890f75 Merge branch 'dev' of github.com:home-assistant/core into dev_target_triggers_conditions 2025-10-15 17:03:18 +01:00
Bram Kragten
d3dd9b26c9 Fixes for triggers.yaml descriptions (#153841)
Co-authored-by: Artur Pragacz <49985303+arturpragacz@users.noreply.github.com>
2025-10-09 18:00:56 +01:00
Abílio Costa
a64d61df05 Fix light trigger with new Trigger class changes (#154087) 2025-10-09 18:14:55 +02:00
abmantis
e7c6c5311d Merge branch 'dev' of github.com:home-assistant/core into dev_target_triggers_conditions 2025-10-09 15:55:39 +01:00
abmantis
72a524c868 Merge branch 'dev' of github.com:home-assistant/core into dev_target_triggers_conditions 2025-09-29 16:56:23 +01:00
abmantis
b437113f31 Merge branch 'dev' of github.com:home-assistant/core into dev_target_triggers_conditions 2025-09-29 11:18:39 +01:00
Abílio Costa
e0e263d3b5 Add state trigger to light component (#148416)
Co-authored-by: Artur Pragacz <49985303+arturpragacz@users.noreply.github.com>
2025-09-18 19:53:26 +01:00
278 changed files with 2913 additions and 18141 deletions

View File

@@ -622,7 +622,7 @@ jobs:
steps:
- *checkout
- name: Dependency review
uses: actions/dependency-review-action@3c4e3dcb1aa7874d2c16be7d79418e9b7efd6261 # v4.8.2
uses: actions/dependency-review-action@40c09b7dc99638e5ddb0bfd91c1673effc064d8a # v4.8.1
with:
license-check: false # We use our own license audit checks

4
CODEOWNERS generated
View File

@@ -1017,8 +1017,8 @@ build.json @home-assistant/supervisor
/homeassistant/components/msteams/ @peroyvind
/homeassistant/components/mullvad/ @meichthys
/tests/components/mullvad/ @meichthys
/homeassistant/components/music_assistant/ @music-assistant @arturpragacz
/tests/components/music_assistant/ @music-assistant @arturpragacz
/homeassistant/components/music_assistant/ @music-assistant
/tests/components/music_assistant/ @music-assistant
/homeassistant/components/mutesync/ @currentoor
/tests/components/mutesync/ @currentoor
/homeassistant/components/my/ @home-assistant/core

View File

@@ -143,28 +143,5 @@
"name": "Trigger"
}
},
"title": "Alarm control panel",
"triggers": {
"armed": {
"description": "Triggers when an alarm is armed.",
"description_configured": "Triggers when an alarm is armed",
"fields": {
"mode": {
"description": "The arm modes to trigger on. If empty, triggers on all arm modes.",
"name": "Arm modes"
}
},
"name": "When an alarm is armed"
},
"disarmed": {
"description": "Triggers when an alarm is disarmed.",
"description_configured": "Triggers when an alarm is disarmed",
"name": "When an alarm is disarmed"
},
"triggered": {
"description": "Triggers when an alarm is triggered.",
"description_configured": "Triggers when an alarm is triggered",
"name": "When an alarm is triggered"
}
}
"title": "Alarm control panel"
}

View File

@@ -1,283 +0,0 @@
"""Provides triggers for alarm control panels."""
from typing import TYPE_CHECKING, cast, override
import voluptuous as vol
from homeassistant.const import (
ATTR_ENTITY_ID,
CONF_OPTIONS,
CONF_TARGET,
STATE_UNAVAILABLE,
)
from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback, split_entity_id
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.target import (
TargetStateChangedData,
async_track_target_selector_state_change_event,
)
from homeassistant.helpers.trigger import Trigger, TriggerActionRunner, TriggerConfig
from homeassistant.helpers.typing import ConfigType
from .const import DOMAIN, AlarmControlPanelState
CONF_MODE = "mode"
ARMED_TRIGGER_SCHEMA = vol.Schema(
{
vol.Optional(CONF_OPTIONS, default={}): {
vol.Optional(CONF_MODE, default=[]): vol.All(
cv.ensure_list,
[
vol.In(
[
AlarmControlPanelState.ARMED_HOME,
AlarmControlPanelState.ARMED_AWAY,
AlarmControlPanelState.ARMED_NIGHT,
AlarmControlPanelState.ARMED_VACATION,
AlarmControlPanelState.ARMED_CUSTOM_BYPASS,
]
)
],
),
},
vol.Required(CONF_TARGET): cv.TARGET_FIELDS,
}
)
DISARMED_TRIGGER_SCHEMA = vol.Schema(
{
vol.Optional(CONF_OPTIONS, default={}): {},
vol.Required(CONF_TARGET): cv.TARGET_FIELDS,
}
)
TRIGGERED_TRIGGER_SCHEMA = vol.Schema(
{
vol.Optional(CONF_OPTIONS, default={}): {},
vol.Required(CONF_TARGET): cv.TARGET_FIELDS,
}
)
class AlarmArmedTrigger(Trigger):
"""Trigger for when an alarm control panel is armed."""
@override
@classmethod
async def async_validate_config(
cls, hass: HomeAssistant, config: ConfigType
) -> ConfigType:
"""Validate config."""
return cast(ConfigType, ARMED_TRIGGER_SCHEMA(config))
def __init__(self, hass: HomeAssistant, config: TriggerConfig) -> None:
"""Initialize the alarm armed trigger."""
super().__init__(hass, config)
if TYPE_CHECKING:
assert config.options is not None
assert config.target is not None
self._options = config.options
self._target = config.target
@override
async def async_attach_runner(
self, run_action: TriggerActionRunner
) -> CALLBACK_TYPE:
"""Attach the trigger to an action runner."""
mode_filter = self._options[CONF_MODE]
# All armed states
armed_states = {
AlarmControlPanelState.ARMED_HOME,
AlarmControlPanelState.ARMED_AWAY,
AlarmControlPanelState.ARMED_NIGHT,
AlarmControlPanelState.ARMED_VACATION,
AlarmControlPanelState.ARMED_CUSTOM_BYPASS,
}
@callback
def state_change_listener(
target_state_change_data: TargetStateChangedData,
) -> None:
"""Listen for state changes and call action."""
event = target_state_change_data.state_change_event
entity_id = event.data["entity_id"]
from_state = event.data["old_state"]
to_state = event.data["new_state"]
# Ignore unavailable states
if to_state is None or to_state.state == STATE_UNAVAILABLE:
return
# Check if the new state is an armed state
if to_state.state not in armed_states:
return
# If mode filter is specified, check if the mode matches
if mode_filter and to_state.state not in mode_filter:
return
run_action(
{
ATTR_ENTITY_ID: entity_id,
"from_state": from_state,
"to_state": to_state,
},
f"alarm armed on {entity_id}",
event.context,
)
def entity_filter(entities: set[str]) -> set[str]:
"""Filter entities of this domain."""
return {
entity_id
for entity_id in entities
if split_entity_id(entity_id)[0] == DOMAIN
}
return async_track_target_selector_state_change_event(
self._hass, self._target, state_change_listener, entity_filter
)
class AlarmDisarmedTrigger(Trigger):
"""Trigger for when an alarm control panel is disarmed."""
@override
@classmethod
async def async_validate_config(
cls, hass: HomeAssistant, config: ConfigType
) -> ConfigType:
"""Validate config."""
return cast(ConfigType, DISARMED_TRIGGER_SCHEMA(config))
def __init__(self, hass: HomeAssistant, config: TriggerConfig) -> None:
"""Initialize the alarm disarmed trigger."""
super().__init__(hass, config)
if TYPE_CHECKING:
assert config.target is not None
self._target = config.target
@override
async def async_attach_runner(
self, run_action: TriggerActionRunner
) -> CALLBACK_TYPE:
"""Attach the trigger to an action runner."""
@callback
def state_change_listener(
target_state_change_data: TargetStateChangedData,
) -> None:
"""Listen for state changes and call action."""
event = target_state_change_data.state_change_event
entity_id = event.data["entity_id"]
from_state = event.data["old_state"]
to_state = event.data["new_state"]
# Ignore unavailable states
if to_state is None or to_state.state == STATE_UNAVAILABLE:
return
# Check if the new state is disarmed
if to_state.state != AlarmControlPanelState.DISARMED:
return
run_action(
{
ATTR_ENTITY_ID: entity_id,
"from_state": from_state,
"to_state": to_state,
},
f"alarm disarmed on {entity_id}",
event.context,
)
def entity_filter(entities: set[str]) -> set[str]:
"""Filter entities of this domain."""
return {
entity_id
for entity_id in entities
if split_entity_id(entity_id)[0] == DOMAIN
}
return async_track_target_selector_state_change_event(
self._hass, self._target, state_change_listener, entity_filter
)
class AlarmTriggeredTrigger(Trigger):
"""Trigger for when an alarm control panel is triggered."""
@override
@classmethod
async def async_validate_config(
cls, hass: HomeAssistant, config: ConfigType
) -> ConfigType:
"""Validate config."""
return cast(ConfigType, TRIGGERED_TRIGGER_SCHEMA(config))
def __init__(self, hass: HomeAssistant, config: TriggerConfig) -> None:
"""Initialize the alarm triggered trigger."""
super().__init__(hass, config)
if TYPE_CHECKING:
assert config.target is not None
self._target = config.target
@override
async def async_attach_runner(
self, run_action: TriggerActionRunner
) -> CALLBACK_TYPE:
"""Attach the trigger to an action runner."""
@callback
def state_change_listener(
target_state_change_data: TargetStateChangedData,
) -> None:
"""Listen for state changes and call action."""
event = target_state_change_data.state_change_event
entity_id = event.data["entity_id"]
from_state = event.data["old_state"]
to_state = event.data["new_state"]
# Ignore unavailable states
if to_state is None or to_state.state == STATE_UNAVAILABLE:
return
# Check if the new state is triggered
if to_state.state != AlarmControlPanelState.TRIGGERED:
return
run_action(
{
ATTR_ENTITY_ID: entity_id,
"from_state": from_state,
"to_state": to_state,
},
f"alarm triggered on {entity_id}",
event.context,
)
def entity_filter(entities: set[str]) -> set[str]:
"""Filter entities of this domain."""
return {
entity_id
for entity_id in entities
if split_entity_id(entity_id)[0] == DOMAIN
}
return async_track_target_selector_state_change_event(
self._hass, self._target, state_change_listener, entity_filter
)
TRIGGERS: dict[str, type[Trigger]] = {
"armed": AlarmArmedTrigger,
"disarmed": AlarmDisarmedTrigger,
"triggered": AlarmTriggeredTrigger,
}
async def async_get_triggers(hass: HomeAssistant) -> dict[str, type[Trigger]]:
"""Return the triggers for alarm control panels."""
return TRIGGERS

View File

@@ -1,30 +0,0 @@
armed:
target:
entity:
domain: alarm_control_panel
fields:
mode:
required: false
default: []
selector:
select:
multiple: true
options:
- value: armed_home
label: Home
- value: armed_away
label: Away
- value: armed_night
label: Night
- value: armed_vacation
label: Vacation
- value: armed_custom_bypass
label: Custom bypass
disarmed:
target:
entity:
domain: alarm_control_panel
triggered:
target:
entity:
domain: alarm_control_panel

View File

@@ -98,27 +98,5 @@
"name": "Start conversation"
}
},
"title": "Assist satellite",
"triggers": {
"listening": {
"description": "Triggers when a satellite starts listening for a command.",
"description_configured": "Triggers when a satellite starts listening for a command",
"name": "When a satellite starts listening"
},
"processing": {
"description": "Triggers when a satellite starts processing a command.",
"description_configured": "Triggers when a satellite starts processing a command",
"name": "When a satellite starts processing"
},
"responding": {
"description": "Triggers when a satellite starts responding to a command.",
"description_configured": "Triggers when a satellite starts responding to a command",
"name": "When a satellite starts responding"
},
"idle": {
"description": "Triggers when a satellite goes back to idle.",
"description_configured": "Triggers when a satellite goes back to idle",
"name": "When a satellite goes back to idle"
}
}
"title": "Assist satellite"
}

View File

@@ -1,140 +0,0 @@
"""Provides triggers for assist satellites."""
from typing import TYPE_CHECKING, cast, override
import voluptuous as vol
from homeassistant.const import (
ATTR_ENTITY_ID,
CONF_TARGET,
STATE_UNAVAILABLE,
)
from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback, split_entity_id
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.event import process_state_match
from homeassistant.helpers.target import (
TargetStateChangedData,
async_track_target_selector_state_change_event,
)
from homeassistant.helpers.trigger import Trigger, TriggerActionRunner, TriggerConfig
from homeassistant.helpers.typing import ConfigType
from .const import DOMAIN
STATE_TRIGGER_SCHEMA = vol.Schema(
{
vol.Required(CONF_TARGET): cv.TARGET_FIELDS,
}
)
class StateTriggerBase(Trigger):
"""Trigger for assist satellite state changes."""
@override
@classmethod
async def async_validate_config(
cls, hass: HomeAssistant, config: ConfigType
) -> ConfigType:
"""Validate config."""
return cast(ConfigType, STATE_TRIGGER_SCHEMA(config))
def __init__(self, hass: HomeAssistant, config: TriggerConfig, state: str) -> None:
"""Initialize the state trigger."""
super().__init__(hass, config)
if TYPE_CHECKING:
assert config.target is not None
self._target = config.target
self._state = state
@override
async def async_attach_runner(
self, run_action: TriggerActionRunner
) -> CALLBACK_TYPE:
"""Attach the trigger to an action runner."""
match_config_state = process_state_match(self._state)
@callback
def state_change_listener(
target_state_change_data: TargetStateChangedData,
) -> None:
"""Listen for state changes and call action."""
event = target_state_change_data.state_change_event
entity_id = event.data["entity_id"]
from_state = event.data["old_state"]
to_state = event.data["new_state"]
# Ignore unavailable states
if to_state is None or to_state.state == STATE_UNAVAILABLE:
return
# Check if the new state matches the trigger state
if not match_config_state(to_state.state):
return
run_action(
{
ATTR_ENTITY_ID: entity_id,
"from_state": from_state,
"to_state": to_state,
},
f"{entity_id} {self._state}",
event.context,
)
def entity_filter(entities: set[str]) -> set[str]:
"""Filter entities of this domain."""
return {
entity_id
for entity_id in entities
if split_entity_id(entity_id)[0] == DOMAIN
}
return async_track_target_selector_state_change_event(
self._hass, self._target, state_change_listener, entity_filter
)
class ListeningTrigger(StateTriggerBase):
"""Trigger for when a satellite starts listening."""
def __init__(self, hass: HomeAssistant, config: TriggerConfig) -> None:
"""Initialize the listening trigger."""
super().__init__(hass, config, "listening")
class ProcessingTrigger(StateTriggerBase):
"""Trigger for when a satellite starts processing."""
def __init__(self, hass: HomeAssistant, config: TriggerConfig) -> None:
"""Initialize the processing trigger."""
super().__init__(hass, config, "processing")
class RespondingTrigger(StateTriggerBase):
"""Trigger for when a satellite starts responding."""
def __init__(self, hass: HomeAssistant, config: TriggerConfig) -> None:
"""Initialize the responding trigger."""
super().__init__(hass, config, "responding")
class IdleTrigger(StateTriggerBase):
"""Trigger for when a satellite goes back to idle."""
def __init__(self, hass: HomeAssistant, config: TriggerConfig) -> None:
"""Initialize the idle trigger."""
super().__init__(hass, config, "idle")
TRIGGERS: dict[str, type[Trigger]] = {
"listening": ListeningTrigger,
"processing": ProcessingTrigger,
"responding": RespondingTrigger,
"idle": IdleTrigger,
}
async def async_get_triggers(hass: HomeAssistant) -> dict[str, type[Trigger]]:
"""Return the triggers for assist satellites."""
return TRIGGERS

View File

@@ -1,19 +0,0 @@
listening:
target:
entity:
domain: assist_satellite
processing:
target:
entity:
domain: assist_satellite
responding:
target:
entity:
domain: assist_satellite
idle:
target:
entity:
domain: assist_satellite

View File

@@ -96,5 +96,10 @@
"turn_on": {
"service": "mdi:power-on"
}
},
"triggers": {
"turned_off": {
"trigger": "mdi:power-off"
}
}
}

View File

@@ -187,6 +187,13 @@
"heat_cool": "Heat/cool",
"off": "[%key:common::state::off%]"
}
},
"trigger_behavior": {
"options": {
"any": "Any",
"first": "First",
"last": "Last"
}
}
},
"services": {
@@ -287,91 +294,16 @@
},
"title": "Climate",
"triggers": {
"cooling": {
"description": "Triggers when a climate starts cooling.",
"name": "When a climate starts cooling"
},
"current_humidity_changed": {
"description": "Triggers when the current humidity of a climate changes.",
"turned_off": {
"description": "Triggers when a climate is turned off.",
"description_configured": "Triggers when a climate is turned off",
"fields": {
"above": {
"description": "Only trigger when the current humidity goes above this value.",
"name": "Above"
},
"below": {
"description": "Only trigger when the current humidity goes below this value.",
"name": "Below"
"behavior": {
"description": "The behavior of the targeted climates to trigger on.",
"name": "Behavior"
}
},
"name": "When current humidity changes"
},
"current_temperature_changed": {
"description": "Triggers when the current temperature of a climate changes.",
"fields": {
"above": {
"description": "Only trigger when the current temperature goes above this value.",
"name": "Above"
},
"below": {
"description": "Only trigger when the current temperature goes below this value.",
"name": "Below"
}
},
"name": "When current temperature changes"
},
"drying": {
"description": "Triggers when a climate starts drying.",
"name": "When a climate starts drying"
},
"heating": {
"description": "Triggers when a climate starts heating.",
"name": "When a climate starts heating"
},
"mode_changed": {
"description": "Triggers when the HVAC mode of a climate changes.",
"fields": {
"hvac_mode": {
"description": "The HVAC modes to trigger on. If empty, triggers on all mode changes.",
"name": "HVAC modes"
}
},
"name": "When HVAC mode changes"
},
"target_humidity_changed": {
"description": "Triggers when the target humidity of a climate changes.",
"fields": {
"above": {
"description": "Only trigger when the target humidity goes above this value.",
"name": "Above"
},
"below": {
"description": "Only trigger when the target humidity goes below this value.",
"name": "Below"
}
},
"name": "When target humidity changes"
},
"target_temperature_changed": {
"description": "Triggers when the target temperature of a climate changes.",
"fields": {
"above": {
"description": "Only trigger when the target temperature goes above this value.",
"name": "Above"
},
"below": {
"description": "Only trigger when the target temperature goes below this value.",
"name": "Below"
}
},
"name": "When target temperature changes"
},
"turns_off": {
"description": "Triggers when a climate turns off.",
"name": "When a climate turns off"
},
"turns_on": {
"description": "Triggers when a climate turns on.",
"name": "When a climate turns on"
"name": "When a climate is turned off"
}
}
}

View File

@@ -1,817 +1,16 @@
"""Provides triggers for climate."""
"""Provides triggers for climates."""
from typing import TYPE_CHECKING, cast, override
import voluptuous as vol
from homeassistant.const import (
ATTR_ENTITY_ID,
ATTR_TEMPERATURE,
CONF_ABOVE,
CONF_BELOW,
CONF_OPTIONS,
CONF_TARGET,
STATE_UNAVAILABLE,
)
from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback, split_entity_id
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.target import (
TargetStateChangedData,
async_track_target_selector_state_change_event,
)
from homeassistant.helpers.trigger import Trigger, TriggerActionRunner, TriggerConfig
from homeassistant.helpers.typing import ConfigType
from .const import (
ATTR_CURRENT_HUMIDITY,
ATTR_CURRENT_TEMPERATURE,
ATTR_HUMIDITY,
ATTR_HVAC_ACTION,
ATTR_HVAC_MODE,
DOMAIN,
HVAC_MODES,
HVACMode,
)
CLIMATE_TRIGGER_SCHEMA = vol.Schema(
{
vol.Optional(CONF_OPTIONS, default={}): {},
vol.Required(CONF_TARGET): cv.TARGET_FIELDS,
}
)
MODE_CHANGED_TRIGGER_SCHEMA = vol.Schema(
{
vol.Optional(CONF_OPTIONS, default={}): {
vol.Optional(ATTR_HVAC_MODE, default=[]): vol.All(
cv.ensure_list, [vol.In(HVAC_MODES)]
),
},
vol.Required(CONF_TARGET): cv.TARGET_FIELDS,
}
)
THRESHOLD_TRIGGER_SCHEMA = vol.Schema(
{
vol.Optional(CONF_OPTIONS, default={}): {
vol.Optional(CONF_ABOVE): vol.Coerce(float),
vol.Optional(CONF_BELOW): vol.Coerce(float),
},
vol.Required(CONF_TARGET): cv.TARGET_FIELDS,
}
)
class ClimateTurnsOnTrigger(Trigger):
"""Trigger for when a climate turns on."""
@override
@classmethod
async def async_validate_config(
cls, hass: HomeAssistant, config: ConfigType
) -> ConfigType:
"""Validate config."""
return cast(ConfigType, CLIMATE_TRIGGER_SCHEMA(config))
def __init__(self, hass: HomeAssistant, config: TriggerConfig) -> None:
"""Initialize the climate turns on trigger."""
super().__init__(hass, config)
if TYPE_CHECKING:
assert config.options is not None
assert config.target is not None
self._options = config.options
self._target = config.target
@override
async def async_attach_runner(
self, run_action: TriggerActionRunner
) -> CALLBACK_TYPE:
"""Attach the trigger to an action runner."""
@callback
def state_change_listener(
target_state_change_data: TargetStateChangedData,
) -> None:
"""Listen for state changes and call action."""
event = target_state_change_data.state_change_event
entity_id = event.data["entity_id"]
from_state = event.data["old_state"]
to_state = event.data["new_state"]
# Ignore unavailable states
if to_state is None or to_state.state == STATE_UNAVAILABLE:
return
# Check if climate turned on (from off to any other mode)
if (
from_state is not None
and from_state.state == HVACMode.OFF
and to_state.state != HVACMode.OFF
):
run_action(
{
ATTR_ENTITY_ID: entity_id,
"from_state": from_state,
"to_state": to_state,
},
f"climate {entity_id} turned on",
event.context,
)
def entity_filter(entities: set[str]) -> set[str]:
"""Filter entities of this domain."""
return {
entity_id
for entity_id in entities
if split_entity_id(entity_id)[0] == DOMAIN
}
return async_track_target_selector_state_change_event(
self._hass, self._target, state_change_listener, entity_filter
)
class ClimateTurnsOffTrigger(Trigger):
"""Trigger for when a climate turns off."""
@override
@classmethod
async def async_validate_config(
cls, hass: HomeAssistant, config: ConfigType
) -> ConfigType:
"""Validate config."""
return cast(ConfigType, CLIMATE_TRIGGER_SCHEMA(config))
def __init__(self, hass: HomeAssistant, config: TriggerConfig) -> None:
"""Initialize the climate turns off trigger."""
super().__init__(hass, config)
if TYPE_CHECKING:
assert config.options is not None
assert config.target is not None
self._options = config.options
self._target = config.target
@override
async def async_attach_runner(
self, run_action: TriggerActionRunner
) -> CALLBACK_TYPE:
"""Attach the trigger to an action runner."""
@callback
def state_change_listener(
target_state_change_data: TargetStateChangedData,
) -> None:
"""Listen for state changes and call action."""
event = target_state_change_data.state_change_event
entity_id = event.data["entity_id"]
from_state = event.data["old_state"]
to_state = event.data["new_state"]
# Ignore unavailable states
if to_state is None or to_state.state == STATE_UNAVAILABLE:
return
# Check if climate turned off (from any mode to off)
if (
from_state is not None
and from_state.state != HVACMode.OFF
and to_state.state == HVACMode.OFF
):
run_action(
{
ATTR_ENTITY_ID: entity_id,
"from_state": from_state,
"to_state": to_state,
},
f"climate {entity_id} turned off",
event.context,
)
def entity_filter(entities: set[str]) -> set[str]:
"""Filter entities of this domain."""
return {
entity_id
for entity_id in entities
if split_entity_id(entity_id)[0] == DOMAIN
}
return async_track_target_selector_state_change_event(
self._hass, self._target, state_change_listener, entity_filter
)
class ClimateModeChangedTrigger(Trigger):
"""Trigger for when a climate mode changes."""
@override
@classmethod
async def async_validate_config(
cls, hass: HomeAssistant, config: ConfigType
) -> ConfigType:
"""Validate config."""
return cast(ConfigType, MODE_CHANGED_TRIGGER_SCHEMA(config))
def __init__(self, hass: HomeAssistant, config: TriggerConfig) -> None:
"""Initialize the climate mode changed trigger."""
super().__init__(hass, config)
if TYPE_CHECKING:
assert config.options is not None
assert config.target is not None
self._options = config.options
self._target = config.target
@override
async def async_attach_runner(
self, run_action: TriggerActionRunner
) -> CALLBACK_TYPE:
"""Attach the trigger to an action runner."""
hvac_modes_filter = self._options[ATTR_HVAC_MODE]
@callback
def state_change_listener(
target_state_change_data: TargetStateChangedData,
) -> None:
"""Listen for state changes and call action."""
event = target_state_change_data.state_change_event
entity_id = event.data["entity_id"]
from_state = event.data["old_state"]
to_state = event.data["new_state"]
# Ignore unavailable states
if to_state is None or to_state.state == STATE_UNAVAILABLE:
return
# Check if hvac_mode changed
if from_state is not None and from_state.state != to_state.state:
# If hvac_modes filter is specified, check if the new mode matches
if hvac_modes_filter and to_state.state not in hvac_modes_filter:
return
run_action(
{
ATTR_ENTITY_ID: entity_id,
"from_state": from_state,
"to_state": to_state,
},
f"climate {entity_id} mode changed to {to_state.state}",
event.context,
)
def entity_filter(entities: set[str]) -> set[str]:
"""Filter entities of this domain."""
return {
entity_id
for entity_id in entities
if split_entity_id(entity_id)[0] == DOMAIN
}
return async_track_target_selector_state_change_event(
self._hass, self._target, state_change_listener, entity_filter
)
class ClimateCoolingTrigger(Trigger):
"""Trigger for when a climate starts cooling."""
@override
@classmethod
async def async_validate_config(
cls, hass: HomeAssistant, config: ConfigType
) -> ConfigType:
"""Validate config."""
return cast(ConfigType, CLIMATE_TRIGGER_SCHEMA(config))
def __init__(self, hass: HomeAssistant, config: TriggerConfig) -> None:
"""Initialize the climate cooling trigger."""
super().__init__(hass, config)
if TYPE_CHECKING:
assert config.options is not None
assert config.target is not None
self._options = config.options
self._target = config.target
@override
async def async_attach_runner(
self, run_action: TriggerActionRunner
) -> CALLBACK_TYPE:
"""Attach the trigger to an action runner."""
@callback
def state_change_listener(
target_state_change_data: TargetStateChangedData,
) -> None:
"""Listen for state changes and call action."""
event = target_state_change_data.state_change_event
entity_id = event.data["entity_id"]
from_state = event.data["old_state"]
to_state = event.data["new_state"]
# Ignore unavailable states
if to_state is None or to_state.state == STATE_UNAVAILABLE:
return
# Check if climate started cooling
from_action = from_state.attributes.get(ATTR_HVAC_ACTION) if from_state else None
to_action = to_state.attributes.get(ATTR_HVAC_ACTION)
if from_action != "cooling" and to_action == "cooling":
run_action(
{
ATTR_ENTITY_ID: entity_id,
"from_state": from_state,
"to_state": to_state,
},
f"climate {entity_id} started cooling",
event.context,
)
def entity_filter(entities: set[str]) -> set[str]:
"""Filter entities of this domain."""
return {
entity_id
for entity_id in entities
if split_entity_id(entity_id)[0] == DOMAIN
}
return async_track_target_selector_state_change_event(
self._hass, self._target, state_change_listener, entity_filter
)
class ClimateHeatingTrigger(Trigger):
"""Trigger for when a climate starts heating."""
@override
@classmethod
async def async_validate_config(
cls, hass: HomeAssistant, config: ConfigType
) -> ConfigType:
"""Validate config."""
return cast(ConfigType, CLIMATE_TRIGGER_SCHEMA(config))
def __init__(self, hass: HomeAssistant, config: TriggerConfig) -> None:
"""Initialize the climate heating trigger."""
super().__init__(hass, config)
if TYPE_CHECKING:
assert config.options is not None
assert config.target is not None
self._options = config.options
self._target = config.target
@override
async def async_attach_runner(
self, run_action: TriggerActionRunner
) -> CALLBACK_TYPE:
"""Attach the trigger to an action runner."""
@callback
def state_change_listener(
target_state_change_data: TargetStateChangedData,
) -> None:
"""Listen for state changes and call action."""
event = target_state_change_data.state_change_event
entity_id = event.data["entity_id"]
from_state = event.data["old_state"]
to_state = event.data["new_state"]
# Ignore unavailable states
if to_state is None or to_state.state == STATE_UNAVAILABLE:
return
# Check if climate started heating
from_action = from_state.attributes.get(ATTR_HVAC_ACTION) if from_state else None
to_action = to_state.attributes.get(ATTR_HVAC_ACTION)
if from_action != "heating" and to_action == "heating":
run_action(
{
ATTR_ENTITY_ID: entity_id,
"from_state": from_state,
"to_state": to_state,
},
f"climate {entity_id} started heating",
event.context,
)
def entity_filter(entities: set[str]) -> set[str]:
"""Filter entities of this domain."""
return {
entity_id
for entity_id in entities
if split_entity_id(entity_id)[0] == DOMAIN
}
return async_track_target_selector_state_change_event(
self._hass, self._target, state_change_listener, entity_filter
)
class ClimateDryingTrigger(Trigger):
"""Trigger for when a climate starts drying."""
@override
@classmethod
async def async_validate_config(
cls, hass: HomeAssistant, config: ConfigType
) -> ConfigType:
"""Validate config."""
return cast(ConfigType, CLIMATE_TRIGGER_SCHEMA(config))
def __init__(self, hass: HomeAssistant, config: TriggerConfig) -> None:
"""Initialize the climate drying trigger."""
super().__init__(hass, config)
if TYPE_CHECKING:
assert config.options is not None
assert config.target is not None
self._options = config.options
self._target = config.target
@override
async def async_attach_runner(
self, run_action: TriggerActionRunner
) -> CALLBACK_TYPE:
"""Attach the trigger to an action runner."""
@callback
def state_change_listener(
target_state_change_data: TargetStateChangedData,
) -> None:
"""Listen for state changes and call action."""
event = target_state_change_data.state_change_event
entity_id = event.data["entity_id"]
from_state = event.data["old_state"]
to_state = event.data["new_state"]
# Ignore unavailable states
if to_state is None or to_state.state == STATE_UNAVAILABLE:
return
# Check if climate started drying
from_action = from_state.attributes.get(ATTR_HVAC_ACTION) if from_state else None
to_action = to_state.attributes.get(ATTR_HVAC_ACTION)
if from_action != "drying" and to_action == "drying":
run_action(
{
ATTR_ENTITY_ID: entity_id,
"from_state": from_state,
"to_state": to_state,
},
f"climate {entity_id} started drying",
event.context,
)
def entity_filter(entities: set[str]) -> set[str]:
"""Filter entities of this domain."""
return {
entity_id
for entity_id in entities
if split_entity_id(entity_id)[0] == DOMAIN
}
return async_track_target_selector_state_change_event(
self._hass, self._target, state_change_listener, entity_filter
)
class ClimateTargetTemperatureChangedTrigger(Trigger):
"""Trigger for when a climate target temperature changes."""
@override
@classmethod
async def async_validate_config(
cls, hass: HomeAssistant, config: ConfigType
) -> ConfigType:
"""Validate config."""
return cast(ConfigType, THRESHOLD_TRIGGER_SCHEMA(config))
def __init__(self, hass: HomeAssistant, config: TriggerConfig) -> None:
"""Initialize the climate target temperature changed trigger."""
super().__init__(hass, config)
if TYPE_CHECKING:
assert config.options is not None
assert config.target is not None
self._options = config.options
self._target = config.target
@override
async def async_attach_runner(
self, run_action: TriggerActionRunner
) -> CALLBACK_TYPE:
"""Attach the trigger to an action runner."""
above = self._options.get(CONF_ABOVE)
below = self._options.get(CONF_BELOW)
@callback
def state_change_listener(
target_state_change_data: TargetStateChangedData,
) -> None:
"""Listen for state changes and call action."""
event = target_state_change_data.state_change_event
entity_id = event.data["entity_id"]
from_state = event.data["old_state"]
to_state = event.data["new_state"]
# Ignore unavailable states
if to_state is None or to_state.state == STATE_UNAVAILABLE:
return
# Check if target temperature changed
from_temp = (
from_state.attributes.get(ATTR_TEMPERATURE) if from_state else None
)
to_temp = to_state.attributes.get(ATTR_TEMPERATURE)
if to_temp is None or from_temp == to_temp:
return
# Apply threshold filters if specified
if above is not None and to_temp <= above:
return
if below is not None and to_temp >= below:
return
run_action(
{
ATTR_ENTITY_ID: entity_id,
"from_state": from_state,
"to_state": to_state,
},
f"climate {entity_id} target temperature changed to {to_temp}",
event.context,
)
def entity_filter(entities: set[str]) -> set[str]:
"""Filter entities of this domain."""
return {
entity_id
for entity_id in entities
if split_entity_id(entity_id)[0] == DOMAIN
}
return async_track_target_selector_state_change_event(
self._hass, self._target, state_change_listener, entity_filter
)
class ClimateCurrentTemperatureChangedTrigger(Trigger):
"""Trigger for when a climate current temperature changes."""
@override
@classmethod
async def async_validate_config(
cls, hass: HomeAssistant, config: ConfigType
) -> ConfigType:
"""Validate config."""
return cast(ConfigType, THRESHOLD_TRIGGER_SCHEMA(config))
def __init__(self, hass: HomeAssistant, config: TriggerConfig) -> None:
"""Initialize the climate current temperature changed trigger."""
super().__init__(hass, config)
if TYPE_CHECKING:
assert config.options is not None
assert config.target is not None
self._options = config.options
self._target = config.target
@override
async def async_attach_runner(
self, run_action: TriggerActionRunner
) -> CALLBACK_TYPE:
"""Attach the trigger to an action runner."""
above = self._options.get(CONF_ABOVE)
below = self._options.get(CONF_BELOW)
@callback
def state_change_listener(
target_state_change_data: TargetStateChangedData,
) -> None:
"""Listen for state changes and call action."""
event = target_state_change_data.state_change_event
entity_id = event.data["entity_id"]
from_state = event.data["old_state"]
to_state = event.data["new_state"]
# Ignore unavailable states
if to_state is None or to_state.state == STATE_UNAVAILABLE:
return
# Check if current temperature changed
from_temp = (
from_state.attributes.get(ATTR_CURRENT_TEMPERATURE)
if from_state
else None
)
to_temp = to_state.attributes.get(ATTR_CURRENT_TEMPERATURE)
if to_temp is None or from_temp == to_temp:
return
# Apply threshold filters if specified
if above is not None and to_temp <= above:
return
if below is not None and to_temp >= below:
return
run_action(
{
ATTR_ENTITY_ID: entity_id,
"from_state": from_state,
"to_state": to_state,
},
f"climate {entity_id} current temperature changed to {to_temp}",
event.context,
)
def entity_filter(entities: set[str]) -> set[str]:
"""Filter entities of this domain."""
return {
entity_id
for entity_id in entities
if split_entity_id(entity_id)[0] == DOMAIN
}
return async_track_target_selector_state_change_event(
self._hass, self._target, state_change_listener, entity_filter
)
class ClimateTargetHumidityChangedTrigger(Trigger):
"""Trigger for when a climate target humidity changes."""
@override
@classmethod
async def async_validate_config(
cls, hass: HomeAssistant, config: ConfigType
) -> ConfigType:
"""Validate config."""
return cast(ConfigType, THRESHOLD_TRIGGER_SCHEMA(config))
def __init__(self, hass: HomeAssistant, config: TriggerConfig) -> None:
"""Initialize the climate target humidity changed trigger."""
super().__init__(hass, config)
if TYPE_CHECKING:
assert config.options is not None
assert config.target is not None
self._options = config.options
self._target = config.target
@override
async def async_attach_runner(
self, run_action: TriggerActionRunner
) -> CALLBACK_TYPE:
"""Attach the trigger to an action runner."""
above = self._options.get(CONF_ABOVE)
below = self._options.get(CONF_BELOW)
@callback
def state_change_listener(
target_state_change_data: TargetStateChangedData,
) -> None:
"""Listen for state changes and call action."""
event = target_state_change_data.state_change_event
entity_id = event.data["entity_id"]
from_state = event.data["old_state"]
to_state = event.data["new_state"]
# Ignore unavailable states
if to_state is None or to_state.state == STATE_UNAVAILABLE:
return
# Check if target humidity changed
from_humidity = (
from_state.attributes.get(ATTR_HUMIDITY) if from_state else None
)
to_humidity = to_state.attributes.get(ATTR_HUMIDITY)
if to_humidity is None or from_humidity == to_humidity:
return
# Apply threshold filters if specified
if above is not None and to_humidity <= above:
return
if below is not None and to_humidity >= below:
return
run_action(
{
ATTR_ENTITY_ID: entity_id,
"from_state": from_state,
"to_state": to_state,
},
f"climate {entity_id} target humidity changed to {to_humidity}",
event.context,
)
def entity_filter(entities: set[str]) -> set[str]:
"""Filter entities of this domain."""
return {
entity_id
for entity_id in entities
if split_entity_id(entity_id)[0] == DOMAIN
}
return async_track_target_selector_state_change_event(
self._hass, self._target, state_change_listener, entity_filter
)
class ClimateCurrentHumidityChangedTrigger(Trigger):
"""Trigger for when a climate current humidity changes."""
@override
@classmethod
async def async_validate_config(
cls, hass: HomeAssistant, config: ConfigType
) -> ConfigType:
"""Validate config."""
return cast(ConfigType, THRESHOLD_TRIGGER_SCHEMA(config))
def __init__(self, hass: HomeAssistant, config: TriggerConfig) -> None:
"""Initialize the climate current humidity changed trigger."""
super().__init__(hass, config)
if TYPE_CHECKING:
assert config.options is not None
assert config.target is not None
self._options = config.options
self._target = config.target
@override
async def async_attach_runner(
self, run_action: TriggerActionRunner
) -> CALLBACK_TYPE:
"""Attach the trigger to an action runner."""
above = self._options.get(CONF_ABOVE)
below = self._options.get(CONF_BELOW)
@callback
def state_change_listener(
target_state_change_data: TargetStateChangedData,
) -> None:
"""Listen for state changes and call action."""
event = target_state_change_data.state_change_event
entity_id = event.data["entity_id"]
from_state = event.data["old_state"]
to_state = event.data["new_state"]
# Ignore unavailable states
if to_state is None or to_state.state == STATE_UNAVAILABLE:
return
# Check if current humidity changed
from_humidity = (
from_state.attributes.get(ATTR_CURRENT_HUMIDITY)
if from_state
else None
)
to_humidity = to_state.attributes.get(ATTR_CURRENT_HUMIDITY)
if to_humidity is None or from_humidity == to_humidity:
return
# Apply threshold filters if specified
if above is not None and to_humidity <= above:
return
if below is not None and to_humidity >= below:
return
run_action(
{
ATTR_ENTITY_ID: entity_id,
"from_state": from_state,
"to_state": to_state,
},
f"climate {entity_id} current humidity changed to {to_humidity}",
event.context,
)
def entity_filter(entities: set[str]) -> set[str]:
"""Filter entities of this domain."""
return {
entity_id
for entity_id in entities
if split_entity_id(entity_id)[0] == DOMAIN
}
return async_track_target_selector_state_change_event(
self._hass, self._target, state_change_listener, entity_filter
)
from homeassistant.const import STATE_OFF
from homeassistant.core import HomeAssistant
from homeassistant.helpers.trigger import Trigger, make_entity_state_trigger
from .const import DOMAIN
TRIGGERS: dict[str, type[Trigger]] = {
"turns_on": ClimateTurnsOnTrigger,
"turns_off": ClimateTurnsOffTrigger,
"mode_changed": ClimateModeChangedTrigger,
"cooling": ClimateCoolingTrigger,
"heating": ClimateHeatingTrigger,
"drying": ClimateDryingTrigger,
"target_temperature_changed": ClimateTargetTemperatureChangedTrigger,
"current_temperature_changed": ClimateCurrentTemperatureChangedTrigger,
"target_humidity_changed": ClimateTargetHumidityChangedTrigger,
"current_humidity_changed": ClimateCurrentHumidityChangedTrigger,
"turned_off": make_entity_state_trigger(DOMAIN, STATE_OFF),
}
async def async_get_triggers(hass: HomeAssistant) -> dict[str, type[Trigger]]:
"""Return the triggers for climate."""
"""Return the triggers for climates."""
return TRIGGERS

View File

@@ -1,128 +1,15 @@
turns_on:
target:
entity:
domain: climate
turns_off:
target:
entity:
domain: climate
mode_changed:
turned_off:
target:
entity:
domain: climate
fields:
hvac_mode:
required: false
default: []
behavior:
required: true
default: any
selector:
select:
multiple: true
mode: dropdown
translation_key: trigger_behavior
options:
- label: "Off"
value: "off"
- label: "Heat"
value: "heat"
- label: "Cool"
value: "cool"
- label: "Heat/Cool"
value: "heat_cool"
- label: "Auto"
value: "auto"
- label: "Dry"
value: "dry"
- label: "Fan only"
value: "fan_only"
cooling:
target:
entity:
domain: climate
heating:
target:
entity:
domain: climate
drying:
target:
entity:
domain: climate
target_temperature_changed:
target:
entity:
domain: climate
fields:
above:
required: false
selector:
number:
mode: box
step: 0.1
below:
required: false
selector:
number:
mode: box
step: 0.1
current_temperature_changed:
target:
entity:
domain: climate
fields:
above:
required: false
selector:
number:
mode: box
step: 0.1
below:
required: false
selector:
number:
mode: box
step: 0.1
target_humidity_changed:
target:
entity:
domain: climate
fields:
above:
required: false
selector:
number:
mode: box
min: 0
max: 100
below:
required: false
selector:
number:
mode: box
min: 0
max: 100
current_humidity_changed:
target:
entity:
domain: climate
fields:
above:
required: false
selector:
number:
mode: box
min: 0
max: 100
below:
required: false
selector:
number:
mode: box
min: 0
max: 100
- first
- last
- any

View File

@@ -37,6 +37,13 @@ USER_SCHEMA = vol.Schema(
}
)
STEP_REAUTH_DATA_SCHEMA = vol.Schema({vol.Required(CONF_PIN): cv.string})
STEP_RECONFIGURE = vol.Schema(
{
vol.Required(CONF_HOST): cv.string,
vol.Required(CONF_PORT): cv.port,
vol.Optional(CONF_PIN, default=DEFAULT_PIN): cv.string,
}
)
async def validate_input(hass: HomeAssistant, data: dict[str, Any]) -> dict[str, str]:
@@ -168,55 +175,36 @@ class ComelitConfigFlow(ConfigFlow, domain=DOMAIN):
) -> ConfigFlowResult:
"""Handle reconfiguration of the device."""
reconfigure_entry = self._get_reconfigure_entry()
if not user_input:
return self.async_show_form(
step_id="reconfigure", data_schema=STEP_RECONFIGURE
)
updated_host = user_input[CONF_HOST]
self._async_abort_entries_match({CONF_HOST: updated_host})
errors: dict[str, str] = {}
if user_input is not None:
updated_host = user_input[CONF_HOST]
self._async_abort_entries_match({CONF_HOST: updated_host})
try:
data_to_validate = {
CONF_HOST: updated_host,
CONF_PORT: user_input[CONF_PORT],
CONF_PIN: user_input[CONF_PIN],
CONF_TYPE: reconfigure_entry.data.get(CONF_TYPE, BRIDGE),
}
await validate_input(self.hass, data_to_validate)
except CannotConnect:
errors["base"] = "cannot_connect"
except InvalidAuth:
errors["base"] = "invalid_auth"
except InvalidPin:
errors["base"] = "invalid_pin"
except Exception: # noqa: BLE001
_LOGGER.exception("Unexpected exception")
errors["base"] = "unknown"
else:
data_updates = {
CONF_HOST: updated_host,
CONF_PORT: user_input[CONF_PORT],
CONF_PIN: user_input[CONF_PIN],
}
return self.async_update_reload_and_abort(
reconfigure_entry, data_updates=data_updates
)
schema = vol.Schema(
{
vol.Required(
CONF_HOST, default=reconfigure_entry.data[CONF_HOST]
): cv.string,
vol.Required(
CONF_PORT, default=reconfigure_entry.data[CONF_PORT]
): cv.port,
vol.Optional(CONF_PIN): cv.string,
}
)
try:
await validate_input(self.hass, user_input)
except CannotConnect:
errors["base"] = "cannot_connect"
except InvalidAuth:
errors["base"] = "invalid_auth"
except InvalidPin:
errors["base"] = "invalid_pin"
except Exception: # noqa: BLE001
_LOGGER.exception("Unexpected exception")
errors["base"] = "unknown"
else:
return self.async_update_reload_and_abort(
reconfigure_entry, data_updates={CONF_HOST: updated_host}
)
return self.async_show_form(
step_id="reconfigure",
data_schema=schema,
data_schema=STEP_RECONFIGURE,
errors=errors,
)

View File

@@ -4,7 +4,7 @@ from __future__ import annotations
from collections.abc import Callable
import logging
from typing import Any, Literal
from typing import Literal
from hassil.recognize import RecognizeResult
import voluptuous as vol
@@ -21,7 +21,6 @@ from homeassistant.core import (
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import config_validation as cv, intent
from homeassistant.helpers.entity_component import EntityComponent
from homeassistant.helpers.reload import async_integration_yaml_config
from homeassistant.helpers.typing import ConfigType
from homeassistant.loader import bind_hass
@@ -53,8 +52,6 @@ from .const import (
DATA_COMPONENT,
DOMAIN,
HOME_ASSISTANT_AGENT,
METADATA_CUSTOM_FILE,
METADATA_CUSTOM_SENTENCE,
SERVICE_PROCESS,
SERVICE_RELOAD,
ConversationEntityFeature,
@@ -269,13 +266,10 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
entity_component = EntityComponent[ConversationEntity](_LOGGER, DOMAIN, hass)
hass.data[DATA_COMPONENT] = entity_component
manager = get_agent_manager(hass)
hass_config_path = hass.config.path()
config_intents = _get_config_intents(config, hass_config_path)
manager.update_config_intents(config_intents)
await async_setup_default_agent(hass, entity_component)
agent_config = config.get(DOMAIN, {})
await async_setup_default_agent(
hass, entity_component, config_intents=agent_config.get("intents", {})
)
async def handle_process(service: ServiceCall) -> ServiceResponse:
"""Parse text into commands."""
@@ -300,16 +294,9 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
async def handle_reload(service: ServiceCall) -> None:
"""Reload intents."""
language = service.data.get(ATTR_LANGUAGE)
if language is None:
conf = await async_integration_yaml_config(hass, DOMAIN)
if conf is not None:
config_intents = _get_config_intents(conf, hass_config_path)
manager.update_config_intents(config_intents)
agent = manager.default_agent
agent = get_agent_manager(hass).default_agent
if agent is not None:
await agent.async_reload(language=language)
await agent.async_reload(language=service.data.get(ATTR_LANGUAGE))
hass.services.async_register(
DOMAIN,
@@ -326,27 +313,6 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
return True
def _get_config_intents(config: ConfigType, hass_config_path: str) -> dict[str, Any]:
"""Return config intents."""
intents = config.get(DOMAIN, {}).get("intents", {})
return {
"intents": {
intent_name: {
"data": [
{
"sentences": sentences,
"metadata": {
METADATA_CUSTOM_SENTENCE: True,
METADATA_CUSTOM_FILE: hass_config_path,
},
}
]
}
for intent_name, sentences in intents.items()
}
}
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up a config entry."""
return await hass.data[DATA_COMPONENT].async_setup_entry(entry)

View File

@@ -147,7 +147,6 @@ class AgentManager:
self.hass = hass
self._agents: dict[str, AbstractConversationAgent] = {}
self.default_agent: DefaultAgent | None = None
self.config_intents: dict[str, Any] = {}
self.triggers_details: list[TriggerDetails] = []
@callback
@@ -200,16 +199,9 @@ class AgentManager:
async def async_setup_default_agent(self, agent: DefaultAgent) -> None:
"""Set up the default agent."""
agent.update_config_intents(self.config_intents)
agent.update_triggers(self.triggers_details)
self.default_agent = agent
def update_config_intents(self, intents: dict[str, Any]) -> None:
"""Update config intents."""
self.config_intents = intents
if self.default_agent is not None:
self.default_agent.update_config_intents(intents)
def register_trigger(self, trigger_details: TriggerDetails) -> CALLBACK_TYPE:
"""Register a trigger."""
self.triggers_details.append(trigger_details)

View File

@@ -30,7 +30,3 @@ class ConversationEntityFeature(IntFlag):
"""Supported features of the conversation entity."""
CONTROL = 1
METADATA_CUSTOM_SENTENCE = "hass_custom_sentence"
METADATA_CUSTOM_FILE = "hass_custom_file"

View File

@@ -77,12 +77,7 @@ from homeassistant.util.json import JsonObjectType, json_loads_object
from .agent_manager import get_agent_manager
from .chat_log import AssistantContent, ChatLog
from .const import (
DOMAIN,
METADATA_CUSTOM_FILE,
METADATA_CUSTOM_SENTENCE,
ConversationEntityFeature,
)
from .const import DOMAIN, ConversationEntityFeature
from .entity import ConversationEntity
from .models import ConversationInput, ConversationResult
from .trace import ConversationTraceEventType, async_conversation_trace_append
@@ -96,6 +91,8 @@ _ENTITY_REGISTRY_UPDATE_FIELDS = ["aliases", "name", "original_name"]
_DEFAULT_EXPOSED_ATTRIBUTES = {"device_class"}
METADATA_CUSTOM_SENTENCE = "hass_custom_sentence"
METADATA_CUSTOM_FILE = "hass_custom_file"
METADATA_FUZZY_MATCH = "hass_fuzzy_match"
ERROR_SENTINEL = object()
@@ -205,9 +202,10 @@ class IntentCache:
async def async_setup_default_agent(
hass: HomeAssistant,
entity_component: EntityComponent[ConversationEntity],
config_intents: dict[str, Any],
) -> None:
"""Set up entity registry listener for the default agent."""
agent = DefaultAgent(hass)
agent = DefaultAgent(hass, config_intents)
await entity_component.async_add_entities([agent])
await get_agent_manager(hass).async_setup_default_agent(agent)
@@ -232,14 +230,14 @@ class DefaultAgent(ConversationEntity):
_attr_name = "Home Assistant"
_attr_supported_features = ConversationEntityFeature.CONTROL
def __init__(self, hass: HomeAssistant) -> None:
def __init__(self, hass: HomeAssistant, config_intents: dict[str, Any]) -> None:
"""Initialize the default agent."""
self.hass = hass
self._lang_intents: dict[str, LanguageIntents | object] = {}
self._load_intents_lock = asyncio.Lock()
# Intents from common conversation config
self._config_intents: dict[str, Any] = {}
# intent -> [sentences]
self._config_intents: dict[str, Any] = config_intents
# Sentences that will trigger a callback (skipping intent recognition)
self._triggers_details: list[TriggerDetails] = []
@@ -1037,14 +1035,6 @@ class DefaultAgent(ConversationEntity):
# Intents have changed, so we must clear the cache
self._intent_cache.clear()
@callback
def update_config_intents(self, intents: dict[str, Any]) -> None:
"""Update config intents."""
self._config_intents = intents
# Intents have changed, so we must clear the cache
self._intent_cache.clear()
async def async_prepare(self, language: str | None = None) -> None:
"""Load intents for a language."""
if language is None:
@@ -1169,10 +1159,33 @@ class DefaultAgent(ConversationEntity):
custom_sentences_path,
)
merge_dict(
intents_dict,
self._config_intents,
)
# Load sentences from HA config for default language only
if self._config_intents and (
self.hass.config.language in (language, language_variant)
):
hass_config_path = self.hass.config.path()
merge_dict(
intents_dict,
{
"intents": {
intent_name: {
"data": [
{
"sentences": sentences,
"metadata": {
METADATA_CUSTOM_SENTENCE: True,
METADATA_CUSTOM_FILE: hass_config_path,
},
}
]
}
for intent_name, sentences in self._config_intents.items()
}
},
)
_LOGGER.debug(
"Loaded intents from configuration.yaml",
)
if not intents_dict:
return None

View File

@@ -136,75 +136,5 @@
"name": "Toggle tilt"
}
},
"title": "Cover",
"triggers": {
"opens": {
"description": "Triggers when a cover opens.",
"description_configured": "Triggers when a cover opens",
"fields": {
"fully_opened": {
"description": "Only trigger when the cover is fully opened (position at 100%).",
"name": "Fully opened"
},
"device_class": {
"description": "The device classes to trigger on. If empty, triggers on all device classes.",
"name": "Device classes"
}
},
"name": "When a cover opens"
},
"closes": {
"description": "Triggers when a cover closes.",
"description_configured": "Triggers when a cover closes",
"fields": {
"fully_closed": {
"description": "Only trigger when the cover is fully closed (position at 0%).",
"name": "Fully closed"
},
"device_class": {
"description": "The device classes to trigger on. If empty, triggers on all device classes.",
"name": "Device classes"
}
},
"name": "When a cover closes"
},
"stops": {
"description": "Triggers when a cover stops moving.",
"description_configured": "Triggers when a cover stops moving",
"fields": {
"device_class": {
"description": "The device classes to trigger on. If empty, triggers on all device classes.",
"name": "Device classes"
}
},
"name": "When a cover stops moving"
},
"position_changed": {
"description": "Triggers when the position of a cover changes.",
"description_configured": "Triggers when the position of a cover changes",
"fields": {
"lower": {
"description": "The minimum position value to trigger on. Only triggers when position is at or above this value.",
"name": "Lower limit"
},
"upper": {
"description": "The maximum position value to trigger on. Only triggers when position is at or below this value.",
"name": "Upper limit"
},
"above": {
"description": "Only trigger when position is above this value.",
"name": "Above"
},
"below": {
"description": "Only trigger when position is below this value.",
"name": "Below"
},
"device_class": {
"description": "The device classes to trigger on. If empty, triggers on all device classes.",
"name": "Device classes"
}
},
"name": "When the position of a cover changes"
}
}
"title": "Cover"
}

View File

@@ -1,453 +0,0 @@
"""Provides triggers for covers."""
from typing import TYPE_CHECKING, cast, override
import voluptuous as vol
from homeassistant.const import (
ATTR_ENTITY_ID,
CONF_DEVICE_CLASS,
CONF_OPTIONS,
CONF_TARGET,
STATE_UNAVAILABLE,
)
from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback, split_entity_id
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.target import (
TargetStateChangedData,
async_track_target_selector_state_change_event,
)
from homeassistant.helpers.trigger import Trigger, TriggerActionRunner, TriggerConfig
from homeassistant.helpers.typing import ConfigType
from . import ATTR_CURRENT_POSITION, CoverDeviceClass, CoverState
from .const import DOMAIN
CONF_LOWER = "lower"
CONF_UPPER = "upper"
CONF_ABOVE = "above"
CONF_BELOW = "below"
CONF_FULLY_OPENED = "fully_opened"
CONF_FULLY_CLOSED = "fully_closed"
OPENS_TRIGGER_SCHEMA = vol.Schema(
{
vol.Optional(CONF_OPTIONS, default={}): {
vol.Optional(CONF_FULLY_OPENED, default=False): cv.boolean,
vol.Optional(CONF_DEVICE_CLASS, default=[]): vol.All(
cv.ensure_list, [vol.Coerce(CoverDeviceClass)]
),
},
vol.Required(CONF_TARGET): cv.TARGET_FIELDS,
}
)
CLOSES_TRIGGER_SCHEMA = vol.Schema(
{
vol.Optional(CONF_OPTIONS, default={}): {
vol.Optional(CONF_FULLY_CLOSED, default=False): cv.boolean,
vol.Optional(CONF_DEVICE_CLASS, default=[]): vol.All(
cv.ensure_list, [vol.Coerce(CoverDeviceClass)]
),
},
vol.Required(CONF_TARGET): cv.TARGET_FIELDS,
}
)
STOPS_TRIGGER_SCHEMA = vol.Schema(
{
vol.Optional(CONF_OPTIONS, default={}): {
vol.Optional(CONF_DEVICE_CLASS, default=[]): vol.All(
cv.ensure_list, [vol.Coerce(CoverDeviceClass)]
),
},
vol.Required(CONF_TARGET): cv.TARGET_FIELDS,
}
)
POSITION_CHANGED_TRIGGER_SCHEMA = vol.Schema(
{
vol.Optional(CONF_OPTIONS, default={}): {
vol.Exclusive(CONF_LOWER, "position_range"): vol.All(
vol.Coerce(int), vol.Range(min=0, max=100)
),
vol.Exclusive(CONF_UPPER, "position_range"): vol.All(
vol.Coerce(int), vol.Range(min=0, max=100)
),
vol.Exclusive(CONF_ABOVE, "position_range"): vol.All(
vol.Coerce(int), vol.Range(min=0, max=100)
),
vol.Exclusive(CONF_BELOW, "position_range"): vol.All(
vol.Coerce(int), vol.Range(min=0, max=100)
),
vol.Optional(CONF_DEVICE_CLASS, default=[]): vol.All(
cv.ensure_list, [vol.Coerce(CoverDeviceClass)]
),
},
vol.Required(CONF_TARGET): cv.TARGET_FIELDS,
}
)
class CoverOpensTrigger(Trigger):
"""Trigger for when a cover opens."""
@override
@classmethod
async def async_validate_config(
cls, hass: HomeAssistant, config: ConfigType
) -> ConfigType:
"""Validate config."""
return cast(ConfigType, OPENS_TRIGGER_SCHEMA(config))
def __init__(self, hass: HomeAssistant, config: TriggerConfig) -> None:
"""Initialize the cover opens trigger."""
super().__init__(hass, config)
if TYPE_CHECKING:
assert config.target is not None
assert config.options is not None
self._target = config.target
self._options = config.options
@override
async def async_attach_runner(
self, run_action: TriggerActionRunner
) -> CALLBACK_TYPE:
"""Attach the trigger to an action runner."""
fully_opened = self._options[CONF_FULLY_OPENED]
device_classes_filter = self._options[CONF_DEVICE_CLASS]
@callback
def state_change_listener(
target_state_change_data: TargetStateChangedData,
) -> None:
"""Listen for state changes and call action."""
event = target_state_change_data.state_change_event
entity_id = event.data["entity_id"]
from_state = event.data["old_state"]
to_state = event.data["new_state"]
# Ignore unavailable states
if to_state is None or to_state.state == STATE_UNAVAILABLE:
return
# Filter by device class if specified
if device_classes_filter:
device_class = to_state.attributes.get(CONF_DEVICE_CLASS)
if device_class not in device_classes_filter:
return
# Trigger when cover opens or is opening
if to_state.state in (CoverState.OPEN, CoverState.OPENING):
# If fully_opened is True, only trigger when position reaches 100
if fully_opened:
current_position = to_state.attributes.get(ATTR_CURRENT_POSITION)
if current_position != 100:
return
# Only trigger on state change, not if already in that state
if from_state and from_state.state == to_state.state:
# For fully_opened, allow triggering when position changes to 100
if fully_opened:
from_position = from_state.attributes.get(ATTR_CURRENT_POSITION)
to_position = to_state.attributes.get(ATTR_CURRENT_POSITION)
if from_position == to_position:
return
else:
return
run_action(
{
ATTR_ENTITY_ID: entity_id,
"from_state": from_state,
"to_state": to_state,
},
f"cover opened on {entity_id}",
event.context,
)
def entity_filter(entities: set[str]) -> set[str]:
"""Filter entities of this domain."""
return {
entity_id
for entity_id in entities
if split_entity_id(entity_id)[0] == DOMAIN
}
return async_track_target_selector_state_change_event(
self._hass, self._target, state_change_listener, entity_filter
)
class CoverClosesTrigger(Trigger):
"""Trigger for when a cover closes."""
@override
@classmethod
async def async_validate_config(
cls, hass: HomeAssistant, config: ConfigType
) -> ConfigType:
"""Validate config."""
return cast(ConfigType, CLOSES_TRIGGER_SCHEMA(config))
def __init__(self, hass: HomeAssistant, config: TriggerConfig) -> None:
"""Initialize the cover closes trigger."""
super().__init__(hass, config)
if TYPE_CHECKING:
assert config.target is not None
assert config.options is not None
self._target = config.target
self._options = config.options
@override
async def async_attach_runner(
self, run_action: TriggerActionRunner
) -> CALLBACK_TYPE:
"""Attach the trigger to an action runner."""
fully_closed = self._options[CONF_FULLY_CLOSED]
device_classes_filter = self._options[CONF_DEVICE_CLASS]
@callback
def state_change_listener(
target_state_change_data: TargetStateChangedData,
) -> None:
"""Listen for state changes and call action."""
event = target_state_change_data.state_change_event
entity_id = event.data["entity_id"]
from_state = event.data["old_state"]
to_state = event.data["new_state"]
# Ignore unavailable states
if to_state is None or to_state.state == STATE_UNAVAILABLE:
return
# Filter by device class if specified
if device_classes_filter:
device_class = to_state.attributes.get(CONF_DEVICE_CLASS)
if device_class not in device_classes_filter:
return
# Trigger when cover closes or is closing
if to_state.state in (CoverState.CLOSED, CoverState.CLOSING):
# If fully_closed is True, only trigger when position reaches 0
if fully_closed:
current_position = to_state.attributes.get(ATTR_CURRENT_POSITION)
if current_position != 0:
return
# Only trigger on state change, not if already in that state
if from_state and from_state.state == to_state.state:
# For fully_closed, allow triggering when position changes to 0
if fully_closed:
from_position = from_state.attributes.get(ATTR_CURRENT_POSITION)
to_position = to_state.attributes.get(ATTR_CURRENT_POSITION)
if from_position == to_position:
return
else:
return
run_action(
{
ATTR_ENTITY_ID: entity_id,
"from_state": from_state,
"to_state": to_state,
},
f"cover closed on {entity_id}",
event.context,
)
def entity_filter(entities: set[str]) -> set[str]:
"""Filter entities of this domain."""
return {
entity_id
for entity_id in entities
if split_entity_id(entity_id)[0] == DOMAIN
}
return async_track_target_selector_state_change_event(
self._hass, self._target, state_change_listener, entity_filter
)
class CoverStopsTrigger(Trigger):
"""Trigger for when a cover stops moving."""
@override
@classmethod
async def async_validate_config(
cls, hass: HomeAssistant, config: ConfigType
) -> ConfigType:
"""Validate config."""
return cast(ConfigType, STOPS_TRIGGER_SCHEMA(config))
def __init__(self, hass: HomeAssistant, config: TriggerConfig) -> None:
"""Initialize the cover stops trigger."""
super().__init__(hass, config)
if TYPE_CHECKING:
assert config.target is not None
assert config.options is not None
self._target = config.target
self._options = config.options
@override
async def async_attach_runner(
self, run_action: TriggerActionRunner
) -> CALLBACK_TYPE:
"""Attach the trigger to an action runner."""
device_classes_filter = self._options[CONF_DEVICE_CLASS]
@callback
def state_change_listener(
target_state_change_data: TargetStateChangedData,
) -> None:
"""Listen for state changes and call action."""
event = target_state_change_data.state_change_event
entity_id = event.data["entity_id"]
from_state = event.data["old_state"]
to_state = event.data["new_state"]
# Ignore unavailable states
if to_state is None or to_state.state == STATE_UNAVAILABLE:
return
# Filter by device class if specified
if device_classes_filter:
device_class = to_state.attributes.get(CONF_DEVICE_CLASS)
if device_class not in device_classes_filter:
return
# Trigger when cover stops (from opening/closing to open/closed)
if from_state and from_state.state in (
CoverState.OPENING,
CoverState.CLOSING,
):
if to_state.state in (CoverState.OPEN, CoverState.CLOSED):
run_action(
{
ATTR_ENTITY_ID: entity_id,
"from_state": from_state,
"to_state": to_state,
},
f"cover stopped on {entity_id}",
event.context,
)
def entity_filter(entities: set[str]) -> set[str]:
"""Filter entities of this domain."""
return {
entity_id
for entity_id in entities
if split_entity_id(entity_id)[0] == DOMAIN
}
return async_track_target_selector_state_change_event(
self._hass, self._target, state_change_listener, entity_filter
)
class CoverPositionChangedTrigger(Trigger):
"""Trigger for when a cover's position changes."""
@override
@classmethod
async def async_validate_config(
cls, hass: HomeAssistant, config: ConfigType
) -> ConfigType:
"""Validate config."""
return cast(ConfigType, POSITION_CHANGED_TRIGGER_SCHEMA(config))
def __init__(self, hass: HomeAssistant, config: TriggerConfig) -> None:
"""Initialize the cover position changed trigger."""
super().__init__(hass, config)
if TYPE_CHECKING:
assert config.target is not None
self._target = config.target
self._options = config.options or {}
@override
async def async_attach_runner(
self, run_action: TriggerActionRunner
) -> CALLBACK_TYPE:
"""Attach the trigger to an action runner."""
lower_limit = self._options.get(CONF_LOWER)
upper_limit = self._options.get(CONF_UPPER)
above_limit = self._options.get(CONF_ABOVE)
below_limit = self._options.get(CONF_BELOW)
device_classes_filter = self._options.get(CONF_DEVICE_CLASS, [])
@callback
def state_change_listener(
target_state_change_data: TargetStateChangedData,
) -> None:
"""Listen for state changes and call action."""
event = target_state_change_data.state_change_event
entity_id = event.data["entity_id"]
from_state = event.data["old_state"]
to_state = event.data["new_state"]
# Ignore unavailable states
if to_state is None or to_state.state == STATE_UNAVAILABLE:
return
# Filter by device class if specified
if device_classes_filter:
device_class = to_state.attributes.get(CONF_DEVICE_CLASS)
if device_class not in device_classes_filter:
return
# Get position values
from_position = (
from_state.attributes.get(ATTR_CURRENT_POSITION) if from_state else None
)
to_position = to_state.attributes.get(ATTR_CURRENT_POSITION)
# Only trigger if position value exists and has changed
if to_position is None or from_position == to_position:
return
# Apply threshold filters if configured
if lower_limit is not None and to_position < lower_limit:
return
if upper_limit is not None and to_position > upper_limit:
return
if above_limit is not None and to_position <= above_limit:
return
if below_limit is not None and to_position >= below_limit:
return
run_action(
{
ATTR_ENTITY_ID: entity_id,
"from_state": from_state,
"to_state": to_state,
"from_position": from_position,
"to_position": to_position,
},
f"position changed on {entity_id}",
event.context,
)
def entity_filter(entities: set[str]) -> set[str]:
"""Filter entities of this domain."""
return {
entity_id
for entity_id in entities
if split_entity_id(entity_id)[0] == DOMAIN
}
return async_track_target_selector_state_change_event(
self._hass, self._target, state_change_listener, entity_filter
)
TRIGGERS: dict[str, type[Trigger]] = {
"opens": CoverOpensTrigger,
"closes": CoverClosesTrigger,
"stops": CoverStopsTrigger,
"position_changed": CoverPositionChangedTrigger,
}
async def async_get_triggers(hass: HomeAssistant) -> dict[str, type[Trigger]]:
"""Return the triggers for covers."""
return TRIGGERS

View File

@@ -1,101 +0,0 @@
opens:
target:
entity:
domain: cover
fields:
fully_opened:
required: false
default: false
selector:
boolean:
device_class:
required: false
default: []
selector:
select:
multiple: true
options:
- curtain
- shutter
- blind
closes:
target:
entity:
domain: cover
fields:
fully_closed:
required: false
default: false
selector:
boolean:
device_class:
required: false
default: []
selector:
select:
multiple: true
options:
- curtain
- shutter
- blind
stops:
target:
entity:
domain: cover
fields:
device_class:
required: false
default: []
selector:
select:
multiple: true
options:
- curtain
- shutter
- blind
position_changed:
target:
entity:
domain: cover
fields:
lower:
required: false
selector:
number:
min: 0
max: 100
mode: box
upper:
required: false
selector:
number:
min: 0
max: 100
mode: box
above:
required: false
selector:
number:
min: 0
max: 100
mode: box
below:
required: false
selector:
number:
min: 0
max: 100
mode: box
device_class:
required: false
default: []
selector:
select:
multiple: true
options:
- curtain
- shutter
- blind

View File

@@ -116,28 +116,20 @@ class FoscamCoordinator(DataUpdateCoordinator[FoscamDeviceInfo]):
is_open_wdr = None
is_open_hdr = None
reserve3 = product_info.get("reserve4")
model = product_info.get("model")
model_int = int(model) if model is not None else 7002
if model_int > 7001:
reserve3_int = int(reserve3) if reserve3 is not None else 0
supports_wdr_adjustment_val = bool(int(reserve3_int & 256))
supports_hdr_adjustment_val = bool(int(reserve3_int & 128))
if supports_wdr_adjustment_val:
ret_wdr, is_open_wdr_data = self.session.getWdrMode()
mode = (
is_open_wdr_data["mode"] if ret_wdr == 0 and is_open_wdr_data else 0
)
is_open_wdr = bool(int(mode))
elif supports_hdr_adjustment_val:
ret_hdr, is_open_hdr_data = self.session.getHdrMode()
mode = (
is_open_hdr_data["mode"] if ret_hdr == 0 and is_open_hdr_data else 0
)
is_open_hdr = bool(int(mode))
else:
supports_wdr_adjustment_val = False
supports_hdr_adjustment_val = False
reserve3_int = int(reserve3) if reserve3 is not None else 0
supports_wdr_adjustment_val = bool(int(reserve3_int & 256))
supports_hdr_adjustment_val = bool(int(reserve3_int & 128))
if supports_wdr_adjustment_val:
ret_wdr, is_open_wdr_data = self.session.getWdrMode()
mode = is_open_wdr_data["mode"] if ret_wdr == 0 and is_open_wdr_data else 0
is_open_wdr = bool(int(mode))
elif supports_hdr_adjustment_val:
ret_hdr, is_open_hdr_data = self.session.getHdrMode()
mode = is_open_hdr_data["mode"] if ret_hdr == 0 and is_open_hdr_data else 0
is_open_hdr = bool(int(mode))
ret_sw, software_capabilities = self.session.getSWCapabilities()
supports_speak_volume_adjustment_val = (
bool(int(software_capabilities.get("swCapabilities1")) & 32)
if ret_sw == 0

View File

@@ -770,7 +770,9 @@ class ManifestJSONView(HomeAssistantView):
@websocket_api.websocket_command(
{
"type": "frontend/get_icons",
vol.Required("category"): vol.In({"entity", "entity_component", "services"}),
vol.Required("category"): vol.In(
{"entity", "entity_component", "services", "triggers"}
),
vol.Optional("integration"): vol.All(cv.ensure_list, [str]),
}
)

View File

@@ -0,0 +1,77 @@
"""Support for the Hive alarm."""
from __future__ import annotations
from datetime import timedelta
from homeassistant.components.alarm_control_panel import (
AlarmControlPanelEntity,
AlarmControlPanelEntityFeature,
AlarmControlPanelState,
)
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
from . import HiveConfigEntry
from .entity import HiveEntity
PARALLEL_UPDATES = 0
SCAN_INTERVAL = timedelta(seconds=15)
HIVETOHA = {
"home": AlarmControlPanelState.DISARMED,
"asleep": AlarmControlPanelState.ARMED_NIGHT,
"away": AlarmControlPanelState.ARMED_AWAY,
"sos": AlarmControlPanelState.TRIGGERED,
}
async def async_setup_entry(
hass: HomeAssistant,
entry: HiveConfigEntry,
async_add_entities: AddConfigEntryEntitiesCallback,
) -> None:
"""Set up Hive thermostat based on a config entry."""
hive = entry.runtime_data
if devices := hive.session.deviceList.get("alarm_control_panel"):
async_add_entities(
[HiveAlarmControlPanelEntity(hive, dev) for dev in devices], True
)
class HiveAlarmControlPanelEntity(HiveEntity, AlarmControlPanelEntity):
"""Representation of a Hive alarm."""
_attr_supported_features = (
AlarmControlPanelEntityFeature.ARM_NIGHT
| AlarmControlPanelEntityFeature.ARM_AWAY
| AlarmControlPanelEntityFeature.TRIGGER
)
_attr_code_arm_required = False
async def async_alarm_disarm(self, code: str | None = None) -> None:
"""Send disarm command."""
await self.hive.alarm.setMode(self.device, "home")
async def async_alarm_arm_night(self, code: str | None = None) -> None:
"""Send arm night command."""
await self.hive.alarm.setMode(self.device, "asleep")
async def async_alarm_arm_away(self, code: str | None = None) -> None:
"""Send arm away command."""
await self.hive.alarm.setMode(self.device, "away")
async def async_alarm_trigger(self, code: str | None = None) -> None:
"""Send alarm trigger command."""
await self.hive.alarm.setMode(self.device, "sos")
async def async_update(self) -> None:
"""Update all Node data from Hive."""
await self.hive.session.updateData(self.device)
self.device = await self.hive.alarm.getAlarm(self.device)
self._attr_available = self.device["deviceData"].get("online")
if self._attr_available:
if self.device["status"]["state"]:
self._attr_alarm_state = AlarmControlPanelState.TRIGGERED
else:
self._attr_alarm_state = HIVETOHA[self.device["status"]["mode"]]

View File

@@ -11,6 +11,7 @@ CONFIG_ENTRY_VERSION = 1
DEFAULT_NAME = "Hive"
DOMAIN = "hive"
PLATFORMS = [
Platform.ALARM_CONTROL_PANEL,
Platform.BINARY_SENSOR,
Platform.CLIMATE,
Platform.LIGHT,
@@ -19,6 +20,7 @@ PLATFORMS = [
Platform.WATER_HEATER,
]
PLATFORM_LOOKUP = {
Platform.ALARM_CONTROL_PANEL: "alarm_control_panel",
Platform.BINARY_SENSOR: "binary_sensor",
Platform.CLIMATE: "climate",
Platform.LIGHT: "light",

View File

@@ -9,5 +9,5 @@
},
"iot_class": "cloud_polling",
"loggers": ["apyhiveapi"],
"requirements": ["pyhive-integration==1.0.7"]
"requirements": ["pyhive-integration==1.0.6"]
}

View File

@@ -1237,7 +1237,7 @@
"message": "Error obtaining data from the API: {error}"
},
"oauth2_implementation_unavailable": {
"message": "[%key:common::exceptions::oauth2_implementation_unavailable::message%]"
"message": "OAuth2 implementation temporarily unavailable, will retry"
},
"pause_program": {
"message": "Error pausing program: {error}"

View File

@@ -4,7 +4,6 @@ import logging
from typing import TYPE_CHECKING
from aiopvapi.resources.model import PowerviewData
from aiopvapi.resources.shade_data import PowerviewShadeData
from aiopvapi.rooms import Rooms
from aiopvapi.scenes import Scenes
from aiopvapi.shades import Shades
@@ -17,6 +16,7 @@ from homeassistant.helpers import device_registry as dr, entity_registry as er
from .const import DOMAIN, HUB_EXCEPTIONS, MANUFACTURER
from .coordinator import PowerviewShadeUpdateCoordinator
from .model import PowerviewConfigEntry, PowerviewEntryData
from .shade_data import PowerviewShadeData
from .util import async_connect_hub
PARALLEL_UPDATES = 1

View File

@@ -8,7 +8,6 @@ import logging
from aiopvapi.helpers.aiorequest import PvApiMaintenance
from aiopvapi.hub import Hub
from aiopvapi.resources.shade_data import PowerviewShadeData
from aiopvapi.shades import Shades
from homeassistant.config_entries import ConfigEntry
@@ -16,6 +15,7 @@ from homeassistant.core import HomeAssistant
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import HUB_EXCEPTIONS
from .shade_data import PowerviewShadeData
_LOGGER = logging.getLogger(__name__)

View File

@@ -208,13 +208,13 @@ class PowerViewShadeBase(ShadeEntity, CoverEntity):
async def _async_execute_move(self, move: ShadePosition) -> None:
"""Execute a move that can affect multiple positions."""
_LOGGER.debug("Move request %s: %s", self.name, move)
# Store the requested positions so subsequent move
# requests contain the secondary shade positions
self.data.update_shade_position(self._shade.id, move)
async with self.coordinator.radio_operation_lock:
response = await self._shade.move(move)
_LOGGER.debug("Move response %s: %s", self.name, response)
# Process the response from the hub (including new positions)
self.data.update_shade_position(self._shade.id, response)
async def _async_set_cover_position(self, target_hass_position: int) -> None:
"""Move the shade to a position."""
target_hass_position = self._clamp_cover_limit(target_hass_position)

View File

@@ -3,7 +3,6 @@
import logging
from aiopvapi.resources.shade import BaseShade, ShadePosition
from aiopvapi.resources.shade_data import PowerviewShadeData
from homeassistant.helpers import device_registry as dr
from homeassistant.helpers.device_registry import DeviceInfo
@@ -12,6 +11,7 @@ from homeassistant.helpers.update_coordinator import CoordinatorEntity
from .const import DOMAIN, MANUFACTURER
from .coordinator import PowerviewShadeUpdateCoordinator
from .model import PowerviewDeviceInfo
from .shade_data import PowerviewShadeData
_LOGGER = logging.getLogger(__name__)

View File

@@ -18,6 +18,6 @@
},
"iot_class": "local_polling",
"loggers": ["aiopvapi"],
"requirements": ["aiopvapi==3.3.0"],
"requirements": ["aiopvapi==3.2.1"],
"zeroconf": ["_powerview._tcp.local.", "_PowerView-G3._tcp.local."]
}

View File

@@ -0,0 +1,80 @@
"""Shade data for the Hunter Douglas PowerView integration."""
from __future__ import annotations
from dataclasses import fields
from typing import Any
from aiopvapi.resources.model import PowerviewData
from aiopvapi.resources.shade import BaseShade, ShadePosition
from .util import async_map_data_by_id
POSITION_FIELDS = [field for field in fields(ShadePosition) if field.name != "velocity"]
def copy_position_data(source: ShadePosition, target: ShadePosition) -> ShadePosition:
"""Copy position data from source to target for None values only."""
for field in POSITION_FIELDS:
if (value := getattr(source, field.name)) is not None:
setattr(target, field.name, value)
class PowerviewShadeData:
"""Coordinate shade data between multiple api calls."""
def __init__(self) -> None:
"""Init the shade data."""
self._raw_data_by_id: dict[int, dict[str | int, Any]] = {}
self._shade_group_data_by_id: dict[int, BaseShade] = {}
self.positions: dict[int, ShadePosition] = {}
def get_raw_data(self, shade_id: int) -> dict[str | int, Any]:
"""Get data for the shade."""
return self._raw_data_by_id[shade_id]
def get_all_raw_data(self) -> dict[int, dict[str | int, Any]]:
"""Get data for all shades."""
return self._raw_data_by_id
def get_shade(self, shade_id: int) -> BaseShade:
"""Get specific shade from the coordinator."""
return self._shade_group_data_by_id[shade_id]
def get_shade_position(self, shade_id: int) -> ShadePosition:
"""Get positions for a shade."""
if shade_id not in self.positions:
shade_position = ShadePosition()
# If we have the group data, use it to populate the initial position
if shade := self._shade_group_data_by_id.get(shade_id):
copy_position_data(shade.current_position, shade_position)
self.positions[shade_id] = shade_position
return self.positions[shade_id]
def update_from_group_data(self, shade_id: int) -> None:
"""Process an update from the group data."""
data = self._shade_group_data_by_id[shade_id]
copy_position_data(data.current_position, self.get_shade_position(data.id))
def store_group_data(self, shade_data: PowerviewData) -> None:
"""Store data from the all shades endpoint.
This does not update the shades or positions (self.positions)
as the data may be stale. update_from_group_data
with a shade_id will update a specific shade
from the group data.
"""
self._shade_group_data_by_id = shade_data.processed
self._raw_data_by_id = async_map_data_by_id(shade_data.raw)
def update_shade_position(self, shade_id: int, new_position: ShadePosition) -> None:
"""Update a single shades position."""
copy_position_data(new_position, self.get_shade_position(shade_id))
def update_shade_velocity(self, shade_id: int, shade_data: ShadePosition) -> None:
"""Update a single shades velocity."""
# the hub will always return a velocity of 0 on initial connect,
# separate definition to store consistent value in HA
# this value is purely driven from HA
if shade_data.velocity is not None:
self.get_shade_position(shade_id).velocity = shade_data.velocity

View File

@@ -2,15 +2,25 @@
from __future__ import annotations
from collections.abc import Iterable
from typing import Any
from aiopvapi.helpers.aiorequest import AioRequest
from aiopvapi.helpers.constants import ATTR_ID
from aiopvapi.hub import Hub
from homeassistant.core import HomeAssistant
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from .model import PowerviewAPI, PowerviewDeviceInfo
@callback
def async_map_data_by_id(data: Iterable[dict[str | int, Any]]):
"""Return a dict with the key being the id for a list of entries."""
return {entry[ATTR_ID]: entry for entry in data}
async def async_connect_hub(
hass: HomeAssistant, address: str, api_version: int | None = None
) -> PowerviewAPI:

View File

@@ -13,7 +13,6 @@ from typing import Any
from aiohttp import web
from hyperion import client
from hyperion.const import (
KEY_DATA,
KEY_IMAGE,
KEY_IMAGE_STREAM,
KEY_LEDCOLORS,
@@ -156,8 +155,7 @@ class HyperionCamera(Camera):
"""Update Hyperion components."""
if not img:
return
# Prefer KEY_DATA (Hyperion server >= 2.1.1); fall back to KEY_RESULT for older server versions
img_data = img.get(KEY_DATA, img.get(KEY_RESULT, {})).get(KEY_IMAGE)
img_data = img.get(KEY_RESULT, {}).get(KEY_IMAGE)
if not img_data or not img_data.startswith(IMAGE_STREAM_JPG_SENTINEL):
return
async with self._image_cond:

View File

@@ -5,6 +5,7 @@ from __future__ import annotations
from collections.abc import Callable
from dataclasses import dataclass
from propcache.api import cached_property
from pyituran import Vehicle
from homeassistant.components.binary_sensor import (
@@ -68,7 +69,7 @@ class IturanBinarySensor(IturanBaseEntity, BinarySensorEntity):
super().__init__(coordinator, license_plate, description.key)
self.entity_description = description
@property
@cached_property
def is_on(self) -> bool:
"""Return true if the binary sensor is on."""
return self.entity_description.value_fn(self.vehicle)

View File

@@ -2,6 +2,8 @@
from __future__ import annotations
from propcache.api import cached_property
from homeassistant.components.device_tracker import TrackerEntity
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
@@ -38,12 +40,12 @@ class IturanDeviceTracker(IturanBaseEntity, TrackerEntity):
"""Initialize the device tracker."""
super().__init__(coordinator, license_plate, "device_tracker")
@property
@cached_property
def latitude(self) -> float | None:
"""Return latitude value of the device."""
return self.vehicle.gps_coordinates[0]
@property
@cached_property
def longitude(self) -> float | None:
"""Return longitude value of the device."""
return self.vehicle.gps_coordinates[1]

View File

@@ -6,6 +6,7 @@ from collections.abc import Callable
from dataclasses import dataclass
from datetime import datetime
from propcache.api import cached_property
from pyituran import Vehicle
from homeassistant.components.sensor import (
@@ -132,7 +133,7 @@ class IturanSensor(IturanBaseEntity, SensorEntity):
super().__init__(coordinator, license_plate, description.key)
self.entity_description = description
@property
@cached_property
def native_value(self) -> StateType | datetime:
"""Return the state of the device."""
return self.entity_description.value_fn(self.vehicle)

View File

@@ -94,6 +94,28 @@
}
},
"services": {
"address_to_device_id": {
"description": "Converts an LCN address into a device ID.",
"fields": {
"host": {
"description": "Host name as given in the integration panel.",
"name": "Host name"
},
"id": {
"description": "Module or group number of the target.",
"name": "Module or group ID"
},
"segment_id": {
"description": "Segment number of the target.",
"name": "Segment ID"
},
"type": {
"description": "Module type of the target.",
"name": "Type"
}
},
"name": "Address to device ID"
},
"dyn_text": {
"description": "Sends dynamic text to LCN-GTxD displays.",
"fields": {

View File

@@ -0,0 +1,131 @@
"""Provides conditions for lights."""
from collections.abc import Callable
from typing import TYPE_CHECKING, Any, Final, override
import voluptuous as vol
from homeassistant.const import CONF_OPTIONS, CONF_TARGET, STATE_OFF, STATE_ON
from homeassistant.core import HomeAssistant, split_entity_id
from homeassistant.helpers import config_validation as cv, target
from homeassistant.helpers.condition import (
Condition,
ConditionCheckerType,
ConditionConfig,
trace_condition_function,
)
from homeassistant.helpers.typing import ConfigType, TemplateVarsType
from .const import DOMAIN
ATTR_BEHAVIOR: Final = "behavior"
BEHAVIOR_ANY: Final = "any"
BEHAVIOR_ALL: Final = "all"
STATE_CONDITION_VALID_STATES: Final = [STATE_ON, STATE_OFF]
STATE_CONDITION_OPTIONS_SCHEMA: dict[vol.Marker, Any] = {
vol.Required(ATTR_BEHAVIOR, default=BEHAVIOR_ANY): vol.In(
[BEHAVIOR_ANY, BEHAVIOR_ALL]
),
}
STATE_CONDITION_SCHEMA = vol.Schema(
{
vol.Required(CONF_TARGET): cv.TARGET_FIELDS,
vol.Required(CONF_OPTIONS): STATE_CONDITION_OPTIONS_SCHEMA,
}
)
class StateConditionBase(Condition):
"""State condition."""
@override
@classmethod
async def async_validate_config(
cls, hass: HomeAssistant, config: ConfigType
) -> ConfigType:
"""Validate config."""
return STATE_CONDITION_SCHEMA(config) # type: ignore[no-any-return]
def __init__(
self, hass: HomeAssistant, config: ConditionConfig, state: str
) -> None:
"""Initialize condition."""
self._hass = hass
if TYPE_CHECKING:
assert config.target
assert config.options
self._target = config.target
self._behavior = config.options[ATTR_BEHAVIOR]
self._state = state
@override
async def async_get_checker(self) -> ConditionCheckerType:
"""Get the condition checker."""
def check_any_match_state(states: list[str]) -> bool:
"""Test if any entity match the state."""
return any(state == self._state for state in states)
def check_all_match_state(states: list[str]) -> bool:
"""Test if all entities match the state."""
return all(state == self._state for state in states)
matcher: Callable[[list[str]], bool]
if self._behavior == BEHAVIOR_ANY:
matcher = check_any_match_state
elif self._behavior == BEHAVIOR_ALL:
matcher = check_all_match_state
@trace_condition_function
def test_state(hass: HomeAssistant, variables: TemplateVarsType = None) -> bool:
"""Test state condition."""
selector_data = target.TargetSelectorData(self._target)
targeted_entities = target.async_extract_referenced_entity_ids(
hass, selector_data, expand_group=False
)
referenced_entity_ids = targeted_entities.referenced.union(
targeted_entities.indirectly_referenced
)
light_entity_ids = {
entity_id
for entity_id in referenced_entity_ids
if split_entity_id(entity_id)[0] == DOMAIN
}
light_entity_states = [
state.state
for entity_id in light_entity_ids
if (state := hass.states.get(entity_id))
and state.state in STATE_CONDITION_VALID_STATES
]
return matcher(light_entity_states)
return test_state
class IsOnCondition(StateConditionBase):
"""Is on condition."""
def __init__(self, hass: HomeAssistant, config: ConditionConfig) -> None:
"""Initialize condition."""
super().__init__(hass, config, STATE_ON)
class IsOffCondition(StateConditionBase):
"""Is off condition."""
def __init__(self, hass: HomeAssistant, config: ConditionConfig) -> None:
"""Initialize condition."""
super().__init__(hass, config, STATE_OFF)
CONDITIONS: dict[str, type[Condition]] = {
"is_off": IsOffCondition,
"is_on": IsOnCondition,
}
async def async_get_conditions(hass: HomeAssistant) -> dict[str, type[Condition]]:
"""Return the light conditions."""
return CONDITIONS

View File

@@ -0,0 +1,28 @@
is_off:
target:
entity:
domain: light
fields:
behavior:
required: true
default: any
selector:
select:
translation_key: condition_behavior
options:
- all
- any
is_on:
target:
entity:
domain: light
fields:
behavior:
required: true
default: any
selector:
select:
translation_key: condition_behavior
options:
- all
- any

View File

@@ -1,4 +1,12 @@
{
"conditions": {
"is_off": {
"condition": "mdi:lightbulb-off"
},
"is_on": {
"condition": "mdi:lightbulb-on"
}
},
"entity_component": {
"_": {
"default": "mdi:lightbulb",
@@ -25,5 +33,13 @@
"turn_on": {
"service": "mdi:lightbulb-on"
}
},
"triggers": {
"turned_off": {
"trigger": "mdi:lightbulb-off"
},
"turned_on": {
"trigger": "mdi:lightbulb-on"
}
}
}

View File

@@ -36,6 +36,30 @@
"field_xy_color_name": "XY-color",
"section_advanced_fields_name": "Advanced options"
},
"conditions": {
"is_off": {
"description": "Test if a light is off.",
"description_configured": "Test if a light is off",
"fields": {
"behavior": {
"description": "How the state should match on the targeted lights.",
"name": "Behavior"
}
},
"name": "If a light is off"
},
"is_on": {
"description": "Test if a light is on.",
"description_configured": "Test if a light is on",
"fields": {
"behavior": {
"description": "How the state should match on the targeted lights.",
"name": "Behavior"
}
},
"name": "If a light is on"
}
},
"device_automation": {
"action_type": {
"brightness_decrease": "Decrease {entity_name} brightness",
@@ -284,11 +308,30 @@
"yellowgreen": "Yellow green"
}
},
"condition_behavior": {
"options": {
"all": "All",
"any": "Any"
}
},
"flash": {
"options": {
"long": "Long",
"short": "Short"
}
},
"state": {
"options": {
"off": "[%key:common::state::off%]",
"on": "[%key:common::state::on%]"
}
},
"trigger_behavior": {
"options": {
"any": "Any",
"first": "First",
"last": "Last"
}
}
},
"services": {
@@ -464,38 +507,27 @@
},
"title": "Light",
"triggers": {
"turns_on": {
"description": "Triggers when a light turns on.",
"description_configured": "Triggers when a light turns on",
"name": "When a light turns on"
},
"turns_off": {
"description": "Triggers when a light turns off.",
"description_configured": "Triggers when a light turns off",
"name": "When a light turns off"
},
"brightness_changed": {
"description": "Triggers when the brightness of a light changes.",
"description_configured": "Triggers when the brightness of a light changes",
"turned_off": {
"description": "Triggers when a light is turned off.",
"description_configured": "Triggers when a light is turned off",
"fields": {
"lower": {
"description": "The minimum brightness value to trigger on. Only triggers when brightness is at or above this value.",
"name": "Lower limit"
},
"upper": {
"description": "The maximum brightness value to trigger on. Only triggers when brightness is at or below this value.",
"name": "Upper limit"
},
"above": {
"description": "Only trigger when brightness is above this value.",
"name": "Above"
},
"below": {
"description": "Only trigger when brightness is below this value.",
"name": "Below"
"behavior": {
"description": "The behavior of the targeted lights to trigger on.",
"name": "Behavior"
}
},
"name": "When the brightness of a light changes"
"name": "When a light is turned off"
},
"turned_on": {
"description": "Triggers when a light is turned on.",
"description_configured": "Triggers when a light is turned on",
"fields": {
"behavior": {
"description": "[%key:component::light::triggers::turned_off::fields::behavior::description%]",
"name": "[%key:component::light::triggers::turned_off::fields::behavior::name%]"
}
},
"name": "When a light is turned on"
}
}
}

View File

@@ -1,285 +1,14 @@
"""Provides triggers for lights."""
from typing import TYPE_CHECKING, cast, override
import voluptuous as vol
from homeassistant.const import (
ATTR_ENTITY_ID,
CONF_OPTIONS,
CONF_TARGET,
STATE_OFF,
STATE_ON,
STATE_UNAVAILABLE,
)
from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback, split_entity_id
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.target import (
TargetStateChangedData,
async_track_target_selector_state_change_event,
)
from homeassistant.helpers.trigger import Trigger, TriggerActionRunner, TriggerConfig
from homeassistant.helpers.typing import ConfigType
from homeassistant.const import STATE_OFF, STATE_ON
from homeassistant.core import HomeAssistant
from homeassistant.helpers.trigger import Trigger, make_entity_state_trigger
from .const import DOMAIN
ATTR_BRIGHTNESS = "brightness"
CONF_LOWER = "lower"
CONF_UPPER = "upper"
CONF_ABOVE = "above"
CONF_BELOW = "below"
TURNS_ON_TRIGGER_SCHEMA = vol.Schema(
{
vol.Required(CONF_TARGET): cv.TARGET_FIELDS,
}
)
TURNS_OFF_TRIGGER_SCHEMA = vol.Schema(
{
vol.Required(CONF_TARGET): cv.TARGET_FIELDS,
}
)
BRIGHTNESS_CHANGED_TRIGGER_SCHEMA = vol.Schema(
{
vol.Optional(CONF_OPTIONS, default={}): {
vol.Exclusive(CONF_LOWER, "brightness_range"): vol.All(
vol.Coerce(int), vol.Range(min=0, max=255)
),
vol.Exclusive(CONF_UPPER, "brightness_range"): vol.All(
vol.Coerce(int), vol.Range(min=0, max=255)
),
vol.Exclusive(CONF_ABOVE, "brightness_range"): vol.All(
vol.Coerce(int), vol.Range(min=0, max=255)
),
vol.Exclusive(CONF_BELOW, "brightness_range"): vol.All(
vol.Coerce(int), vol.Range(min=0, max=255)
),
},
vol.Required(CONF_TARGET): cv.TARGET_FIELDS,
}
)
class LightTurnsOnTrigger(Trigger):
"""Trigger for when a light turns on."""
@override
@classmethod
async def async_validate_config(
cls, hass: HomeAssistant, config: ConfigType
) -> ConfigType:
"""Validate config."""
return cast(ConfigType, TURNS_ON_TRIGGER_SCHEMA(config))
def __init__(self, hass: HomeAssistant, config: TriggerConfig) -> None:
"""Initialize the light turns on trigger."""
super().__init__(hass, config)
if TYPE_CHECKING:
assert config.target is not None
self._target = config.target
@override
async def async_attach_runner(
self, run_action: TriggerActionRunner
) -> CALLBACK_TYPE:
"""Attach the trigger to an action runner."""
@callback
def state_change_listener(
target_state_change_data: TargetStateChangedData,
) -> None:
"""Listen for state changes and call action."""
event = target_state_change_data.state_change_event
entity_id = event.data["entity_id"]
from_state = event.data["old_state"]
to_state = event.data["new_state"]
# Ignore unavailable states
if to_state is None or to_state.state == STATE_UNAVAILABLE:
return
# Trigger when light turns on (from off to on)
if from_state and from_state.state == STATE_OFF and to_state.state == STATE_ON:
run_action(
{
ATTR_ENTITY_ID: entity_id,
"from_state": from_state,
"to_state": to_state,
},
f"light turned on on {entity_id}",
event.context,
)
def entity_filter(entities: set[str]) -> set[str]:
"""Filter entities of this domain."""
return {
entity_id
for entity_id in entities
if split_entity_id(entity_id)[0] == DOMAIN
}
return async_track_target_selector_state_change_event(
self._hass, self._target, state_change_listener, entity_filter
)
class LightTurnsOffTrigger(Trigger):
"""Trigger for when a light turns off."""
@override
@classmethod
async def async_validate_config(
cls, hass: HomeAssistant, config: ConfigType
) -> ConfigType:
"""Validate config."""
return cast(ConfigType, TURNS_OFF_TRIGGER_SCHEMA(config))
def __init__(self, hass: HomeAssistant, config: TriggerConfig) -> None:
"""Initialize the light turns off trigger."""
super().__init__(hass, config)
if TYPE_CHECKING:
assert config.target is not None
self._target = config.target
@override
async def async_attach_runner(
self, run_action: TriggerActionRunner
) -> CALLBACK_TYPE:
"""Attach the trigger to an action runner."""
@callback
def state_change_listener(
target_state_change_data: TargetStateChangedData,
) -> None:
"""Listen for state changes and call action."""
event = target_state_change_data.state_change_event
entity_id = event.data["entity_id"]
from_state = event.data["old_state"]
to_state = event.data["new_state"]
# Ignore unavailable states
if to_state is None or to_state.state == STATE_UNAVAILABLE:
return
# Trigger when light turns off (from on to off)
if from_state and from_state.state == STATE_ON and to_state.state == STATE_OFF:
run_action(
{
ATTR_ENTITY_ID: entity_id,
"from_state": from_state,
"to_state": to_state,
},
f"light turned off on {entity_id}",
event.context,
)
def entity_filter(entities: set[str]) -> set[str]:
"""Filter entities of this domain."""
return {
entity_id
for entity_id in entities
if split_entity_id(entity_id)[0] == DOMAIN
}
return async_track_target_selector_state_change_event(
self._hass, self._target, state_change_listener, entity_filter
)
class LightBrightnessChangedTrigger(Trigger):
"""Trigger for when a light's brightness changes."""
@override
@classmethod
async def async_validate_config(
cls, hass: HomeAssistant, config: ConfigType
) -> ConfigType:
"""Validate config."""
return cast(ConfigType, BRIGHTNESS_CHANGED_TRIGGER_SCHEMA(config))
def __init__(self, hass: HomeAssistant, config: TriggerConfig) -> None:
"""Initialize the light brightness changed trigger."""
super().__init__(hass, config)
if TYPE_CHECKING:
assert config.target is not None
self._target = config.target
self._options = config.options or {}
@override
async def async_attach_runner(
self, run_action: TriggerActionRunner
) -> CALLBACK_TYPE:
"""Attach the trigger to an action runner."""
lower_limit = self._options.get(CONF_LOWER)
upper_limit = self._options.get(CONF_UPPER)
above_limit = self._options.get(CONF_ABOVE)
below_limit = self._options.get(CONF_BELOW)
@callback
def state_change_listener(
target_state_change_data: TargetStateChangedData,
) -> None:
"""Listen for state changes and call action."""
event = target_state_change_data.state_change_event
entity_id = event.data["entity_id"]
from_state = event.data["old_state"]
to_state = event.data["new_state"]
# Ignore unavailable states
if to_state is None or to_state.state == STATE_UNAVAILABLE:
return
# Get brightness values
from_brightness = (
from_state.attributes.get(ATTR_BRIGHTNESS) if from_state else None
)
to_brightness = to_state.attributes.get(ATTR_BRIGHTNESS)
# Only trigger if brightness value exists and has changed
if to_brightness is None or from_brightness == to_brightness:
return
# Apply threshold filters if configured
if lower_limit is not None and to_brightness < lower_limit:
return
if upper_limit is not None and to_brightness > upper_limit:
return
if above_limit is not None and to_brightness <= above_limit:
return
if below_limit is not None and to_brightness >= below_limit:
return
run_action(
{
ATTR_ENTITY_ID: entity_id,
"from_state": from_state,
"to_state": to_state,
"from_brightness": from_brightness,
"to_brightness": to_brightness,
},
f"brightness changed on {entity_id}",
event.context,
)
def entity_filter(entities: set[str]) -> set[str]:
"""Filter entities of this domain."""
return {
entity_id
for entity_id in entities
if split_entity_id(entity_id)[0] == DOMAIN
}
return async_track_target_selector_state_change_event(
self._hass, self._target, state_change_listener, entity_filter
)
TRIGGERS: dict[str, type[Trigger]] = {
"turns_on": LightTurnsOnTrigger,
"turns_off": LightTurnsOffTrigger,
"brightness_changed": LightBrightnessChangedTrigger,
"turned_off": make_entity_state_trigger(DOMAIN, STATE_OFF),
"turned_on": make_entity_state_trigger(DOMAIN, STATE_ON),
}

View File

@@ -1,43 +1,31 @@
turns_on:
target:
entity:
domain: light
turns_off:
target:
entity:
domain: light
brightness_changed:
turned_on:
target:
entity:
domain: light
fields:
lower:
required: false
behavior:
required: true
default: any
selector:
number:
min: 0
max: 255
mode: box
upper:
required: false
select:
options:
- first
- last
- any
translation_key: trigger_behavior
turned_off:
target:
entity:
domain: light
fields:
behavior:
required: true
default: any
selector:
number:
min: 0
max: 255
mode: box
above:
required: false
selector:
number:
min: 0
max: 255
mode: box
below:
required: false
selector:
number:
min: 0
max: 255
mode: box
select:
translation_key: trigger_behavior
options:
- first
- last
- any

View File

@@ -353,13 +353,17 @@ DISCOVERY_SCHEMAS = [
device_class=BinarySensorDeviceClass.PROBLEM,
entity_category=EntityCategory.DIAGNOSTIC,
# DeviceFault or SupplyFault bit enabled
device_to_ha=lambda x: bool(
x
& (
clusters.PumpConfigurationAndControl.Bitmaps.PumpStatusBitmap.kDeviceFault
| clusters.PumpConfigurationAndControl.Bitmaps.PumpStatusBitmap.kSupplyFault
)
),
device_to_ha={
clusters.PumpConfigurationAndControl.Bitmaps.PumpStatusBitmap.kDeviceFault: True,
clusters.PumpConfigurationAndControl.Bitmaps.PumpStatusBitmap.kSupplyFault: True,
clusters.PumpConfigurationAndControl.Bitmaps.PumpStatusBitmap.kSpeedLow: False,
clusters.PumpConfigurationAndControl.Bitmaps.PumpStatusBitmap.kSpeedHigh: False,
clusters.PumpConfigurationAndControl.Bitmaps.PumpStatusBitmap.kLocalOverride: False,
clusters.PumpConfigurationAndControl.Bitmaps.PumpStatusBitmap.kRunning: False,
clusters.PumpConfigurationAndControl.Bitmaps.PumpStatusBitmap.kRemotePressure: False,
clusters.PumpConfigurationAndControl.Bitmaps.PumpStatusBitmap.kRemoteFlow: False,
clusters.PumpConfigurationAndControl.Bitmaps.PumpStatusBitmap.kRemoteTemperature: False,
}.get,
),
entity_class=MatterBinarySensor,
required_attributes=(
@@ -373,9 +377,9 @@ DISCOVERY_SCHEMAS = [
key="PumpStatusRunning",
translation_key="pump_running",
device_class=BinarySensorDeviceClass.RUNNING,
device_to_ha=lambda x: bool(
device_to_ha=lambda x: (
x
& clusters.PumpConfigurationAndControl.Bitmaps.PumpStatusBitmap.kRunning
== clusters.PumpConfigurationAndControl.Bitmaps.PumpStatusBitmap.kRunning
),
),
entity_class=MatterBinarySensor,
@@ -391,8 +395,8 @@ DISCOVERY_SCHEMAS = [
translation_key="dishwasher_alarm_inflow",
device_class=BinarySensorDeviceClass.PROBLEM,
entity_category=EntityCategory.DIAGNOSTIC,
device_to_ha=lambda x: bool(
x & clusters.DishwasherAlarm.Bitmaps.AlarmBitmap.kInflowError
device_to_ha=lambda x: (
x == clusters.DishwasherAlarm.Bitmaps.AlarmBitmap.kInflowError
),
),
entity_class=MatterBinarySensor,
@@ -406,8 +410,8 @@ DISCOVERY_SCHEMAS = [
translation_key="alarm_door",
device_class=BinarySensorDeviceClass.PROBLEM,
entity_category=EntityCategory.DIAGNOSTIC,
device_to_ha=lambda x: bool(
x & clusters.DishwasherAlarm.Bitmaps.AlarmBitmap.kDoorError
device_to_ha=lambda x: (
x == clusters.DishwasherAlarm.Bitmaps.AlarmBitmap.kDoorError
),
),
entity_class=MatterBinarySensor,
@@ -477,8 +481,8 @@ DISCOVERY_SCHEMAS = [
translation_key="alarm_door",
device_class=BinarySensorDeviceClass.PROBLEM,
entity_category=EntityCategory.DIAGNOSTIC,
device_to_ha=lambda x: bool(
x & clusters.RefrigeratorAlarm.Bitmaps.AlarmBitmap.kDoorOpen
device_to_ha=lambda x: (
x == clusters.RefrigeratorAlarm.Bitmaps.AlarmBitmap.kDoorOpen
),
),
entity_class=MatterBinarySensor,

View File

@@ -7,5 +7,5 @@
"integration_type": "service",
"iot_class": "local_polling",
"quality_scale": "platinum",
"requirements": ["aiomealie==1.1.0"]
"requirements": ["aiomealie==1.0.1"]
}

View File

@@ -367,63 +367,5 @@
"name": "Turn up volume"
}
},
"title": "Media player",
"triggers": {
"turns_on": {
"description": "Triggers when a media player turns on.",
"description_configured": "Triggers when a media player turns on",
"name": "When a media player turns on"
},
"turns_off": {
"description": "Triggers when a media player turns off.",
"description_configured": "Triggers when a media player turns off",
"name": "When a media player turns off"
},
"playing": {
"description": "Triggers when a media player starts playing.",
"description_configured": "Triggers when a media player starts playing",
"fields": {
"media_content_type": {
"description": "The media content types to trigger on. If empty, triggers on all content types.",
"name": "Media content types"
}
},
"name": "When a media player starts playing"
},
"paused": {
"description": "Triggers when a media player pauses.",
"description_configured": "Triggers when a media player pauses",
"name": "When a media player pauses"
},
"stopped": {
"description": "Triggers when a media player stops playing.",
"description_configured": "Triggers when a media player stops playing",
"name": "When a media player stops playing"
},
"muted": {
"description": "Triggers when a media player gets muted.",
"description_configured": "Triggers when a media player gets muted",
"name": "When a media player gets muted"
},
"unmuted": {
"description": "Triggers when a media player gets unmuted.",
"description_configured": "Triggers when a media player gets unmuted",
"name": "When a media player gets unmuted"
},
"volume_changed": {
"description": "Triggers when a media player volume changes.",
"description_configured": "Triggers when a media player volume changes",
"fields": {
"above": {
"description": "Only trigger when volume is above this level (0.0-1.0).",
"name": "Above"
},
"below": {
"description": "Only trigger when volume is below this level (0.0-1.0).",
"name": "Below"
}
},
"name": "When a media player volume changes"
}
}
"title": "Media player"
}

View File

@@ -1,676 +0,0 @@
"""Provides triggers for media players."""
from typing import TYPE_CHECKING, cast, override
import voluptuous as vol
from homeassistant.const import (
ATTR_ENTITY_ID,
CONF_OPTIONS,
CONF_TARGET,
STATE_IDLE,
STATE_OFF,
STATE_PAUSED,
STATE_PLAYING,
STATE_UNAVAILABLE,
)
from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback, split_entity_id
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.target import (
TargetStateChangedData,
async_track_target_selector_state_change_event,
)
from homeassistant.helpers.trigger import Trigger, TriggerActionRunner, TriggerConfig
from homeassistant.helpers.typing import ConfigType
from .const import (
ATTR_MEDIA_CONTENT_TYPE,
ATTR_MEDIA_VOLUME_LEVEL,
ATTR_MEDIA_VOLUME_MUTED,
DOMAIN,
)
TURNS_ON_TRIGGER_SCHEMA = vol.Schema(
{
vol.Optional(CONF_OPTIONS, default={}): {},
vol.Required(CONF_TARGET): cv.TARGET_FIELDS,
}
)
TURNS_OFF_TRIGGER_SCHEMA = vol.Schema(
{
vol.Optional(CONF_OPTIONS, default={}): {},
vol.Required(CONF_TARGET): cv.TARGET_FIELDS,
}
)
PLAYING_TRIGGER_SCHEMA = vol.Schema(
{
vol.Optional(CONF_OPTIONS, default={}): {
vol.Optional(ATTR_MEDIA_CONTENT_TYPE, default=[]): vol.All(
cv.ensure_list, [cv.string]
),
},
vol.Required(CONF_TARGET): cv.TARGET_FIELDS,
}
)
STOPPED_TRIGGER_SCHEMA = vol.Schema(
{
vol.Optional(CONF_OPTIONS, default={}): {},
vol.Required(CONF_TARGET): cv.TARGET_FIELDS,
}
)
MUTED_TRIGGER_SCHEMA = vol.Schema(
{
vol.Optional(CONF_OPTIONS, default={}): {},
vol.Required(CONF_TARGET): cv.TARGET_FIELDS,
}
)
UNMUTED_TRIGGER_SCHEMA = vol.Schema(
{
vol.Optional(CONF_OPTIONS, default={}): {},
vol.Required(CONF_TARGET): cv.TARGET_FIELDS,
}
)
VOLUME_CHANGED_TRIGGER_SCHEMA = vol.Schema(
{
vol.Optional(CONF_OPTIONS, default={}): {
vol.Optional("above"): vol.All(vol.Coerce(float), vol.Range(min=0.0, max=1.0)),
vol.Optional("below"): vol.All(vol.Coerce(float), vol.Range(min=0.0, max=1.0)),
},
vol.Required(CONF_TARGET): cv.TARGET_FIELDS,
}
)
PAUSED_TRIGGER_SCHEMA = vol.Schema(
{
vol.Optional(CONF_OPTIONS, default={}): {},
vol.Required(CONF_TARGET): cv.TARGET_FIELDS,
}
)
class MediaPlayerTurnsOnTrigger(Trigger):
"""Trigger for when a media player turns on."""
@override
@classmethod
async def async_validate_config(
cls, hass: HomeAssistant, config: ConfigType
) -> ConfigType:
"""Validate config."""
return cast(ConfigType, TURNS_ON_TRIGGER_SCHEMA(config))
def __init__(self, hass: HomeAssistant, config: TriggerConfig) -> None:
"""Initialize the media player turns on trigger."""
super().__init__(hass, config)
if TYPE_CHECKING:
assert config.target is not None
self._target = config.target
@override
async def async_attach_runner(
self, run_action: TriggerActionRunner
) -> CALLBACK_TYPE:
"""Attach the trigger to an action runner."""
@callback
def state_change_listener(
target_state_change_data: TargetStateChangedData,
) -> None:
"""Listen for state changes and call action."""
event = target_state_change_data.state_change_event
entity_id = event.data["entity_id"]
from_state = event.data["old_state"]
to_state = event.data["new_state"]
# Ignore unavailable states
if to_state is None or to_state.state == STATE_UNAVAILABLE:
return
# Trigger when turning on from off state
if (
from_state is not None
and from_state.state == STATE_OFF
and to_state.state != STATE_OFF
):
run_action(
{
ATTR_ENTITY_ID: entity_id,
"from_state": from_state,
"to_state": to_state,
},
f"media player {entity_id} turned on",
event.context,
)
def entity_filter(entities: set[str]) -> set[str]:
"""Filter entities of this domain."""
return {
entity_id
for entity_id in entities
if split_entity_id(entity_id)[0] == DOMAIN
}
return async_track_target_selector_state_change_event(
self._hass, self._target, state_change_listener, entity_filter
)
class MediaPlayerTurnsOffTrigger(Trigger):
"""Trigger for when a media player turns off."""
@override
@classmethod
async def async_validate_config(
cls, hass: HomeAssistant, config: ConfigType
) -> ConfigType:
"""Validate config."""
return cast(ConfigType, TURNS_OFF_TRIGGER_SCHEMA(config))
def __init__(self, hass: HomeAssistant, config: TriggerConfig) -> None:
"""Initialize the media player turns off trigger."""
super().__init__(hass, config)
if TYPE_CHECKING:
assert config.target is not None
self._target = config.target
@override
async def async_attach_runner(
self, run_action: TriggerActionRunner
) -> CALLBACK_TYPE:
"""Attach the trigger to an action runner."""
@callback
def state_change_listener(
target_state_change_data: TargetStateChangedData,
) -> None:
"""Listen for state changes and call action."""
event = target_state_change_data.state_change_event
entity_id = event.data["entity_id"]
from_state = event.data["old_state"]
to_state = event.data["new_state"]
# Ignore unavailable states
if to_state is None or to_state.state == STATE_UNAVAILABLE:
return
# Trigger when turning off
if (
from_state is not None
and from_state.state != STATE_OFF
and to_state.state == STATE_OFF
):
run_action(
{
ATTR_ENTITY_ID: entity_id,
"from_state": from_state,
"to_state": to_state,
},
f"media player {entity_id} turned off",
event.context,
)
def entity_filter(entities: set[str]) -> set[str]:
"""Filter entities of this domain."""
return {
entity_id
for entity_id in entities
if split_entity_id(entity_id)[0] == DOMAIN
}
return async_track_target_selector_state_change_event(
self._hass, self._target, state_change_listener, entity_filter
)
class MediaPlayerPlayingTrigger(Trigger):
"""Trigger for when a media player starts playing."""
@override
@classmethod
async def async_validate_config(
cls, hass: HomeAssistant, config: ConfigType
) -> ConfigType:
"""Validate config."""
return cast(ConfigType, PLAYING_TRIGGER_SCHEMA(config))
def __init__(self, hass: HomeAssistant, config: TriggerConfig) -> None:
"""Initialize the media player playing trigger."""
super().__init__(hass, config)
if TYPE_CHECKING:
assert config.options is not None
assert config.target is not None
self._options = config.options
self._target = config.target
@override
async def async_attach_runner(
self, run_action: TriggerActionRunner
) -> CALLBACK_TYPE:
"""Attach the trigger to an action runner."""
media_content_types_filter = self._options[ATTR_MEDIA_CONTENT_TYPE]
@callback
def state_change_listener(
target_state_change_data: TargetStateChangedData,
) -> None:
"""Listen for state changes and call action."""
event = target_state_change_data.state_change_event
entity_id = event.data["entity_id"]
from_state = event.data["old_state"]
to_state = event.data["new_state"]
# Ignore unavailable states
if to_state is None or to_state.state == STATE_UNAVAILABLE:
return
# Trigger when starting to play
if (
from_state is not None
and from_state.state != STATE_PLAYING
and to_state.state == STATE_PLAYING
):
# If media_content_type filter is specified, check if it matches
if media_content_types_filter:
media_content_type = to_state.attributes.get(ATTR_MEDIA_CONTENT_TYPE)
if media_content_type not in media_content_types_filter:
return
run_action(
{
ATTR_ENTITY_ID: entity_id,
"from_state": from_state,
"to_state": to_state,
},
f"media player {entity_id} started playing",
event.context,
)
def entity_filter(entities: set[str]) -> set[str]:
"""Filter entities of this domain."""
return {
entity_id
for entity_id in entities
if split_entity_id(entity_id)[0] == DOMAIN
}
return async_track_target_selector_state_change_event(
self._hass, self._target, state_change_listener, entity_filter
)
class MediaPlayerPausedTrigger(Trigger):
"""Trigger for when a media player pauses."""
@override
@classmethod
async def async_validate_config(
cls, hass: HomeAssistant, config: ConfigType
) -> ConfigType:
"""Validate config."""
return cast(ConfigType, PAUSED_TRIGGER_SCHEMA(config))
def __init__(self, hass: HomeAssistant, config: TriggerConfig) -> None:
"""Initialize the media player paused trigger."""
super().__init__(hass, config)
if TYPE_CHECKING:
assert config.target is not None
self._target = config.target
@override
async def async_attach_runner(
self, run_action: TriggerActionRunner
) -> CALLBACK_TYPE:
"""Attach the trigger to an action runner."""
@callback
def state_change_listener(
target_state_change_data: TargetStateChangedData,
) -> None:
"""Listen for state changes and call action."""
event = target_state_change_data.state_change_event
entity_id = event.data["entity_id"]
from_state = event.data["old_state"]
to_state = event.data["new_state"]
# Ignore unavailable states
if to_state is None or to_state.state == STATE_UNAVAILABLE:
return
# Trigger when pausing
if (
from_state is not None
and from_state.state != STATE_PAUSED
and to_state.state == STATE_PAUSED
):
run_action(
{
ATTR_ENTITY_ID: entity_id,
"from_state": from_state,
"to_state": to_state,
},
f"media player {entity_id} paused",
event.context,
)
def entity_filter(entities: set[str]) -> set[str]:
"""Filter entities of this domain."""
return {
entity_id
for entity_id in entities
if split_entity_id(entity_id)[0] == DOMAIN
}
return async_track_target_selector_state_change_event(
self._hass, self._target, state_change_listener, entity_filter
)
class MediaPlayerStoppedTrigger(Trigger):
"""Trigger for when a media player stops playing."""
@override
@classmethod
async def async_validate_config(
cls, hass: HomeAssistant, config: ConfigType
) -> ConfigType:
"""Validate config."""
return cast(ConfigType, STOPPED_TRIGGER_SCHEMA(config))
def __init__(self, hass: HomeAssistant, config: TriggerConfig) -> None:
"""Initialize the media player stopped trigger."""
super().__init__(hass, config)
if TYPE_CHECKING:
assert config.target is not None
self._target = config.target
@override
async def async_attach_runner(
self, run_action: TriggerActionRunner
) -> CALLBACK_TYPE:
"""Attach the trigger to an action runner."""
@callback
def state_change_listener(
target_state_change_data: TargetStateChangedData,
) -> None:
"""Listen for state changes and call action."""
event = target_state_change_data.state_change_event
entity_id = event.data["entity_id"]
from_state = event.data["old_state"]
to_state = event.data["new_state"]
# Ignore unavailable states
if to_state is None or to_state.state == STATE_UNAVAILABLE:
return
# Trigger when stopping (to idle or off from playing/paused states)
if (
from_state is not None
and from_state.state in (STATE_PLAYING, STATE_PAUSED)
and to_state.state in (STATE_IDLE, STATE_OFF)
):
run_action(
{
ATTR_ENTITY_ID: entity_id,
"from_state": from_state,
"to_state": to_state,
},
f"media player {entity_id} stopped",
event.context,
)
def entity_filter(entities: set[str]) -> set[str]:
"""Filter entities of this domain."""
return {
entity_id
for entity_id in entities
if split_entity_id(entity_id)[0] == DOMAIN
}
return async_track_target_selector_state_change_event(
self._hass, self._target, state_change_listener, entity_filter
)
class MediaPlayerMutedTrigger(Trigger):
"""Trigger for when a media player gets muted."""
@override
@classmethod
async def async_validate_config(
cls, hass: HomeAssistant, config: ConfigType
) -> ConfigType:
"""Validate config."""
return cast(ConfigType, MUTED_TRIGGER_SCHEMA(config))
def __init__(self, hass: HomeAssistant, config: TriggerConfig) -> None:
"""Initialize the media player muted trigger."""
super().__init__(hass, config)
if TYPE_CHECKING:
assert config.target is not None
self._target = config.target
@override
async def async_attach_runner(
self, run_action: TriggerActionRunner
) -> CALLBACK_TYPE:
"""Attach the trigger to an action runner."""
@callback
def state_change_listener(
target_state_change_data: TargetStateChangedData,
) -> None:
"""Listen for state changes and call action."""
event = target_state_change_data.state_change_event
entity_id = event.data["entity_id"]
from_state = event.data["old_state"]
to_state = event.data["new_state"]
# Ignore unavailable states
if to_state is None or to_state.state == STATE_UNAVAILABLE:
return
# Trigger when muting
if (
from_state is not None
and not from_state.attributes.get(ATTR_MEDIA_VOLUME_MUTED, False)
and to_state.attributes.get(ATTR_MEDIA_VOLUME_MUTED, False)
):
run_action(
{
ATTR_ENTITY_ID: entity_id,
"from_state": from_state,
"to_state": to_state,
},
f"media player {entity_id} muted",
event.context,
)
def entity_filter(entities: set[str]) -> set[str]:
"""Filter entities of this domain."""
return {
entity_id
for entity_id in entities
if split_entity_id(entity_id)[0] == DOMAIN
}
return async_track_target_selector_state_change_event(
self._hass, self._target, state_change_listener, entity_filter
)
class MediaPlayerUnmutedTrigger(Trigger):
"""Trigger for when a media player gets unmuted."""
@override
@classmethod
async def async_validate_config(
cls, hass: HomeAssistant, config: ConfigType
) -> ConfigType:
"""Validate config."""
return cast(ConfigType, UNMUTED_TRIGGER_SCHEMA(config))
def __init__(self, hass: HomeAssistant, config: TriggerConfig) -> None:
"""Initialize the media player unmuted trigger."""
super().__init__(hass, config)
if TYPE_CHECKING:
assert config.target is not None
self._target = config.target
@override
async def async_attach_runner(
self, run_action: TriggerActionRunner
) -> CALLBACK_TYPE:
"""Attach the trigger to an action runner."""
@callback
def state_change_listener(
target_state_change_data: TargetStateChangedData,
) -> None:
"""Listen for state changes and call action."""
event = target_state_change_data.state_change_event
entity_id = event.data["entity_id"]
from_state = event.data["old_state"]
to_state = event.data["new_state"]
# Ignore unavailable states
if to_state is None or to_state.state == STATE_UNAVAILABLE:
return
# Trigger when unmuting
if (
from_state is not None
and from_state.attributes.get(ATTR_MEDIA_VOLUME_MUTED, False)
and not to_state.attributes.get(ATTR_MEDIA_VOLUME_MUTED, False)
):
run_action(
{
ATTR_ENTITY_ID: entity_id,
"from_state": from_state,
"to_state": to_state,
},
f"media player {entity_id} unmuted",
event.context,
)
def entity_filter(entities: set[str]) -> set[str]:
"""Filter entities of this domain."""
return {
entity_id
for entity_id in entities
if split_entity_id(entity_id)[0] == DOMAIN
}
return async_track_target_selector_state_change_event(
self._hass, self._target, state_change_listener, entity_filter
)
class MediaPlayerVolumeChangedTrigger(Trigger):
"""Trigger for when a media player volume changes."""
@override
@classmethod
async def async_validate_config(
cls, hass: HomeAssistant, config: ConfigType
) -> ConfigType:
"""Validate config."""
return cast(ConfigType, VOLUME_CHANGED_TRIGGER_SCHEMA(config))
def __init__(self, hass: HomeAssistant, config: TriggerConfig) -> None:
"""Initialize the media player volume changed trigger."""
super().__init__(hass, config)
if TYPE_CHECKING:
assert config.options is not None
assert config.target is not None
self._options = config.options
self._target = config.target
@override
async def async_attach_runner(
self, run_action: TriggerActionRunner
) -> CALLBACK_TYPE:
"""Attach the trigger to an action runner."""
above_threshold = self._options.get("above")
below_threshold = self._options.get("below")
@callback
def state_change_listener(
target_state_change_data: TargetStateChangedData,
) -> None:
"""Listen for state changes and call action."""
event = target_state_change_data.state_change_event
entity_id = event.data["entity_id"]
from_state = event.data["old_state"]
to_state = event.data["new_state"]
# Ignore unavailable states
if to_state is None or to_state.state == STATE_UNAVAILABLE:
return
# Get volume levels
old_volume = (
from_state.attributes.get(ATTR_MEDIA_VOLUME_LEVEL)
if from_state is not None
else None
)
new_volume = to_state.attributes.get(ATTR_MEDIA_VOLUME_LEVEL)
# Volume must have changed
if old_volume == new_volume or new_volume is None:
return
# Check thresholds if specified
if above_threshold is not None and new_volume <= above_threshold:
return
if below_threshold is not None and new_volume >= below_threshold:
return
run_action(
{
ATTR_ENTITY_ID: entity_id,
"from_state": from_state,
"to_state": to_state,
},
f"media player {entity_id} volume changed",
event.context,
)
def entity_filter(entities: set[str]) -> set[str]:
"""Filter entities of this domain."""
return {
entity_id
for entity_id in entities
if split_entity_id(entity_id)[0] == DOMAIN
}
return async_track_target_selector_state_change_event(
self._hass, self._target, state_change_listener, entity_filter
)
TRIGGERS: dict[str, type[Trigger]] = {
"turns_on": MediaPlayerTurnsOnTrigger,
"turns_off": MediaPlayerTurnsOffTrigger,
"playing": MediaPlayerPlayingTrigger,
"paused": MediaPlayerPausedTrigger,
"stopped": MediaPlayerStoppedTrigger,
"muted": MediaPlayerMutedTrigger,
"unmuted": MediaPlayerUnmutedTrigger,
"volume_changed": MediaPlayerVolumeChangedTrigger,
}
async def async_get_triggers(hass: HomeAssistant) -> dict[str, type[Trigger]]:
"""Return the triggers for media players."""
return TRIGGERS

View File

@@ -1,65 +0,0 @@
turns_on:
target:
entity:
domain: media_player
turns_off:
target:
entity:
domain: media_player
playing:
target:
entity:
domain: media_player
fields:
media_content_type:
required: false
default: []
selector:
select:
multiple: true
custom_value: true
options: []
paused:
target:
entity:
domain: media_player
stopped:
target:
entity:
domain: media_player
muted:
target:
entity:
domain: media_player
unmuted:
target:
entity:
domain: media_player
volume_changed:
target:
entity:
domain: media_player
fields:
above:
required: false
selector:
number:
min: 0.0
max: 1.0
step: 0.01
mode: slider
below:
required: false
selector:
number:
min: 0.0
max: 1.0
step: 0.01
mode: slider

View File

@@ -1009,7 +1009,7 @@
"cleaning_care_program": "Cleaning/care program",
"maintenance_program": "Maintenance program",
"normal_operation_mode": "Normal operation mode",
"own_program": "Program"
"own_program": "Own program"
}
},
"remaining_time": {
@@ -1089,7 +1089,7 @@
"message": "Invalid device targeted."
},
"oauth2_implementation_unavailable": {
"message": "[%key:common::exceptions::oauth2_implementation_unavailable::message%]"
"message": "OAuth2 implementation unavailable, will retry"
},
"set_program_error": {
"message": "'Set program' action failed: {status} / {message}"

View File

@@ -13,7 +13,7 @@ from music_assistant_client.exceptions import (
from music_assistant_models.api import ServerInfoMessage
import voluptuous as vol
from homeassistant.config_entries import ConfigFlow, ConfigFlowResult
from homeassistant.config_entries import SOURCE_IGNORE, ConfigFlow, ConfigFlowResult
from homeassistant.const import CONF_URL
from homeassistant.core import HomeAssistant
from homeassistant.helpers import aiohttp_client
@@ -21,14 +21,21 @@ from homeassistant.helpers.service_info.zeroconf import ZeroconfServiceInfo
from .const import DOMAIN, LOGGER
DEFAULT_TITLE = "Music Assistant"
DEFAULT_URL = "http://mass.local:8095"
DEFAULT_TITLE = "Music Assistant"
STEP_USER_SCHEMA = vol.Schema({vol.Required(CONF_URL): str})
def get_manual_schema(user_input: dict[str, Any]) -> vol.Schema:
"""Return a schema for the manual step."""
default_url = user_input.get(CONF_URL, DEFAULT_URL)
return vol.Schema(
{
vol.Required(CONF_URL, default=default_url): str,
}
)
async def _get_server_info(hass: HomeAssistant, url: str) -> ServerInfoMessage:
async def get_server_info(hass: HomeAssistant, url: str) -> ServerInfoMessage:
"""Validate the user input allows us to connect."""
async with MusicAssistantClient(
url, aiohttp_client.async_get_clientsession(hass)
@@ -45,17 +52,25 @@ class MusicAssistantConfigFlow(ConfigFlow, domain=DOMAIN):
def __init__(self) -> None:
"""Set up flow instance."""
self.url: str | None = None
self.server_info: ServerInfoMessage | None = None
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle a manual configuration."""
errors: dict[str, str] = {}
if user_input is not None:
try:
server_info = await _get_server_info(self.hass, user_input[CONF_URL])
self.server_info = await get_server_info(
self.hass, user_input[CONF_URL]
)
await self.async_set_unique_id(
self.server_info.server_id, raise_on_progress=False
)
self._abort_if_unique_id_configured(
updates={CONF_URL: user_input[CONF_URL]},
reload_on_update=True,
)
except CannotConnect:
errors["base"] = "cannot_connect"
except InvalidServerVersion:
@@ -64,49 +79,68 @@ class MusicAssistantConfigFlow(ConfigFlow, domain=DOMAIN):
LOGGER.exception("Unexpected exception")
errors["base"] = "unknown"
else:
await self.async_set_unique_id(
server_info.server_id, raise_on_progress=False
)
self._abort_if_unique_id_configured(
updates={CONF_URL: user_input[CONF_URL]}
)
return self.async_create_entry(
title=DEFAULT_TITLE,
data={CONF_URL: user_input[CONF_URL]},
data={
CONF_URL: user_input[CONF_URL],
},
)
suggested_values = user_input
if suggested_values is None:
suggested_values = {CONF_URL: DEFAULT_URL}
return self.async_show_form(
step_id="user", data_schema=get_manual_schema(user_input), errors=errors
)
return self.async_show_form(
step_id="user",
data_schema=self.add_suggested_values_to_schema(
STEP_USER_SCHEMA, suggested_values
),
errors=errors,
)
return self.async_show_form(step_id="user", data_schema=get_manual_schema({}))
async def async_step_zeroconf(
self, discovery_info: ZeroconfServiceInfo
) -> ConfigFlowResult:
"""Handle a zeroconf discovery for a Music Assistant server."""
"""Handle a discovered Mass server.
This flow is triggered by the Zeroconf component. It will check if the
host is already configured and delegate to the import step if not.
"""
# abort if discovery info is not what we expect
if "server_id" not in discovery_info.properties:
return self.async_abort(reason="missing_server_id")
self.server_info = ServerInfoMessage.from_dict(discovery_info.properties)
await self.async_set_unique_id(self.server_info.server_id)
# Check if we already have a config entry for this server_id
existing_entry = self.hass.config_entries.async_entry_for_domain_unique_id(
DOMAIN, self.server_info.server_id
)
if existing_entry:
# If the entry was ignored or disabled, don't make any changes
if existing_entry.source == SOURCE_IGNORE or existing_entry.disabled_by:
return self.async_abort(reason="already_configured")
# Test connectivity to the current URL first
current_url = existing_entry.data[CONF_URL]
try:
await get_server_info(self.hass, current_url)
# Current URL is working, no need to update
return self.async_abort(reason="already_configured")
except CannotConnect:
# Current URL is not working, update to the discovered URL
# and continue to discovery confirm
self.hass.config_entries.async_update_entry(
existing_entry,
data={**existing_entry.data, CONF_URL: self.server_info.base_url},
)
# Schedule reload since URL changed
self.hass.config_entries.async_schedule_reload(existing_entry.entry_id)
else:
# No existing entry, proceed with normal flow
self._abort_if_unique_id_configured()
# Test connectivity to the discovered URL
try:
server_info = ServerInfoMessage.from_dict(discovery_info.properties)
except LookupError:
return self.async_abort(reason="invalid_discovery_info")
self.url = server_info.base_url
await self.async_set_unique_id(server_info.server_id)
self._abort_if_unique_id_configured(updates={CONF_URL: self.url})
try:
await _get_server_info(self.hass, self.url)
await get_server_info(self.hass, self.server_info.base_url)
except CannotConnect:
return self.async_abort(reason="cannot_connect")
return await self.async_step_discovery_confirm()
async def async_step_discovery_confirm(
@@ -114,16 +148,16 @@ class MusicAssistantConfigFlow(ConfigFlow, domain=DOMAIN):
) -> ConfigFlowResult:
"""Handle user-confirmation of discovered server."""
if TYPE_CHECKING:
assert self.url is not None
assert self.server_info is not None
if user_input is not None:
return self.async_create_entry(
title=DEFAULT_TITLE,
data={CONF_URL: self.url},
data={
CONF_URL: self.server_info.base_url,
},
)
self._set_confirm_only()
return self.async_show_form(
step_id="discovery_confirm",
description_placeholders={"url": self.url},
description_placeholders={"url": self.server_info.base_url},
)

View File

@@ -2,7 +2,7 @@
"domain": "music_assistant",
"name": "Music Assistant",
"after_dependencies": ["media_source", "media_player"],
"codeowners": ["@music-assistant", "@arturpragacz"],
"codeowners": ["@music-assistant"],
"config_flow": true,
"documentation": "https://www.home-assistant.io/integrations/music_assistant",
"iot_class": "local_push",

View File

@@ -57,7 +57,7 @@
"message": "Error while loading the integration."
},
"implementation_unavailable": {
"message": "[%key:common::exceptions::oauth2_implementation_unavailable::message%]"
"message": "OAuth2 implementation is not available, will retry."
},
"incorrect_oauth2_scope": {
"message": "Stored permissions are invalid. Please login again to update permissions."

View File

@@ -20,11 +20,10 @@ from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_WEBHOOK_ID, EVENT_HOMEASSISTANT_STOP
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryAuthFailed, ConfigEntryNotReady
from homeassistant.helpers import aiohttp_client, config_validation as cv
from homeassistant.helpers.config_entry_oauth2_flow import (
ImplementationUnavailableError,
OAuth2Session,
async_get_config_entry_implementation,
from homeassistant.helpers import (
aiohttp_client,
config_entry_oauth2_flow,
config_validation as cv,
)
from homeassistant.helpers.device_registry import DeviceEntry
from homeassistant.helpers.dispatcher import async_dispatcher_send
@@ -74,19 +73,17 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up Netatmo from a config entry."""
try:
implementation = await async_get_config_entry_implementation(hass, entry)
except ImplementationUnavailableError as err:
raise ConfigEntryNotReady(
translation_domain=DOMAIN,
translation_key="oauth2_implementation_unavailable",
) from err
implementation = (
await config_entry_oauth2_flow.async_get_config_entry_implementation(
hass, entry
)
)
# Set unique id if non was set (migration)
if not entry.unique_id:
hass.config_entries.async_update_entry(entry, unique_id=DOMAIN)
session = OAuth2Session(hass, entry, implementation)
session = config_entry_oauth2_flow.OAuth2Session(hass, entry, implementation)
try:
await session.async_ensure_token_valid()
except aiohttp.ClientResponseError as ex:

View File

@@ -143,11 +143,6 @@
}
}
},
"exceptions": {
"oauth2_implementation_unavailable": {
"message": "[%key:common::exceptions::oauth2_implementation_unavailable::message%]"
}
},
"options": {
"step": {
"public_weather": {

View File

@@ -12,12 +12,7 @@ from homeassistant.helpers import entity_registry as er
from .const import _LOGGER
PLATFORMS: list[Platform] = [
Platform.CLIMATE,
Platform.COVER,
Platform.LIGHT,
Platform.SCENE,
]
PLATFORMS: list[Platform] = [Platform.COVER, Platform.LIGHT, Platform.SCENE]
type NikoHomeControlConfigEntry = ConfigEntry[NHCController]

View File

@@ -1,100 +0,0 @@
"""Support for Niko Home Control thermostats."""
from typing import Any
from nhc.const import THERMOSTAT_MODES, THERMOSTAT_MODES_REVERSE
from nhc.thermostat import NHCThermostat
from homeassistant.components.climate import (
PRESET_ECO,
ClimateEntity,
ClimateEntityFeature,
HVACMode,
)
from homeassistant.components.sensor import UnitOfTemperature
from homeassistant.const import ATTR_TEMPERATURE
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
from . import NikoHomeControlConfigEntry
from .const import (
NIKO_HOME_CONTROL_THERMOSTAT_MODES_MAP,
NikoHomeControlThermostatModes,
)
from .entity import NikoHomeControlEntity
async def async_setup_entry(
hass: HomeAssistant,
entry: NikoHomeControlConfigEntry,
async_add_entities: AddConfigEntryEntitiesCallback,
) -> None:
"""Set up the Niko Home Control thermostat entry."""
controller = entry.runtime_data
async_add_entities(
NikoHomeControlClimate(thermostat, controller, entry.entry_id)
for thermostat in controller.thermostats.values()
)
class NikoHomeControlClimate(NikoHomeControlEntity, ClimateEntity):
"""Representation of a Niko Home Control thermostat."""
_attr_supported_features: ClimateEntityFeature = (
ClimateEntityFeature.PRESET_MODE
| ClimateEntityFeature.TARGET_TEMPERATURE
| ClimateEntityFeature.TURN_OFF
)
_attr_temperature_unit = UnitOfTemperature.CELSIUS
_attr_name = None
_action: NHCThermostat
_attr_translation_key = "nhc_thermostat"
_attr_hvac_modes = [HVACMode.OFF, HVACMode.COOL, HVACMode.AUTO]
_attr_preset_modes = [
"day",
"night",
PRESET_ECO,
"prog1",
"prog2",
"prog3",
]
def _get_niko_mode(self, mode: str) -> int:
"""Return the Niko mode."""
return THERMOSTAT_MODES_REVERSE.get(mode, NikoHomeControlThermostatModes.OFF)
async def async_set_temperature(self, **kwargs: Any) -> None:
"""Set new target temperature."""
if ATTR_TEMPERATURE in kwargs:
await self._action.set_temperature(kwargs.get(ATTR_TEMPERATURE))
async def async_set_preset_mode(self, preset_mode: str) -> None:
"""Set new preset mode."""
await self._action.set_mode(self._get_niko_mode(preset_mode))
async def async_set_hvac_mode(self, hvac_mode: HVACMode) -> None:
"""Set new target hvac mode."""
await self._action.set_mode(NIKO_HOME_CONTROL_THERMOSTAT_MODES_MAP[hvac_mode])
async def async_turn_off(self) -> None:
"""Turn thermostat off."""
await self._action.set_mode(NikoHomeControlThermostatModes.OFF)
def update_state(self) -> None:
"""Update the state of the entity."""
if self._action.state == NikoHomeControlThermostatModes.OFF:
self._attr_hvac_mode = HVACMode.OFF
self._attr_preset_mode = None
elif self._action.state == NikoHomeControlThermostatModes.COOL:
self._attr_hvac_mode = HVACMode.COOL
self._attr_preset_mode = None
else:
self._attr_hvac_mode = HVACMode.AUTO
self._attr_preset_mode = THERMOSTAT_MODES[self._action.state]
self._attr_target_temperature = self._action.setpoint
self._attr_current_temperature = self._action.measured

View File

@@ -1,23 +1,6 @@
"""Constants for niko_home_control integration."""
from enum import IntEnum
import logging
from homeassistant.components.climate import HVACMode
DOMAIN = "niko_home_control"
_LOGGER = logging.getLogger(__name__)
NIKO_HOME_CONTROL_THERMOSTAT_MODES_MAP = {
HVACMode.OFF: 3,
HVACMode.COOL: 4,
HVACMode.AUTO: 5,
}
class NikoHomeControlThermostatModes(IntEnum):
"""Enum for Niko Home Control thermostat modes."""
OFF = 3
COOL = 4
AUTO = 5

View File

@@ -1,20 +0,0 @@
{
"entity": {
"climate": {
"nhc_thermostat": {
"state_attributes": {
"preset_mode": {
"default": "mdi:calendar-clock",
"state": {
"day": "mdi:weather-sunny",
"night": "mdi:weather-night",
"prog1": "mdi:numeric-1",
"prog2": "mdi:numeric-2",
"prog3": "mdi:numeric-3"
}
}
}
}
}
}
}

View File

@@ -26,23 +26,5 @@
"description": "Set up your Niko Home Control instance."
}
}
},
"entity": {
"climate": {
"nhc_thermostat": {
"state_attributes": {
"preset_mode": {
"state": {
"day": "Day",
"eco": "Eco",
"night": "Night",
"prog1": "Program 1",
"prog2": "Program 2",
"prog3": "Program 3"
}
}
}
}
}
}
}

View File

@@ -256,7 +256,6 @@ class PlaystationNetworkFriendDataCoordinator(
account_id=self.user.account_id,
presence=self.user.get_presence(),
profile=self.profile,
trophy_summary=self.user.trophy_summary(),
)
except PSNAWPForbiddenError as error:
raise UpdateFailed(

View File

@@ -54,7 +54,7 @@ class PlaystationNetworkSensor(StrEnum):
NOW_PLAYING = "now_playing"
SENSOR_DESCRIPTIONS: tuple[PlaystationNetworkSensorEntityDescription, ...] = (
SENSOR_DESCRIPTIONS_TROPHY: tuple[PlaystationNetworkSensorEntityDescription, ...] = (
PlaystationNetworkSensorEntityDescription(
key=PlaystationNetworkSensor.TROPHY_LEVEL,
translation_key=PlaystationNetworkSensor.TROPHY_LEVEL,
@@ -106,6 +106,8 @@ SENSOR_DESCRIPTIONS: tuple[PlaystationNetworkSensorEntityDescription, ...] = (
else None
),
),
)
SENSOR_DESCRIPTIONS_USER: tuple[PlaystationNetworkSensorEntityDescription, ...] = (
PlaystationNetworkSensorEntityDescription(
key=PlaystationNetworkSensor.ONLINE_ID,
translation_key=PlaystationNetworkSensor.ONLINE_ID,
@@ -150,7 +152,7 @@ async def async_setup_entry(
coordinator = config_entry.runtime_data.user_data
async_add_entities(
PlaystationNetworkSensorEntity(coordinator, description)
for description in SENSOR_DESCRIPTIONS
for description in SENSOR_DESCRIPTIONS_TROPHY + SENSOR_DESCRIPTIONS_USER
)
for (
@@ -164,7 +166,7 @@ async def async_setup_entry(
description,
config_entry.subentries[subentry_id],
)
for description in SENSOR_DESCRIPTIONS
for description in SENSOR_DESCRIPTIONS_USER
],
config_subentry_id=subentry_id,
)

View File

@@ -57,14 +57,12 @@ type SelectType = Literal[
"select_gateway_mode",
"select_regulation_mode",
"select_schedule",
"select_zone_profile",
]
type SelectOptionsType = Literal[
"available_schedules",
"dhw_modes",
"gateway_modes",
"regulation_modes",
"zone_profiles",
"available_schedules",
]
# Default directives
@@ -84,10 +82,3 @@ MASTER_THERMOSTATS: Final[list[str]] = [
"zone_thermometer",
"zone_thermostat",
]
# Select constants
SELECT_DHW_MODE: Final = "select_dhw_mode"
SELECT_GATEWAY_MODE: Final = "select_gateway_mode"
SELECT_REGULATION_MODE: Final = "select_regulation_mode"
SELECT_SCHEDULE: Final = "select_schedule"
SELECT_ZONE_PROFILE: Final = "select_zone_profile"

View File

@@ -8,6 +8,6 @@
"iot_class": "local_polling",
"loggers": ["plugwise"],
"quality_scale": "platinum",
"requirements": ["plugwise==1.10.0"],
"requirements": ["plugwise==1.9.0"],
"zeroconf": ["_plugwise._tcp.local."]
}

View File

@@ -9,15 +9,7 @@ from homeassistant.const import STATE_ON, EntityCategory
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
from .const import (
SELECT_DHW_MODE,
SELECT_GATEWAY_MODE,
SELECT_REGULATION_MODE,
SELECT_SCHEDULE,
SELECT_ZONE_PROFILE,
SelectOptionsType,
SelectType,
)
from .const import SelectOptionsType, SelectType
from .coordinator import PlugwiseConfigEntry, PlugwiseDataUpdateCoordinator
from .entity import PlugwiseEntity
from .util import plugwise_command
@@ -35,34 +27,28 @@ class PlugwiseSelectEntityDescription(SelectEntityDescription):
SELECT_TYPES = (
PlugwiseSelectEntityDescription(
key=SELECT_SCHEDULE,
translation_key=SELECT_SCHEDULE,
key="select_schedule",
translation_key="select_schedule",
options_key="available_schedules",
),
PlugwiseSelectEntityDescription(
key=SELECT_REGULATION_MODE,
translation_key=SELECT_REGULATION_MODE,
key="select_regulation_mode",
translation_key="regulation_mode",
entity_category=EntityCategory.CONFIG,
options_key="regulation_modes",
),
PlugwiseSelectEntityDescription(
key=SELECT_DHW_MODE,
translation_key=SELECT_DHW_MODE,
key="select_dhw_mode",
translation_key="dhw_mode",
entity_category=EntityCategory.CONFIG,
options_key="dhw_modes",
),
PlugwiseSelectEntityDescription(
key=SELECT_GATEWAY_MODE,
translation_key=SELECT_GATEWAY_MODE,
key="select_gateway_mode",
translation_key="gateway_mode",
entity_category=EntityCategory.CONFIG,
options_key="gateway_modes",
),
PlugwiseSelectEntityDescription(
key=SELECT_ZONE_PROFILE,
translation_key=SELECT_ZONE_PROFILE,
entity_category=EntityCategory.CONFIG,
options_key="zone_profiles",
),
)

View File

@@ -109,7 +109,7 @@
}
},
"select": {
"select_dhw_mode": {
"dhw_mode": {
"name": "DHW mode",
"state": {
"auto": "[%key:common::state::auto%]",
@@ -118,7 +118,7 @@
"off": "[%key:common::state::off%]"
}
},
"select_gateway_mode": {
"gateway_mode": {
"name": "Gateway mode",
"state": {
"away": "Pause",
@@ -126,7 +126,7 @@
"vacation": "Vacation"
}
},
"select_regulation_mode": {
"regulation_mode": {
"name": "Regulation mode",
"state": {
"bleeding_cold": "Bleeding cold",
@@ -141,14 +141,6 @@
"state": {
"off": "[%key:common::state::off%]"
}
},
"select_zone_profile": {
"name": "Zone profile",
"state": {
"active": "[%key:common::state::active%]",
"off": "[%key:common::state::off%]",
"passive": "Passive"
}
}
},
"sensor": {

View File

@@ -26,9 +26,6 @@ def validate_db_schema(instance: Recorder) -> set[str]:
schema_errors |= validate_table_schema_supports_utf8(
instance, StatisticsMeta, (StatisticsMeta.statistic_id,)
)
schema_errors |= validate_table_schema_has_correct_collation(
instance, StatisticsMeta
)
for table in (Statistics, StatisticsShortTerm):
schema_errors |= validate_db_schema_precision(instance, table)
schema_errors |= validate_table_schema_has_correct_collation(instance, table)

View File

@@ -54,7 +54,7 @@ CONTEXT_ID_AS_BINARY_SCHEMA_VERSION = 36
EVENT_TYPE_IDS_SCHEMA_VERSION = 37
STATES_META_SCHEMA_VERSION = 38
CIRCULAR_MEAN_SCHEMA_VERSION = 49
UNIT_CLASS_SCHEMA_VERSION = 52
UNIT_CLASS_SCHEMA_VERSION = 51
LEGACY_STATES_EVENT_ID_INDEX_SCHEMA_VERSION = 28
LEGACY_STATES_EVENT_FOREIGN_KEYS_FIXED_SCHEMA_VERSION = 43

View File

@@ -71,7 +71,7 @@ class LegacyBase(DeclarativeBase):
"""Base class for tables, used for schema migration."""
SCHEMA_VERSION = 52
SCHEMA_VERSION = 51
_LOGGER = logging.getLogger(__name__)

View File

@@ -13,15 +13,7 @@ from typing import TYPE_CHECKING, Any, TypedDict, cast, final
from uuid import UUID
import sqlalchemy
from sqlalchemy import (
ForeignKeyConstraint,
MetaData,
Table,
cast as cast_,
func,
text,
update,
)
from sqlalchemy import ForeignKeyConstraint, MetaData, Table, func, text, update
from sqlalchemy.engine import CursorResult, Engine
from sqlalchemy.exc import (
DatabaseError,
@@ -34,9 +26,8 @@ from sqlalchemy.exc import (
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm.session import Session
from sqlalchemy.schema import AddConstraint, CreateTable, DropConstraint
from sqlalchemy.sql.expression import and_, true
from sqlalchemy.sql.expression import true
from sqlalchemy.sql.lambdas import StatementLambdaElement
from sqlalchemy.types import BINARY
from homeassistant.core import HomeAssistant
from homeassistant.util.enum import try_parse_enum
@@ -2053,74 +2044,14 @@ class _SchemaVersion50Migrator(_SchemaVersionMigrator, target_version=50):
class _SchemaVersion51Migrator(_SchemaVersionMigrator, target_version=51):
def _apply_update(self) -> None:
"""Version specific update method."""
# Replaced with version 52 which corrects issues with MySQL string comparisons.
class _SchemaVersion52Migrator(_SchemaVersionMigrator, target_version=52):
def _apply_update(self) -> None:
"""Version specific update method."""
if self.engine.dialect.name == SupportedDialect.MYSQL:
self._apply_update_mysql()
else:
self._apply_update_postgresql_sqlite()
def _apply_update_mysql(self) -> None:
"""Version specific update method for mysql."""
# Add unit class column to StatisticsMeta
_add_columns(self.session_maker, "statistics_meta", ["unit_class VARCHAR(255)"])
with session_scope(session=self.session_maker()) as session:
connection = session.connection()
for conv in _PRIMARY_UNIT_CONVERTERS:
case_sensitive_units = {
u.encode("utf-8") if u else u for u in conv.VALID_UNITS
}
# Reset unit_class to None for entries that do not match
# the valid units (case sensitive) but matched before due to
# case insensitive comparisons.
connection.execute(
update(StatisticsMeta)
.where(
and_(
StatisticsMeta.unit_of_measurement.in_(conv.VALID_UNITS),
cast_(StatisticsMeta.unit_of_measurement, BINARY).not_in(
case_sensitive_units
),
)
)
.values(unit_class=None)
)
# Do an explicitly case sensitive match (actually binary) to set the
# correct unit_class. This is needed because we use the case sensitive
# utf8mb4_unicode_ci collation.
connection.execute(
update(StatisticsMeta)
.where(
and_(
cast_(StatisticsMeta.unit_of_measurement, BINARY).in_(
case_sensitive_units
),
StatisticsMeta.unit_class.is_(None),
)
)
.values(unit_class=conv.UNIT_CLASS)
)
def _apply_update_postgresql_sqlite(self) -> None:
"""Version specific update method for postgresql and sqlite."""
_add_columns(self.session_maker, "statistics_meta", ["unit_class VARCHAR(255)"])
with session_scope(session=self.session_maker()) as session:
connection = session.connection()
for conv in _PRIMARY_UNIT_CONVERTERS:
# Set the correct unit_class. Unlike MySQL, Postgres and SQLite
# have case sensitive string comparisons by default, so we
# can directly match on the valid units.
connection.execute(
update(StatisticsMeta)
.where(
and_(
StatisticsMeta.unit_of_measurement.in_(conv.VALID_UNITS),
StatisticsMeta.unit_class.is_(None),
)
)
.where(StatisticsMeta.unit_of_measurement.in_(conv.VALID_UNITS))
.values(unit_class=conv.UNIT_CLASS)
)

View File

@@ -26,7 +26,7 @@ CACHE_SIZE = 8192
_LOGGER = logging.getLogger(__name__)
QUERY_STATISTICS_META = (
QUERY_STATISTIC_META = (
StatisticsMeta.id,
StatisticsMeta.statistic_id,
StatisticsMeta.source,
@@ -55,7 +55,7 @@ def _generate_get_metadata_stmt(
Depending on the schema version, either mean_type (added in version 49) or has_mean column is used.
"""
columns: list[InstrumentedAttribute[Any]] = list(QUERY_STATISTICS_META)
columns: list[InstrumentedAttribute[Any]] = list(QUERY_STATISTIC_META)
if schema_version >= CIRCULAR_MEAN_SCHEMA_VERSION:
columns.append(StatisticsMeta.mean_type)
else:

View File

@@ -12,11 +12,10 @@ from homeassistant.config_entries import ConfigEntry
from homeassistant.const import Platform
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryNotReady
from homeassistant.helpers import config_validation as cv, httpx_client
from homeassistant.helpers.config_entry_oauth2_flow import (
ImplementationUnavailableError,
OAuth2Session,
async_get_config_entry_implementation,
from homeassistant.helpers import (
config_entry_oauth2_flow,
config_validation as cv,
httpx_client,
)
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
@@ -29,21 +28,19 @@ _LOGGER = logging.getLogger(__name__)
CONFIG_SCHEMA = cv.config_entry_only_config_schema(DOMAIN)
PLATFORMS = [Platform.CLIMATE, Platform.SENSOR]
PLATFORMS = [Platform.CLIMATE]
type SENZDataUpdateCoordinator = DataUpdateCoordinator[dict[str, Thermostat]]
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up SENZ from a config entry."""
try:
implementation = await async_get_config_entry_implementation(hass, entry)
except ImplementationUnavailableError as err:
raise ConfigEntryNotReady(
translation_domain=DOMAIN,
translation_key="oauth2_implementation_unavailable",
) from err
session = OAuth2Session(hass, entry, implementation)
implementation = (
await config_entry_oauth2_flow.async_get_config_entry_implementation(
hass, entry
)
)
session = config_entry_oauth2_flow.OAuth2Session(hass, entry, implementation)
auth = SENZConfigEntryAuth(httpx_client.get_async_client(hass), session)
senz_api = SENZAPI(auth)

View File

@@ -35,7 +35,7 @@ async def async_setup_entry(
)
class SENZClimate(CoordinatorEntity[SENZDataUpdateCoordinator], ClimateEntity):
class SENZClimate(CoordinatorEntity, ClimateEntity):
"""Representation of a SENZ climate entity."""
_attr_temperature_unit = UnitOfTemperature.CELSIUS

View File

@@ -1,29 +0,0 @@
"""Diagnostics platform for Senz integration."""
from typing import Any
from homeassistant.components.diagnostics import async_redact_data
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from .const import DOMAIN
TO_REDACT = [
"access_token",
"refresh_token",
]
async def async_get_config_entry_diagnostics(
hass: HomeAssistant, entry: ConfigEntry
) -> dict[str, Any]:
"""Return diagnostics for a config entry."""
raw_data = (
[device.raw_data for device in hass.data[DOMAIN][entry.entry_id].data.values()],
)
return {
"entry_data": async_redact_data(entry.data, TO_REDACT),
"thermostats": raw_data,
}

View File

@@ -1,93 +0,0 @@
"""nVent RAYCHEM SENZ sensor platform."""
from __future__ import annotations
from collections.abc import Callable
from dataclasses import dataclass
from aiosenz import Thermostat
from homeassistant.components.sensor import (
SensorDeviceClass,
SensorEntity,
SensorEntityDescription,
SensorStateClass,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import UnitOfTemperature
from homeassistant.core import HomeAssistant
from homeassistant.helpers.device_registry import DeviceInfo
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from . import SENZDataUpdateCoordinator
from .const import DOMAIN
@dataclass(kw_only=True, frozen=True)
class SenzSensorDescription(SensorEntityDescription):
"""Describes SENZ sensor entity."""
value_fn: Callable[[Thermostat], str | int | float | None]
SENSORS: tuple[SenzSensorDescription, ...] = (
SenzSensorDescription(
key="temperature",
device_class=SensorDeviceClass.TEMPERATURE,
value_fn=lambda data: data.current_temperatue,
native_unit_of_measurement=UnitOfTemperature.CELSIUS,
state_class=SensorStateClass.MEASUREMENT,
suggested_display_precision=1,
),
)
async def async_setup_entry(
hass: HomeAssistant,
entry: ConfigEntry,
async_add_entities: AddConfigEntryEntitiesCallback,
) -> None:
"""Set up the SENZ sensor entities from a config entry."""
coordinator: SENZDataUpdateCoordinator = hass.data[DOMAIN][entry.entry_id]
async_add_entities(
SENZSensor(thermostat, coordinator, description)
for description in SENSORS
for thermostat in coordinator.data.values()
)
class SENZSensor(CoordinatorEntity[SENZDataUpdateCoordinator], SensorEntity):
"""Representation of a SENZ sensor entity."""
entity_description: SenzSensorDescription
_attr_has_entity_name = True
def __init__(
self,
thermostat: Thermostat,
coordinator: SENZDataUpdateCoordinator,
description: SenzSensorDescription,
) -> None:
"""Init SENZ sensor."""
super().__init__(coordinator)
self.entity_description = description
self._thermostat = thermostat
self._attr_unique_id = f"{thermostat.serial_number}_{description.key}"
self._attr_device_info = DeviceInfo(
identifiers={(DOMAIN, thermostat.serial_number)},
manufacturer="nVent Raychem",
model="SENZ WIFI",
name=thermostat.name,
serial_number=thermostat.serial_number,
)
@property
def available(self) -> bool:
"""Return True if the thermostat is available."""
return super().available and self._thermostat.online
@property
def native_value(self) -> str | float | int | None:
"""Return the state of the sensor."""
return self.entity_description.value_fn(self._thermostat)

View File

@@ -25,10 +25,5 @@
"title": "[%key:common::config_flow::title::oauth2_pick_implementation%]"
}
}
},
"exceptions": {
"oauth2_implementation_unavailable": {
"message": "[%key:common::exceptions::oauth2_implementation_unavailable::message%]"
}
}
}

View File

@@ -12,7 +12,6 @@ from aioshelly.exceptions import DeviceConnectionError, InvalidAuthError
from homeassistant.components.number import (
DOMAIN as NUMBER_PLATFORM,
NumberDeviceClass,
NumberEntity,
NumberEntityDescription,
NumberExtraStoredData,
@@ -108,9 +107,6 @@ class RpcNumber(ShellyRpcAttributeEntity, NumberEntity):
if description.mode_fn is not None:
self._attr_mode = description.mode_fn(coordinator.device.config[key])
if hasattr(self, "_attr_name") and description.role != ROLE_GENERIC:
delattr(self, "_attr_name")
@property
def native_value(self) -> float | None:
"""Return value of number."""
@@ -185,6 +181,7 @@ NUMBERS: dict[tuple[str, str], BlockNumberDescription] = {
("device", "valvePos"): BlockNumberDescription(
key="device|valvepos",
translation_key="valve_position",
name="Valve position",
native_unit_of_measurement=PERCENTAGE,
available=lambda block: cast(int, block.valveError) != 1,
entity_category=EntityCategory.CONFIG,
@@ -203,12 +200,12 @@ RPC_NUMBERS: Final = {
key="blutrv",
sub_key="current_C",
translation_key="external_temperature",
name="External temperature",
native_min_value=-50,
native_max_value=50,
native_step=0.1,
mode=NumberMode.BOX,
entity_category=EntityCategory.CONFIG,
device_class=NumberDeviceClass.TEMPERATURE,
native_unit_of_measurement=UnitOfTemperature.CELSIUS,
method="blu_trv_set_external_temperature",
entity_class=RpcBluTrvExtTempNumber,
@@ -216,7 +213,7 @@ RPC_NUMBERS: Final = {
"number_generic": RpcNumberDescription(
key="number",
sub_key="value",
removal_condition=lambda config, _, key: not is_view_for_platform(
removal_condition=lambda config, _status, key: not is_view_for_platform(
config, key, NUMBER_PLATFORM
),
max_fn=lambda config: config["max"],
@@ -232,11 +229,9 @@ RPC_NUMBERS: Final = {
"number_current_limit": RpcNumberDescription(
key="number",
sub_key="value",
translation_key="current_limit",
device_class=NumberDeviceClass.CURRENT,
max_fn=lambda config: config["max"],
min_fn=lambda config: config["min"],
mode_fn=lambda _: NumberMode.SLIDER,
mode_fn=lambda config: NumberMode.SLIDER,
step_fn=lambda config: config["meta"]["ui"].get("step"),
unit=get_virtual_component_unit,
method="number_set",
@@ -246,11 +241,10 @@ RPC_NUMBERS: Final = {
"number_position": RpcNumberDescription(
key="number",
sub_key="value",
translation_key="valve_position",
entity_registry_enabled_default=False,
max_fn=lambda config: config["max"],
min_fn=lambda config: config["min"],
mode_fn=lambda _: NumberMode.SLIDER,
mode_fn=lambda config: NumberMode.SLIDER,
step_fn=lambda config: config["meta"]["ui"].get("step"),
unit=get_virtual_component_unit,
method="number_set",
@@ -260,12 +254,10 @@ RPC_NUMBERS: Final = {
"number_target_humidity": RpcNumberDescription(
key="number",
sub_key="value",
translation_key="target_humidity",
device_class=NumberDeviceClass.HUMIDITY,
entity_registry_enabled_default=False,
max_fn=lambda config: config["max"],
min_fn=lambda config: config["min"],
mode_fn=lambda _: NumberMode.SLIDER,
mode_fn=lambda config: NumberMode.SLIDER,
step_fn=lambda config: config["meta"]["ui"].get("step"),
unit=get_virtual_component_unit,
method="number_set",
@@ -275,12 +267,10 @@ RPC_NUMBERS: Final = {
"number_target_temperature": RpcNumberDescription(
key="number",
sub_key="value",
translation_key="target_temperature",
device_class=NumberDeviceClass.TEMPERATURE,
entity_registry_enabled_default=False,
max_fn=lambda config: config["max"],
min_fn=lambda config: config["min"],
mode_fn=lambda _: NumberMode.SLIDER,
mode_fn=lambda config: NumberMode.SLIDER,
step_fn=lambda config: config["meta"]["ui"].get("step"),
unit=get_virtual_component_unit,
method="number_set",
@@ -291,20 +281,21 @@ RPC_NUMBERS: Final = {
key="blutrv",
sub_key="pos",
translation_key="valve_position",
name="Valve position",
native_min_value=0,
native_max_value=100,
native_step=1,
mode=NumberMode.SLIDER,
native_unit_of_measurement=PERCENTAGE,
method="blu_trv_set_valve_position",
removal_condition=lambda config, _, key: config[key].get("enable", True)
removal_condition=lambda config, _status, key: config[key].get("enable", True)
is True,
entity_class=RpcBluTrvNumber,
),
"left_slot_intensity": RpcNumberDescription(
key="cury",
sub_key="slots",
translation_key="left_slot_intensity",
name="Left slot intensity",
value=lambda status, _: status["left"]["intensity"],
native_min_value=0,
native_max_value=100,
@@ -320,7 +311,7 @@ RPC_NUMBERS: Final = {
"right_slot_intensity": RpcNumberDescription(
key="cury",
sub_key="slots",
translation_key="right_slot_intensity",
name="Right slot intensity",
value=lambda status, _: status["right"]["intensity"],
native_min_value=0,
native_max_value=100,
@@ -411,9 +402,6 @@ class BlockSleepingNumber(ShellySleepingBlockAttributeEntity, RestoreNumber):
self.restored_data: NumberExtraStoredData | None = None
super().__init__(coordinator, block, attribute, description, entry)
if hasattr(self, "_attr_name"):
delattr(self, "_attr_name")
async def async_added_to_hass(self) -> None:
"""Handle entity which will be added."""
await super().async_added_to_hass()

View File

@@ -188,29 +188,6 @@
}
}
},
"number": {
"current_limit": {
"name": "Current limit"
},
"external_temperature": {
"name": "External temperature"
},
"left_slot_intensity": {
"name": "Left slot intensity"
},
"right_slot_intensity": {
"name": "Right slot intensity"
},
"target_humidity": {
"name": "Target humidity"
},
"target_temperature": {
"name": "Target temperature"
},
"valve_position": {
"name": "Valve position"
}
},
"select": {
"cury_mode": {
"name": "Mode",

View File

@@ -75,7 +75,6 @@ PLATFORMS_BY_TYPE = {
SupportedModels.HUBMINI_MATTER.value: [Platform.SENSOR],
SupportedModels.CIRCULATOR_FAN.value: [Platform.FAN, Platform.SENSOR],
SupportedModels.S10_VACUUM.value: [Platform.VACUUM, Platform.SENSOR],
SupportedModels.S20_VACUUM.value: [Platform.VACUUM, Platform.SENSOR],
SupportedModels.K10_VACUUM.value: [Platform.VACUUM, Platform.SENSOR],
SupportedModels.K10_PRO_VACUUM.value: [Platform.VACUUM, Platform.SENSOR],
SupportedModels.K10_PRO_COMBO_VACUUM.value: [Platform.VACUUM, Platform.SENSOR],
@@ -103,10 +102,6 @@ PLATFORMS_BY_TYPE = {
SupportedModels.RELAY_SWITCH_2PM.value: [Platform.SWITCH, Platform.SENSOR],
SupportedModels.GARAGE_DOOR_OPENER.value: [Platform.COVER, Platform.SENSOR],
SupportedModels.CLIMATE_PANEL.value: [Platform.SENSOR, Platform.BINARY_SENSOR],
SupportedModels.SMART_THERMOSTAT_RADIATOR.value: [
Platform.CLIMATE,
Platform.SENSOR,
],
}
CLASS_BY_DEVICE = {
SupportedModels.CEILING_LIGHT.value: switchbot.SwitchbotCeilingLight,
@@ -124,7 +119,6 @@ CLASS_BY_DEVICE = {
SupportedModels.ROLLER_SHADE.value: switchbot.SwitchbotRollerShade,
SupportedModels.CIRCULATOR_FAN.value: switchbot.SwitchbotFan,
SupportedModels.S10_VACUUM.value: switchbot.SwitchbotVacuum,
SupportedModels.S20_VACUUM.value: switchbot.SwitchbotVacuum,
SupportedModels.K10_VACUUM.value: switchbot.SwitchbotVacuum,
SupportedModels.K10_PRO_VACUUM.value: switchbot.SwitchbotVacuum,
SupportedModels.K10_PRO_COMBO_VACUUM.value: switchbot.SwitchbotVacuum,
@@ -142,7 +136,6 @@ CLASS_BY_DEVICE = {
SupportedModels.PLUG_MINI_EU.value: switchbot.SwitchbotRelaySwitch,
SupportedModels.RELAY_SWITCH_2PM.value: switchbot.SwitchbotRelaySwitch2PM,
SupportedModels.GARAGE_DOOR_OPENER.value: switchbot.SwitchbotGarageDoorOpener,
SupportedModels.SMART_THERMOSTAT_RADIATOR.value: switchbot.SwitchbotSmartThermostatRadiator,
}

View File

@@ -1,140 +0,0 @@
"""Support for Switchbot Climate devices."""
from __future__ import annotations
import logging
from typing import Any
import switchbot
from switchbot import (
ClimateAction as SwitchBotClimateAction,
ClimateMode as SwitchBotClimateMode,
)
from homeassistant.components.climate import (
ClimateEntity,
ClimateEntityFeature,
HVACAction,
HVACMode,
)
from homeassistant.const import ATTR_TEMPERATURE, UnitOfTemperature
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
from .coordinator import SwitchbotConfigEntry
from .entity import SwitchbotEntity, exception_handler
SWITCHBOT_CLIMATE_TO_HASS_HVAC_MODE = {
SwitchBotClimateMode.HEAT: HVACMode.HEAT,
SwitchBotClimateMode.OFF: HVACMode.OFF,
}
HASS_HVAC_MODE_TO_SWITCHBOT_CLIMATE = {
HVACMode.HEAT: SwitchBotClimateMode.HEAT,
HVACMode.OFF: SwitchBotClimateMode.OFF,
}
SWITCHBOT_ACTION_TO_HASS_HVAC_ACTION = {
SwitchBotClimateAction.HEATING: HVACAction.HEATING,
SwitchBotClimateAction.IDLE: HVACAction.IDLE,
SwitchBotClimateAction.OFF: HVACAction.OFF,
}
_LOGGER = logging.getLogger(__name__)
PARALLEL_UPDATES = 0
async def async_setup_entry(
hass: HomeAssistant,
entry: SwitchbotConfigEntry,
async_add_entities: AddConfigEntryEntitiesCallback,
) -> None:
"""Set up Switchbot climate based on a config entry."""
coordinator = entry.runtime_data
async_add_entities([SwitchBotClimateEntity(coordinator)])
class SwitchBotClimateEntity(SwitchbotEntity, ClimateEntity):
"""Representation of a Switchbot Climate device."""
_device: switchbot.SwitchbotDevice
_attr_supported_features = (
ClimateEntityFeature.PRESET_MODE
| ClimateEntityFeature.TARGET_TEMPERATURE
| ClimateEntityFeature.TURN_OFF
| ClimateEntityFeature.TURN_ON
)
_attr_target_temperature_step = 0.5
_attr_temperature_unit = UnitOfTemperature.CELSIUS
_attr_translation_key = "climate"
_attr_name = None
@property
def min_temp(self) -> float:
"""Return the minimum temperature."""
return self._device.min_temperature
@property
def max_temp(self) -> float:
"""Return the maximum temperature."""
return self._device.max_temperature
@property
def preset_modes(self) -> list[str] | None:
"""Return the list of available preset modes."""
return self._device.preset_modes
@property
def preset_mode(self) -> str | None:
"""Return the current preset mode."""
return self._device.preset_mode
@property
def hvac_mode(self) -> HVACMode | None:
"""Return the current HVAC mode."""
return SWITCHBOT_CLIMATE_TO_HASS_HVAC_MODE.get(
self._device.hvac_mode, HVACMode.OFF
)
@property
def hvac_modes(self) -> list[HVACMode]:
"""Return the list of available HVAC modes."""
return [
SWITCHBOT_CLIMATE_TO_HASS_HVAC_MODE[mode]
for mode in self._device.hvac_modes
]
@property
def hvac_action(self) -> HVACAction | None:
"""Return the current HVAC action."""
return SWITCHBOT_ACTION_TO_HASS_HVAC_ACTION.get(
self._device.hvac_action, HVACAction.OFF
)
@property
def current_temperature(self) -> float | None:
"""Return the current temperature."""
return self._device.current_temperature
@property
def target_temperature(self) -> float | None:
"""Return the temperature we try to reach."""
return self._device.target_temperature
@exception_handler
async def async_set_hvac_mode(self, hvac_mode: HVACMode) -> None:
"""Set new HVAC mode."""
return await self._device.set_hvac_mode(
HASS_HVAC_MODE_TO_SWITCHBOT_CLIMATE[hvac_mode]
)
@exception_handler
async def async_set_preset_mode(self, preset_mode: str) -> None:
"""Set new preset mode."""
return await self._device.set_preset_mode(preset_mode)
@exception_handler
async def async_set_temperature(self, **kwargs: Any) -> None:
"""Set new target temperature."""
temperature = kwargs.get(ATTR_TEMPERATURE)
return await self._device.set_target_temperature(temperature)

View File

@@ -58,8 +58,6 @@ class SupportedModels(StrEnum):
K11_PLUS_VACUUM = "k11+_vacuum"
GARAGE_DOOR_OPENER = "garage_door_opener"
CLIMATE_PANEL = "climate_panel"
SMART_THERMOSTAT_RADIATOR = "smart_thermostat_radiator"
S20_VACUUM = "s20_vacuum"
CONNECTABLE_SUPPORTED_MODEL_TYPES = {
@@ -80,7 +78,6 @@ CONNECTABLE_SUPPORTED_MODEL_TYPES = {
SwitchbotModel.CIRCULATOR_FAN: SupportedModels.CIRCULATOR_FAN,
SwitchbotModel.K20_VACUUM: SupportedModels.K20_VACUUM,
SwitchbotModel.S10_VACUUM: SupportedModels.S10_VACUUM,
SwitchbotModel.S20_VACUUM: SupportedModels.S20_VACUUM,
SwitchbotModel.K10_VACUUM: SupportedModels.K10_VACUUM,
SwitchbotModel.K10_PRO_VACUUM: SupportedModels.K10_PRO_VACUUM,
SwitchbotModel.K10_PRO_COMBO_VACUUM: SupportedModels.K10_PRO_COMBO_VACUUM,
@@ -98,7 +95,6 @@ CONNECTABLE_SUPPORTED_MODEL_TYPES = {
SwitchbotModel.K11_VACUUM: SupportedModels.K11_PLUS_VACUUM,
SwitchbotModel.GARAGE_DOOR_OPENER: SupportedModels.GARAGE_DOOR_OPENER,
SwitchbotModel.CLIMATE_PANEL: SupportedModels.CLIMATE_PANEL,
SwitchbotModel.SMART_THERMOSTAT_RADIATOR: SupportedModels.SMART_THERMOSTAT_RADIATOR,
}
NON_CONNECTABLE_SUPPORTED_MODEL_TYPES = {
@@ -136,7 +132,6 @@ ENCRYPTED_MODELS = {
SwitchbotModel.PLUG_MINI_EU,
SwitchbotModel.RELAY_SWITCH_2PM,
SwitchbotModel.GARAGE_DOOR_OPENER,
SwitchbotModel.SMART_THERMOSTAT_RADIATOR,
}
ENCRYPTED_SWITCHBOT_MODEL_TO_CLASS: dict[
@@ -158,7 +153,6 @@ ENCRYPTED_SWITCHBOT_MODEL_TO_CLASS: dict[
SwitchbotModel.PLUG_MINI_EU: switchbot.SwitchbotRelaySwitch,
SwitchbotModel.RELAY_SWITCH_2PM: switchbot.SwitchbotRelaySwitch2PM,
SwitchbotModel.GARAGE_DOOR_OPENER: switchbot.SwitchbotRelaySwitch,
SwitchbotModel.SMART_THERMOSTAT_RADIATOR: switchbot.SwitchbotSmartThermostatRadiator,
}
HASS_SENSOR_TYPE_TO_SWITCHBOT_MODEL = {

View File

@@ -1,18 +1,5 @@
{
"entity": {
"climate": {
"climate": {
"state_attributes": {
"preset_mode": {
"state": {
"manual": "mdi:hand-back-right",
"off": "mdi:hvac-off",
"schedule": "mdi:calendar-clock"
}
}
}
}
},
"fan": {
"air_purifier": {
"default": "mdi:air-purifier",

View File

@@ -41,5 +41,5 @@
"iot_class": "local_push",
"loggers": ["switchbot"],
"quality_scale": "gold",
"requirements": ["PySwitchbot==0.73.0"]
"requirements": ["PySwitchbot==0.72.1"]
}

View File

@@ -100,19 +100,6 @@
"name": "Unlocked alarm"
}
},
"climate": {
"climate": {
"state_attributes": {
"preset_mode": {
"state": {
"manual": "[%key:common::state::manual%]",
"off": "[%key:common::state::off%]",
"schedule": "Schedule"
}
}
}
}
},
"cover": {
"cover": {
"state_attributes": {

View File

@@ -84,7 +84,6 @@
"abort": {
"already_configured": "Chat already configured"
},
"entry_type": "Allowed chat ID",
"error": {
"chat_not_found": "Chat not found"
},

View File

@@ -23,7 +23,6 @@ from homeassistant.exceptions import ConfigEntryAuthFailed, ConfigEntryNotReady
from homeassistant.helpers import config_validation as cv, device_registry as dr
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.config_entry_oauth2_flow import (
ImplementationUnavailableError,
OAuth2Session,
async_get_config_entry_implementation,
)
@@ -62,11 +61,6 @@ async def async_setup_entry(hass: HomeAssistant, entry: TeslaFleetConfigEntry) -
try:
implementation = await async_get_config_entry_implementation(hass, entry)
except ImplementationUnavailableError as err:
raise ConfigEntryNotReady(
translation_domain=DOMAIN,
translation_key="oauth2_implementation_unavailable",
) from err
except ValueError as e:
# Remove invalid implementation from config entry then raise AuthFailed
hass.config_entries.async_update_entry(

View File

@@ -609,9 +609,6 @@
"no_cable": {
"message": "Charge cable will lock automatically when connected"
},
"oauth2_implementation_unavailable": {
"message": "[%key:common::exceptions::oauth2_implementation_unavailable::message%]"
},
"update_failed": {
"message": "{endpoint} data request failed: {message}"
}

View File

@@ -11,10 +11,8 @@ from homeassistant.const import (
Platform,
)
from homeassistant.core import CoreState, HomeAssistant
from homeassistant.exceptions import ConfigEntryNotReady
from homeassistant.helpers import config_validation as cv, device_registry as dr
from homeassistant.helpers.config_entry_oauth2_flow import (
ImplementationUnavailableError,
OAuth2Session,
async_get_config_entry_implementation,
)
@@ -88,13 +86,7 @@ async def async_migrate_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up Toon from a config entry."""
try:
implementation = await async_get_config_entry_implementation(hass, entry)
except ImplementationUnavailableError as err:
raise ConfigEntryNotReady(
translation_domain=DOMAIN,
translation_key="oauth2_implementation_unavailable",
) from err
implementation = await async_get_config_entry_implementation(hass, entry)
session = OAuth2Session(hass, entry, implementation)
coordinator = ToonDataUpdateCoordinator(hass, entry, session)

View File

@@ -32,11 +32,6 @@
}
}
},
"exceptions": {
"oauth2_implementation_unavailable": {
"message": "[%key:common::exceptions::oauth2_implementation_unavailable::message%]"
}
},
"services": {
"update": {
"description": "Updates all entities with fresh data from Toon.",

View File

@@ -181,14 +181,15 @@ class TPLinkClimateEntity(CoordinatedTPLinkModuleEntity, ClimateEntity):
HVACMode.HEAT if self._thermostat_module.state else HVACMode.OFF
)
if self._thermostat_module.mode not in STATE_TO_ACTION:
# Report a warning on the first non-default unknown mode
if self._attr_hvac_action is not HVACAction.OFF:
_LOGGER.warning(
"Unknown thermostat state, defaulting to OFF: %s",
self._thermostat_module.mode,
)
self._attr_hvac_action = HVACAction.OFF
if (
self._thermostat_module.mode not in STATE_TO_ACTION
and self._attr_hvac_action is not HVACAction.OFF
):
_LOGGER.warning(
"Unknown thermostat state, defaulting to OFF: %s",
self._thermostat_module.mode,
)
self._attr_hvac_action = HVACAction.OFF
return True
self._attr_hvac_action = STATE_TO_ACTION[self._thermostat_module.mode]

View File

@@ -2,14 +2,13 @@
from functools import partial
import logging
from typing import cast
import voluptuous as vol
from homeassistant.config_entries import ConfigEntryState
from homeassistant.const import CONF_ID
from homeassistant.core import HomeAssistant, ServiceCall, callback
from homeassistant.exceptions import ServiceValidationError
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import config_validation as cv, selector
from .const import (
@@ -24,7 +23,7 @@ from .const import (
SERVICE_START_TORRENT,
SERVICE_STOP_TORRENT,
)
from .coordinator import TransmissionDataUpdateCoordinator
from .coordinator import TransmissionConfigEntry, TransmissionDataUpdateCoordinator
_LOGGER = logging.getLogger(__name__)
@@ -68,52 +67,45 @@ SERVICE_STOP_TORRENT_SCHEMA = vol.All(
def _get_coordinator_from_service_data(
call: ServiceCall,
hass: HomeAssistant, entry_id: str
) -> TransmissionDataUpdateCoordinator:
"""Return coordinator for entry id."""
config_entry_id: str = call.data[CONF_ENTRY_ID]
if not (entry := call.hass.config_entries.async_get_entry(config_entry_id)):
raise ServiceValidationError(
translation_domain=DOMAIN,
translation_key="integration_not_found",
translation_placeholders={"target": DOMAIN},
)
if entry.state is not ConfigEntryState.LOADED:
raise ServiceValidationError(
translation_domain=DOMAIN,
translation_key="not_loaded",
translation_placeholders={"target": entry.title},
)
return cast(TransmissionDataUpdateCoordinator, entry.runtime_data)
entry: TransmissionConfigEntry | None = hass.config_entries.async_get_entry(
entry_id
)
if entry is None or entry.state is not ConfigEntryState.LOADED:
raise HomeAssistantError(f"Config entry {entry_id} is not found or not loaded")
return entry.runtime_data
async def _async_add_torrent(service: ServiceCall) -> None:
"""Add new torrent to download."""
coordinator = _get_coordinator_from_service_data(service)
entry_id: str = service.data[CONF_ENTRY_ID]
coordinator = _get_coordinator_from_service_data(service.hass, entry_id)
torrent: str = service.data[ATTR_TORRENT]
download_path: str | None = service.data.get(ATTR_DOWNLOAD_PATH)
if not (
torrent.startswith(("http", "ftp:", "magnet:"))
or service.hass.config.is_allowed_path(torrent)
):
raise ServiceValidationError(
translation_domain=DOMAIN,
translation_key="could_not_add_torrent",
)
if download_path:
await service.hass.async_add_executor_job(
partial(coordinator.api.add_torrent, torrent, download_dir=download_path)
)
if torrent.startswith(
("http", "ftp:", "magnet:")
) or service.hass.config.is_allowed_path(torrent):
if download_path:
await service.hass.async_add_executor_job(
partial(
coordinator.api.add_torrent, torrent, download_dir=download_path
)
)
else:
await service.hass.async_add_executor_job(
coordinator.api.add_torrent, torrent
)
await coordinator.async_request_refresh()
else:
await service.hass.async_add_executor_job(coordinator.api.add_torrent, torrent)
await coordinator.async_request_refresh()
_LOGGER.warning("Could not add torrent: unsupported type or no permission")
async def _async_start_torrent(service: ServiceCall) -> None:
"""Start torrent."""
coordinator = _get_coordinator_from_service_data(service)
entry_id: str = service.data[CONF_ENTRY_ID]
coordinator = _get_coordinator_from_service_data(service.hass, entry_id)
torrent_id = service.data[CONF_ID]
await service.hass.async_add_executor_job(coordinator.api.start_torrent, torrent_id)
await coordinator.async_request_refresh()
@@ -121,7 +113,8 @@ async def _async_start_torrent(service: ServiceCall) -> None:
async def _async_stop_torrent(service: ServiceCall) -> None:
"""Stop torrent."""
coordinator = _get_coordinator_from_service_data(service)
entry_id: str = service.data[CONF_ENTRY_ID]
coordinator = _get_coordinator_from_service_data(service.hass, entry_id)
torrent_id = service.data[CONF_ID]
await service.hass.async_add_executor_job(coordinator.api.stop_torrent, torrent_id)
await coordinator.async_request_refresh()
@@ -129,7 +122,8 @@ async def _async_stop_torrent(service: ServiceCall) -> None:
async def _async_remove_torrent(service: ServiceCall) -> None:
"""Remove torrent."""
coordinator = _get_coordinator_from_service_data(service)
entry_id: str = service.data[CONF_ENTRY_ID]
coordinator = _get_coordinator_from_service_data(service.hass, entry_id)
torrent_id = service.data[CONF_ID]
delete_data = service.data[ATTR_DELETE_DATA]
await service.hass.async_add_executor_job(

View File

@@ -1,7 +1,6 @@
add_torrent:
fields:
entry_id:
required: true
selector:
config_entry:
integration: transmission
@@ -19,7 +18,6 @@ add_torrent:
remove_torrent:
fields:
entry_id:
required: true
selector:
config_entry:
integration: transmission
@@ -29,7 +27,6 @@ remove_torrent:
selector:
text:
delete_data:
required: true
default: false
selector:
boolean:
@@ -37,12 +34,10 @@ remove_torrent:
start_torrent:
fields:
entry_id:
required: true
selector:
config_entry:
integration: transmission
id:
required: true
example: 123
selector:
text:
@@ -50,7 +45,6 @@ start_torrent:
stop_torrent:
fields:
entry_id:
required: true
selector:
config_entry:
integration: transmission

View File

@@ -87,17 +87,6 @@
}
}
},
"exceptions": {
"could_not_add_torrent": {
"message": "Could not add torrent: unsupported type or no permission."
},
"integration_not_found": {
"message": "Integration \"{target}\" not found in registry."
},
"not_loaded": {
"message": "{target} is not loaded."
}
},
"options": {
"step": {
"init": {

View File

@@ -19,9 +19,9 @@ from homeassistant.helpers.dispatcher import async_dispatcher_connect
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
from . import TuyaConfigEntry
from .const import TUYA_DISCOVERY_NEW, DeviceCategory, DPCode
from .const import TUYA_DISCOVERY_NEW, DeviceCategory, DPCode, DPType
from .entity import TuyaEntity
from .models import DPCodeEnumWrapper
from .models import EnumTypeData, find_dpcode
from .util import get_dpcode
@@ -85,21 +85,9 @@ async def async_setup_entry(
device = manager.device_map[device_id]
if descriptions := ALARM.get(device.category):
entities.extend(
TuyaAlarmEntity(
device,
manager,
description,
action_dpcode_wrapper=action_dpcode_wrapper,
state_dpcode_wrapper=DPCodeEnumWrapper.find_dpcode(
device, description.master_state
),
)
TuyaAlarmEntity(device, manager, description)
for description in descriptions
if (
action_dpcode_wrapper := DPCodeEnumWrapper.find_dpcode(
device, description.key, prefer_function=True
)
)
if description.key in device.status
)
async_add_entities(entities)
@@ -115,6 +103,7 @@ class TuyaAlarmEntity(TuyaEntity, AlarmControlPanelEntity):
_attr_name = None
_attr_code_arm_required = False
_master_state: EnumTypeData | None = None
_alarm_msg_dpcode: DPCode | None = None
def __init__(
@@ -122,24 +111,33 @@ class TuyaAlarmEntity(TuyaEntity, AlarmControlPanelEntity):
device: CustomerDevice,
device_manager: Manager,
description: TuyaAlarmControlPanelEntityDescription,
*,
action_dpcode_wrapper: DPCodeEnumWrapper,
state_dpcode_wrapper: DPCodeEnumWrapper | None,
) -> None:
"""Init Tuya Alarm."""
super().__init__(device, device_manager)
self.entity_description = description
self._attr_unique_id = f"{super().unique_id}{description.key}"
self._action_dpcode_wrapper = action_dpcode_wrapper
self._state_dpcode_wrapper = state_dpcode_wrapper
# Determine supported modes
if Mode.HOME in action_dpcode_wrapper.type_information.range:
self._attr_supported_features |= AlarmControlPanelEntityFeature.ARM_HOME
if Mode.ARM in action_dpcode_wrapper.type_information.range:
self._attr_supported_features |= AlarmControlPanelEntityFeature.ARM_AWAY
if Mode.SOS in action_dpcode_wrapper.type_information.range:
self._attr_supported_features |= AlarmControlPanelEntityFeature.TRIGGER
if supported_modes := find_dpcode(
self.device, description.key, dptype=DPType.ENUM, prefer_function=True
):
if Mode.HOME in supported_modes.range:
self._attr_supported_features |= AlarmControlPanelEntityFeature.ARM_HOME
if Mode.ARM in supported_modes.range:
self._attr_supported_features |= AlarmControlPanelEntityFeature.ARM_AWAY
if Mode.SOS in supported_modes.range:
self._attr_supported_features |= AlarmControlPanelEntityFeature.TRIGGER
# Determine master state
if enum_type := find_dpcode(
self.device,
description.master_state,
dptype=DPType.ENUM,
prefer_function=True,
):
self._master_state = enum_type
# Determine alarm message
if dp_code := get_dpcode(self.device, description.alarm_msg):
@@ -151,8 +149,8 @@ class TuyaAlarmEntity(TuyaEntity, AlarmControlPanelEntity):
# When the alarm is triggered, only its 'state' is changing. From 'normal' to 'alarm'.
# The 'mode' doesn't change, and stays as 'arm' or 'home'.
if (
self._state_dpcode_wrapper is not None
and self.device.status.get(self._state_dpcode_wrapper.dpcode) == State.ALARM
self._master_state is not None
and self.device.status.get(self._master_state.dpcode) == State.ALARM
):
# Only report as triggered if NOT a battery warning
if (
@@ -168,26 +166,28 @@ class TuyaAlarmEntity(TuyaEntity, AlarmControlPanelEntity):
def changed_by(self) -> str | None:
"""Last change triggered by."""
if (
self._state_dpcode_wrapper is not None
self._master_state is not None
and self._alarm_msg_dpcode is not None
and self.device.status.get(self._state_dpcode_wrapper.dpcode) == State.ALARM
and self.device.status.get(self._master_state.dpcode) == State.ALARM
and (encoded_msg := self.device.status.get(self._alarm_msg_dpcode))
):
return b64decode(encoded_msg).decode("utf-16be")
return None
async def async_alarm_disarm(self, code: str | None = None) -> None:
def alarm_disarm(self, code: str | None = None) -> None:
"""Send Disarm command."""
await self._async_send_dpcode_update(self._action_dpcode_wrapper, Mode.DISARMED)
self._send_command(
[{"code": self.entity_description.key, "value": Mode.DISARMED}]
)
async def async_alarm_arm_home(self, code: str | None = None) -> None:
def alarm_arm_home(self, code: str | None = None) -> None:
"""Send Home command."""
await self._async_send_dpcode_update(self._action_dpcode_wrapper, Mode.HOME)
self._send_command([{"code": self.entity_description.key, "value": Mode.HOME}])
async def async_alarm_arm_away(self, code: str | None = None) -> None:
def alarm_arm_away(self, code: str | None = None) -> None:
"""Send Arm command."""
await self._async_send_dpcode_update(self._action_dpcode_wrapper, Mode.ARM)
self._send_command([{"code": self.entity_description.key, "value": Mode.ARM}])
async def async_alarm_trigger(self, code: str | None = None) -> None:
def alarm_trigger(self, code: str | None = None) -> None:
"""Send SOS command."""
await self._async_send_dpcode_update(self._action_dpcode_wrapper, Mode.SOS)
self._send_command([{"code": self.entity_description.key, "value": Mode.SOS}])

View File

@@ -704,7 +704,6 @@ class DPCode(StrEnum):
DECIBEL_SWITCH = "decibel_switch"
DEHUMIDITY_SET_ENUM = "dehumidify_set_enum"
DEHUMIDITY_SET_VALUE = "dehumidify_set_value"
DELAY_CLEAN_TIME = "delay_clean_time"
DELAY_SET = "delay_set"
DEW_POINT_TEMP = "dew_point_temp"
DISINFECTION = "disinfection"

View File

@@ -6,14 +6,13 @@ from abc import ABC, abstractmethod
import base64
from dataclasses import dataclass
import json
import struct
from typing import Any, Literal, Self, overload
from tuya_sharing import CustomerDevice
from homeassistant.util.json import json_loads
from .const import DPCode, DPType
from .util import parse_dptype, remap_value
from .util import remap_value
@dataclass
@@ -135,8 +134,6 @@ _TYPE_INFORMATION_MAPPINGS: dict[DPType, type[TypeInformation]] = {
DPType.BOOLEAN: TypeInformation,
DPType.ENUM: EnumTypeData,
DPType.INTEGER: IntegerTypeData,
DPType.JSON: TypeInformation,
DPType.RAW: TypeInformation,
}
@@ -147,9 +144,6 @@ class DPCodeWrapper(ABC):
access read conversion routines.
"""
native_unit: str | None = None
suggested_unit: str | None = None
def __init__(self, dpcode: str) -> None:
"""Init DPCodeWrapper."""
self.dpcode = dpcode
@@ -202,7 +196,7 @@ class DPCodeTypeInformationWrapper[T: TypeInformation](DPCodeWrapper):
def find_dpcode(
cls,
device: CustomerDevice,
dpcodes: str | DPCode | tuple[DPCode, ...] | None,
dpcodes: str | DPCode | tuple[DPCode, ...],
*,
prefer_function: bool = False,
) -> Self | None:
@@ -216,20 +210,6 @@ class DPCodeTypeInformationWrapper[T: TypeInformation](DPCodeWrapper):
return None
class DPCodeBase64Wrapper(DPCodeTypeInformationWrapper[TypeInformation]):
"""Wrapper to extract information from a RAW/binary value."""
DPTYPE = DPType.RAW
def read_bytes(self, device: CustomerDevice) -> bytes | None:
"""Read the device value for the dpcode."""
if (raw_value := self._read_device_status_raw(device)) is None or (
len(decoded := base64.b64decode(raw_value)) == 0
):
return None
return decoded
class DPCodeBooleanWrapper(DPCodeTypeInformationWrapper[TypeInformation]):
"""Simple wrapper for boolean values.
@@ -255,18 +235,6 @@ class DPCodeBooleanWrapper(DPCodeTypeInformationWrapper[TypeInformation]):
raise ValueError(f"Invalid boolean value `{value}`")
class DPCodeJsonWrapper(DPCodeTypeInformationWrapper[TypeInformation]):
"""Wrapper to extract information from a JSON value."""
DPTYPE = DPType.JSON
def read_json(self, device: CustomerDevice) -> Any | None:
"""Read the device value for the dpcode."""
if (raw_value := self._read_device_status_raw(device)) is None:
return None
return json_loads(raw_value)
class DPCodeEnumWrapper(DPCodeTypeInformationWrapper[EnumTypeData]):
"""Simple wrapper for EnumTypeData values."""
@@ -300,11 +268,6 @@ class DPCodeIntegerWrapper(DPCodeTypeInformationWrapper[IntegerTypeData]):
DPTYPE = DPType.INTEGER
def __init__(self, dpcode: str, type_information: IntegerTypeData) -> None:
"""Init DPCodeIntegerWrapper."""
super().__init__(dpcode, type_information)
self.native_unit = type_information.unit
def read_device_status(self, device: CustomerDevice) -> float | None:
"""Read the device value for the dpcode.
@@ -389,16 +352,6 @@ def find_dpcode(
) -> IntegerTypeData | None: ...
@overload
def find_dpcode(
device: CustomerDevice,
dpcodes: str | DPCode | tuple[DPCode, ...] | None,
*,
prefer_function: bool = False,
dptype: Literal[DPType.BOOLEAN, DPType.JSON, DPType.RAW],
) -> TypeInformation | None: ...
def find_dpcode(
device: CustomerDevice,
dpcodes: str | DPCode | tuple[DPCode, ...] | None,
@@ -428,7 +381,7 @@ def find_dpcode(
for device_specs in lookup_tuple:
if (
(current_definition := device_specs.get(dpcode))
and parse_dptype(current_definition.type) is dptype
and current_definition.type == dptype
and (
type_information := type_information_cls.from_json(
dpcode, current_definition.values
@@ -438,3 +391,44 @@ def find_dpcode(
return type_information
return None
class ComplexValue:
"""Complex value (for JSON/RAW parsing)."""
@classmethod
def from_json(cls, data: str) -> Self:
"""Load JSON string and return a ComplexValue object."""
raise NotImplementedError("from_json is not implemented for this type")
@classmethod
def from_raw(cls, data: str) -> Self | None:
"""Decode base64 string and return a ComplexValue object."""
raise NotImplementedError("from_raw is not implemented for this type")
@dataclass
class ElectricityValue(ComplexValue):
"""Electricity complex value."""
electriccurrent: str | None = None
power: str | None = None
voltage: str | None = None
@classmethod
def from_json(cls, data: str) -> Self:
"""Load JSON string and return a ElectricityValue object."""
return cls(**json.loads(data.lower()))
@classmethod
def from_raw(cls, data: str) -> Self | None:
"""Decode base64 string and return a ElectricityValue object."""
raw = base64.b64decode(data)
if len(raw) == 0:
return None
voltage = struct.unpack(">H", raw[0:2])[0] / 10.0
electriccurrent = struct.unpack(">L", b"\x00" + raw[2:5])[0] / 1000.0
power = struct.unpack(">L", b"\x00" + raw[5:8])[0] / 1000.0
return cls(
electriccurrent=str(electriccurrent), power=str(power), voltage=str(voltage)
)

View File

@@ -180,14 +180,6 @@ NUMBERS: dict[DeviceCategory, tuple[NumberEntityDescription, ...]] = {
entity_category=EntityCategory.CONFIG,
),
),
DeviceCategory.MSP: (
NumberEntityDescription(
key=DPCode.DELAY_CLEAN_TIME,
translation_key="delay_clean_time",
device_class=NumberDeviceClass.DURATION,
entity_category=EntityCategory.CONFIG,
),
),
DeviceCategory.MZJ: (
NumberEntityDescription(
key=DPCode.COOK_TEMPERATURE,
@@ -502,19 +494,14 @@ class TuyaNumberEntity(TuyaEntity, NumberEntity):
self._attr_native_min_value = dpcode_wrapper.type_information.min_scaled
self._attr_native_step = dpcode_wrapper.type_information.step_scaled
if description.native_unit_of_measurement is None:
self._attr_native_unit_of_measurement = dpcode_wrapper.native_unit
self._validate_device_class_unit()
def _validate_device_class_unit(self) -> None:
"""Validate device class unit compatibility."""
self._attr_native_unit_of_measurement = dpcode_wrapper.type_information.unit
# Logic to ensure the set device class and API received Unit Of Measurement
# match Home Assistants requirements.
if (
self.device_class is not None
and not self.device_class.startswith(DOMAIN)
and self.entity_description.native_unit_of_measurement is None
and description.native_unit_of_measurement is None
# we do not need to check mappings if the API UOM is allowed
and self.native_unit_of_measurement
not in NUMBER_DEVICE_CLASS_UNITS[self.device_class]

View File

@@ -2,8 +2,9 @@
from __future__ import annotations
from collections.abc import Callable
from dataclasses import dataclass
import struct
from typing import Any
from tuya_sharing import CustomerDevice, Manager
@@ -41,134 +42,41 @@ from .const import (
)
from .entity import TuyaEntity
from .models import (
DPCodeBase64Wrapper,
DPCodeEnumWrapper,
DPCodeIntegerWrapper,
DPCodeJsonWrapper,
DPCodeTypeInformationWrapper,
DPCodeWrapper,
ComplexValue,
ElectricityValue,
EnumTypeData,
IntegerTypeData,
find_dpcode,
)
from .util import get_dptype
class _WindDirectionWrapper(DPCodeTypeInformationWrapper[EnumTypeData]):
"""Custom DPCode Wrapper for converting enum to wind direction."""
DPTYPE = DPType.ENUM
_WIND_DIRECTIONS = {
"north": 0.0,
"north_north_east": 22.5,
"north_east": 45.0,
"east_north_east": 67.5,
"east": 90.0,
"east_south_east": 112.5,
"south_east": 135.0,
"south_south_east": 157.5,
"south": 180.0,
"south_south_west": 202.5,
"south_west": 225.0,
"west_south_west": 247.5,
"west": 270.0,
"west_north_west": 292.5,
"north_west": 315.0,
"north_north_west": 337.5,
}
def read_device_status(self, device: CustomerDevice) -> float | None:
"""Read the device value for the dpcode."""
if (
raw_value := self._read_device_status_raw(device)
) in self.type_information.range:
return self._WIND_DIRECTIONS.get(raw_value)
return None
class _JsonElectricityCurrentWrapper(DPCodeJsonWrapper):
"""Custom DPCode Wrapper for extracting electricity current from JSON."""
native_unit = UnitOfElectricCurrent.AMPERE
def read_device_status(self, device: CustomerDevice) -> float | None:
"""Read the device value for the dpcode."""
if (raw_value := super().read_json(device)) is None:
return None
return raw_value.get("electricCurrent")
class _JsonElectricityPowerWrapper(DPCodeJsonWrapper):
"""Custom DPCode Wrapper for extracting electricity power from JSON."""
native_unit = UnitOfPower.KILO_WATT
def read_device_status(self, device: CustomerDevice) -> float | None:
"""Read the device value for the dpcode."""
if (raw_value := super().read_json(device)) is None:
return None
return raw_value.get("power")
class _JsonElectricityVoltageWrapper(DPCodeJsonWrapper):
"""Custom DPCode Wrapper for extracting electricity voltage from JSON."""
native_unit = UnitOfElectricPotential.VOLT
def read_device_status(self, device: CustomerDevice) -> float | None:
"""Read the device value for the dpcode."""
if (raw_value := super().read_json(device)) is None:
return None
return raw_value.get("voltage")
class _RawElectricityCurrentWrapper(DPCodeBase64Wrapper):
"""Custom DPCode Wrapper for extracting electricity current from base64."""
native_unit = UnitOfElectricCurrent.MILLIAMPERE
suggested_unit = UnitOfElectricCurrent.AMPERE
def read_device_status(self, device: CustomerDevice) -> float | None:
"""Read the device value for the dpcode."""
if (raw_value := super().read_bytes(device)) is None:
return None
return struct.unpack(">L", b"\x00" + raw_value[2:5])[0]
class _RawElectricityPowerWrapper(DPCodeBase64Wrapper):
"""Custom DPCode Wrapper for extracting electricity power from base64."""
native_unit = UnitOfPower.WATT
suggested_unit = UnitOfPower.KILO_WATT
def read_device_status(self, device: CustomerDevice) -> float | None:
"""Read the device value for the dpcode."""
if (raw_value := super().read_bytes(device)) is None:
return None
return struct.unpack(">L", b"\x00" + raw_value[5:8])[0]
class _RawElectricityVoltageWrapper(DPCodeBase64Wrapper):
"""Custom DPCode Wrapper for extracting electricity voltage from base64."""
native_unit = UnitOfElectricPotential.VOLT
def read_device_status(self, device: CustomerDevice) -> float | None:
"""Read the device value for the dpcode."""
if (raw_value := super().read_bytes(device)) is None:
return None
return struct.unpack(">H", raw_value[0:2])[0] / 10.0
CURRENT_WRAPPER = (_RawElectricityCurrentWrapper, _JsonElectricityCurrentWrapper)
POWER_WRAPPER = (_RawElectricityPowerWrapper, _JsonElectricityPowerWrapper)
VOLTAGE_WRAPPER = (_RawElectricityVoltageWrapper, _JsonElectricityVoltageWrapper)
_WIND_DIRECTIONS = {
"north": 0.0,
"north_north_east": 22.5,
"north_east": 45.0,
"east_north_east": 67.5,
"east": 90.0,
"east_south_east": 112.5,
"south_east": 135.0,
"south_south_east": 157.5,
"south": 180.0,
"south_south_west": 202.5,
"south_west": 225.0,
"west_south_west": 247.5,
"west": 270.0,
"west_north_west": 292.5,
"north_west": 315.0,
"north_north_west": 337.5,
}
@dataclass(frozen=True)
class TuyaSensorEntityDescription(SensorEntityDescription):
"""Describes Tuya sensor entity."""
dpcode: DPCode | None = None
wrapper_class: tuple[type[DPCodeTypeInformationWrapper], ...] | None = None
complex_type: type[ComplexValue] | None = None
subkey: str | None = None
state_conversion: Callable[[Any], StateType] | None = None
# Commonly used battery sensors, that are reused in the sensors down below.
@@ -486,76 +394,85 @@ SENSORS: dict[DeviceCategory, tuple[TuyaSensorEntityDescription, ...]] = {
state_class=SensorStateClass.MEASUREMENT,
),
TuyaSensorEntityDescription(
key=f"{DPCode.PHASE_A}electriccurrent",
dpcode=DPCode.PHASE_A,
key=DPCode.PHASE_A,
translation_key="phase_a_current",
device_class=SensorDeviceClass.CURRENT,
native_unit_of_measurement=UnitOfElectricCurrent.AMPERE,
state_class=SensorStateClass.MEASUREMENT,
wrapper_class=CURRENT_WRAPPER,
complex_type=ElectricityValue,
subkey="electriccurrent",
),
TuyaSensorEntityDescription(
key=f"{DPCode.PHASE_A}power",
dpcode=DPCode.PHASE_A,
key=DPCode.PHASE_A,
translation_key="phase_a_power",
device_class=SensorDeviceClass.POWER,
state_class=SensorStateClass.MEASUREMENT,
wrapper_class=POWER_WRAPPER,
native_unit_of_measurement=UnitOfPower.KILO_WATT,
complex_type=ElectricityValue,
subkey="power",
),
TuyaSensorEntityDescription(
key=f"{DPCode.PHASE_A}voltage",
dpcode=DPCode.PHASE_A,
key=DPCode.PHASE_A,
translation_key="phase_a_voltage",
device_class=SensorDeviceClass.VOLTAGE,
state_class=SensorStateClass.MEASUREMENT,
wrapper_class=VOLTAGE_WRAPPER,
native_unit_of_measurement=UnitOfElectricPotential.VOLT,
complex_type=ElectricityValue,
subkey="voltage",
),
TuyaSensorEntityDescription(
key=f"{DPCode.PHASE_B}electriccurrent",
dpcode=DPCode.PHASE_B,
key=DPCode.PHASE_B,
translation_key="phase_b_current",
device_class=SensorDeviceClass.CURRENT,
native_unit_of_measurement=UnitOfElectricCurrent.AMPERE,
state_class=SensorStateClass.MEASUREMENT,
wrapper_class=CURRENT_WRAPPER,
complex_type=ElectricityValue,
subkey="electriccurrent",
),
TuyaSensorEntityDescription(
key=f"{DPCode.PHASE_B}power",
dpcode=DPCode.PHASE_B,
key=DPCode.PHASE_B,
translation_key="phase_b_power",
device_class=SensorDeviceClass.POWER,
state_class=SensorStateClass.MEASUREMENT,
wrapper_class=POWER_WRAPPER,
native_unit_of_measurement=UnitOfPower.KILO_WATT,
complex_type=ElectricityValue,
subkey="power",
),
TuyaSensorEntityDescription(
key=f"{DPCode.PHASE_B}voltage",
dpcode=DPCode.PHASE_B,
key=DPCode.PHASE_B,
translation_key="phase_b_voltage",
device_class=SensorDeviceClass.VOLTAGE,
state_class=SensorStateClass.MEASUREMENT,
wrapper_class=VOLTAGE_WRAPPER,
native_unit_of_measurement=UnitOfElectricPotential.VOLT,
complex_type=ElectricityValue,
subkey="voltage",
),
TuyaSensorEntityDescription(
key=f"{DPCode.PHASE_C}electriccurrent",
dpcode=DPCode.PHASE_C,
key=DPCode.PHASE_C,
translation_key="phase_c_current",
device_class=SensorDeviceClass.CURRENT,
native_unit_of_measurement=UnitOfElectricCurrent.AMPERE,
state_class=SensorStateClass.MEASUREMENT,
wrapper_class=CURRENT_WRAPPER,
complex_type=ElectricityValue,
subkey="electriccurrent",
),
TuyaSensorEntityDescription(
key=f"{DPCode.PHASE_C}power",
dpcode=DPCode.PHASE_C,
key=DPCode.PHASE_C,
translation_key="phase_c_power",
device_class=SensorDeviceClass.POWER,
state_class=SensorStateClass.MEASUREMENT,
wrapper_class=POWER_WRAPPER,
native_unit_of_measurement=UnitOfPower.KILO_WATT,
complex_type=ElectricityValue,
subkey="power",
),
TuyaSensorEntityDescription(
key=f"{DPCode.PHASE_C}voltage",
dpcode=DPCode.PHASE_C,
key=DPCode.PHASE_C,
translation_key="phase_c_voltage",
device_class=SensorDeviceClass.VOLTAGE,
state_class=SensorStateClass.MEASUREMENT,
wrapper_class=VOLTAGE_WRAPPER,
native_unit_of_measurement=UnitOfElectricPotential.VOLT,
complex_type=ElectricityValue,
subkey="voltage",
),
TuyaSensorEntityDescription(
key=DPCode.CUR_CURRENT,
@@ -1055,7 +972,7 @@ SENSORS: dict[DeviceCategory, tuple[TuyaSensorEntityDescription, ...]] = {
translation_key="wind_direction",
device_class=SensorDeviceClass.WIND_DIRECTION,
state_class=SensorStateClass.MEASUREMENT,
wrapper_class=(_WindDirectionWrapper,),
state_conversion=lambda state: _WIND_DIRECTIONS.get(str(state)),
),
TuyaSensorEntityDescription(
key=DPCode.DEW_POINT_TEMP,
@@ -1568,11 +1485,12 @@ SENSORS: dict[DeviceCategory, tuple[TuyaSensorEntityDescription, ...]] = {
state_class=SensorStateClass.MEASUREMENT,
),
TuyaSensorEntityDescription(
key=f"{DPCode.TOTAL_POWER}power",
dpcode=DPCode.TOTAL_POWER,
key=DPCode.TOTAL_POWER,
translation_key="total_power",
device_class=SensorDeviceClass.POWER,
state_class=SensorStateClass.MEASUREMENT,
complex_type=ElectricityValue,
subkey="power",
),
TuyaSensorEntityDescription(
key=DPCode.SUPPLY_FREQUENCY,
@@ -1582,76 +1500,85 @@ SENSORS: dict[DeviceCategory, tuple[TuyaSensorEntityDescription, ...]] = {
state_class=SensorStateClass.MEASUREMENT,
),
TuyaSensorEntityDescription(
key=f"{DPCode.PHASE_A}electriccurrent",
dpcode=DPCode.PHASE_A,
key=DPCode.PHASE_A,
translation_key="phase_a_current",
device_class=SensorDeviceClass.CURRENT,
native_unit_of_measurement=UnitOfElectricCurrent.AMPERE,
state_class=SensorStateClass.MEASUREMENT,
wrapper_class=CURRENT_WRAPPER,
complex_type=ElectricityValue,
subkey="electriccurrent",
),
TuyaSensorEntityDescription(
key=f"{DPCode.PHASE_A}power",
dpcode=DPCode.PHASE_A,
key=DPCode.PHASE_A,
translation_key="phase_a_power",
device_class=SensorDeviceClass.POWER,
state_class=SensorStateClass.MEASUREMENT,
wrapper_class=POWER_WRAPPER,
native_unit_of_measurement=UnitOfPower.KILO_WATT,
complex_type=ElectricityValue,
subkey="power",
),
TuyaSensorEntityDescription(
key=f"{DPCode.PHASE_A}voltage",
dpcode=DPCode.PHASE_A,
key=DPCode.PHASE_A,
translation_key="phase_a_voltage",
device_class=SensorDeviceClass.VOLTAGE,
state_class=SensorStateClass.MEASUREMENT,
wrapper_class=VOLTAGE_WRAPPER,
native_unit_of_measurement=UnitOfElectricPotential.VOLT,
complex_type=ElectricityValue,
subkey="voltage",
),
TuyaSensorEntityDescription(
key=f"{DPCode.PHASE_B}electriccurrent",
dpcode=DPCode.PHASE_B,
key=DPCode.PHASE_B,
translation_key="phase_b_current",
device_class=SensorDeviceClass.CURRENT,
native_unit_of_measurement=UnitOfElectricCurrent.AMPERE,
state_class=SensorStateClass.MEASUREMENT,
wrapper_class=CURRENT_WRAPPER,
complex_type=ElectricityValue,
subkey="electriccurrent",
),
TuyaSensorEntityDescription(
key=f"{DPCode.PHASE_B}power",
dpcode=DPCode.PHASE_B,
key=DPCode.PHASE_B,
translation_key="phase_b_power",
device_class=SensorDeviceClass.POWER,
state_class=SensorStateClass.MEASUREMENT,
wrapper_class=POWER_WRAPPER,
native_unit_of_measurement=UnitOfPower.KILO_WATT,
complex_type=ElectricityValue,
subkey="power",
),
TuyaSensorEntityDescription(
key=f"{DPCode.PHASE_B}voltage",
dpcode=DPCode.PHASE_B,
key=DPCode.PHASE_B,
translation_key="phase_b_voltage",
device_class=SensorDeviceClass.VOLTAGE,
state_class=SensorStateClass.MEASUREMENT,
wrapper_class=VOLTAGE_WRAPPER,
native_unit_of_measurement=UnitOfElectricPotential.VOLT,
complex_type=ElectricityValue,
subkey="voltage",
),
TuyaSensorEntityDescription(
key=f"{DPCode.PHASE_C}electriccurrent",
dpcode=DPCode.PHASE_C,
key=DPCode.PHASE_C,
translation_key="phase_c_current",
device_class=SensorDeviceClass.CURRENT,
native_unit_of_measurement=UnitOfElectricCurrent.AMPERE,
state_class=SensorStateClass.MEASUREMENT,
wrapper_class=CURRENT_WRAPPER,
complex_type=ElectricityValue,
subkey="electriccurrent",
),
TuyaSensorEntityDescription(
key=f"{DPCode.PHASE_C}power",
dpcode=DPCode.PHASE_C,
key=DPCode.PHASE_C,
translation_key="phase_c_power",
device_class=SensorDeviceClass.POWER,
state_class=SensorStateClass.MEASUREMENT,
wrapper_class=POWER_WRAPPER,
native_unit_of_measurement=UnitOfPower.KILO_WATT,
complex_type=ElectricityValue,
subkey="power",
),
TuyaSensorEntityDescription(
key=f"{DPCode.PHASE_C}voltage",
dpcode=DPCode.PHASE_C,
key=DPCode.PHASE_C,
translation_key="phase_c_voltage",
device_class=SensorDeviceClass.VOLTAGE,
state_class=SensorStateClass.MEASUREMENT,
wrapper_class=VOLTAGE_WRAPPER,
native_unit_of_measurement=UnitOfElectricPotential.VOLT,
complex_type=ElectricityValue,
subkey="voltage",
),
),
DeviceCategory.ZNNBQ: (
@@ -1712,27 +1639,6 @@ SENSORS[DeviceCategory.DGHSXJ] = SENSORS[DeviceCategory.SP]
SENSORS[DeviceCategory.PC] = SENSORS[DeviceCategory.KG]
def _get_dpcode_wrapper(
device: CustomerDevice,
description: TuyaSensorEntityDescription,
) -> DPCodeWrapper | None:
"""Get DPCode wrapper for an entity description."""
dpcode = description.dpcode or description.key
wrapper: DPCodeWrapper | None
if description.wrapper_class:
for cls in description.wrapper_class:
if wrapper := cls.find_dpcode(device, dpcode):
return wrapper
return None
for cls in (DPCodeIntegerWrapper, DPCodeEnumWrapper):
if wrapper := cls.find_dpcode(device, dpcode):
return wrapper
return None
async def async_setup_entry(
hass: HomeAssistant,
entry: TuyaConfigEntry,
@@ -1749,9 +1655,9 @@ async def async_setup_entry(
device = manager.device_map[device_id]
if descriptions := SENSORS.get(device.category):
entities.extend(
TuyaSensorEntity(device, manager, description, dpcode_wrapper)
TuyaSensorEntity(device, manager, description)
for description in descriptions
if (dpcode_wrapper := _get_dpcode_wrapper(device, description))
if description.key in device.status
)
async_add_entities(entities)
@@ -1767,25 +1673,35 @@ class TuyaSensorEntity(TuyaEntity, SensorEntity):
"""Tuya Sensor Entity."""
entity_description: TuyaSensorEntityDescription
_dpcode_wrapper: DPCodeWrapper
_type: DPType | None = None
_type_data: IntegerTypeData | EnumTypeData | None = None
def __init__(
self,
device: CustomerDevice,
device_manager: Manager,
description: TuyaSensorEntityDescription,
dpcode_wrapper: DPCodeWrapper,
) -> None:
"""Init Tuya sensor."""
super().__init__(device, device_manager)
self.entity_description = description
self._attr_unique_id = f"{super().unique_id}{description.key}"
self._dpcode_wrapper = dpcode_wrapper
self._attr_unique_id = (
f"{super().unique_id}{description.key}{description.subkey or ''}"
)
if description.native_unit_of_measurement is None:
self._attr_native_unit_of_measurement = dpcode_wrapper.native_unit
if description.suggested_unit_of_measurement is None:
self._attr_suggested_unit_of_measurement = dpcode_wrapper.suggested_unit
if int_type := find_dpcode(self.device, description.key, dptype=DPType.INTEGER):
self._type_data = int_type
self._type = DPType.INTEGER
if description.native_unit_of_measurement is None:
self._attr_native_unit_of_measurement = int_type.unit
elif enum_type := find_dpcode(
self.device, description.key, dptype=DPType.ENUM, prefer_function=True
):
self._type_data = enum_type
self._type = DPType.ENUM
else:
self._type = get_dptype(self.device, DPCode(description.key))
self._validate_device_class_unit()
@@ -1836,4 +1752,55 @@ class TuyaSensorEntity(TuyaEntity, SensorEntity):
@property
def native_value(self) -> StateType:
"""Return the value reported by the sensor."""
return self._dpcode_wrapper.read_device_status(self.device)
# Only continue if data type is known
if self._type not in (
DPType.INTEGER,
DPType.STRING,
DPType.ENUM,
DPType.JSON,
DPType.RAW,
):
return None
# Raw value
value = self.device.status.get(self.entity_description.key)
if value is None:
return None
# Convert value, if required
if (convert := self.entity_description.state_conversion) is not None:
return convert(value)
# Scale integer/float value
if isinstance(self._type_data, IntegerTypeData):
return self._type_data.scale_value(value)
# Unexpected enum value
if (
isinstance(self._type_data, EnumTypeData)
and value not in self._type_data.range
):
return None
# Get subkey value from Json string.
if self._type is DPType.JSON:
if (
self.entity_description.complex_type is None
or self.entity_description.subkey is None
):
return None
values = self.entity_description.complex_type.from_json(value)
return getattr(values, self.entity_description.subkey)
if self._type is DPType.RAW:
if (
self.entity_description.complex_type is None
or self.entity_description.subkey is None
or (raw_values := self.entity_description.complex_type.from_raw(value))
is None
):
return None
return getattr(raw_values, self.entity_description.subkey)
# Valid string or enum value
return value

View File

@@ -19,7 +19,6 @@ from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
from . import TuyaConfigEntry
from .const import TUYA_DISCOVERY_NEW, DeviceCategory, DPCode
from .entity import TuyaEntity
from .models import DPCodeBooleanWrapper
SIRENS: dict[DeviceCategory, tuple[SirenEntityDescription, ...]] = {
DeviceCategory.CO2BJ: (
@@ -65,13 +64,9 @@ async def async_setup_entry(
device = manager.device_map[device_id]
if descriptions := SIRENS.get(device.category):
entities.extend(
TuyaSirenEntity(device, manager, description, dpcode_wrapper)
TuyaSirenEntity(device, manager, description)
for description in descriptions
if (
dpcode_wrapper := DPCodeBooleanWrapper.find_dpcode(
device, description.key, prefer_function=True
)
)
if description.key in device.status
)
async_add_entities(entities)
@@ -94,23 +89,21 @@ class TuyaSirenEntity(TuyaEntity, SirenEntity):
device: CustomerDevice,
device_manager: Manager,
description: SirenEntityDescription,
dpcode_wrapper: DPCodeBooleanWrapper,
) -> None:
"""Init Tuya Siren."""
super().__init__(device, device_manager)
self.entity_description = description
self._attr_unique_id = f"{super().unique_id}{description.key}"
self._dpcode_wrapper = dpcode_wrapper
@property
def is_on(self) -> bool | None:
def is_on(self) -> bool:
"""Return true if siren is on."""
return self._dpcode_wrapper.read_device_status(self.device)
return self.device.status.get(self.entity_description.key, False)
async def async_turn_on(self, **kwargs: Any) -> None:
def turn_on(self, **kwargs: Any) -> None:
"""Turn the siren on."""
await self._async_send_dpcode_update(self._dpcode_wrapper, True)
self._send_command([{"code": self.entity_description.key, "value": True}])
async def async_turn_off(self, **kwargs: Any) -> None:
def turn_off(self, **kwargs: Any) -> None:
"""Turn the siren off."""
await self._async_send_dpcode_update(self._dpcode_wrapper, False)
self._send_command([{"code": self.entity_description.key, "value": False}])

View File

@@ -172,9 +172,6 @@
"cook_time": {
"name": "Cooking time"
},
"delay_clean_time": {
"name": "Delay clean time"
},
"down_delay": {
"name": "Down delay"
},

Some files were not shown because too many files have changed in this diff Show More