Compare commits

..

10 Commits

Author SHA1 Message Date
mib1185
ee0230f3b1 use renamed helpers 2025-12-17 20:06:03 +00:00
mib1185
851fd467fe Merge branch 'dev' into input_boolean/add-domain-driven-triggers 2025-12-17 20:05:20 +00:00
hanwg
c418d9750b Remove ALLOW_EXTRA from Telegram bot action schema (#158886) 2025-12-17 19:49:34 +01:00
Joost Lekkerkerker
e96d614076 Add integration_type service to meteo_france (#159315) 2025-12-17 19:19:14 +01:00
Abílio Costa
f0a5e0a023 Enable duplicated log file on supervised when env var is set (#158679) 2025-12-17 17:44:54 +00:00
Klaas Schoute
6ac6b86060 Set quality scale in Autarco manifest (#159263) 2025-12-17 16:17:19 +01:00
PaulCavill
3909171b1a Login exception reason (#159259) 2025-12-17 16:13:54 +01:00
Luke Lashley
769029505f Bump python-roborock to 3.18.0 (#159271) 2025-12-17 06:39:06 -08:00
Paul Tarjan
080ec3524b Fix flaky camera stream teardown (#158507)
Co-authored-by: epenet <6771947+epenet@users.noreply.github.com>
2025-12-17 13:47:22 +01:00
mib1185
d10148a175 add turned_off and turned_on triggers 2025-12-12 20:53:03 +00:00
23 changed files with 501 additions and 244 deletions

View File

@@ -624,13 +624,16 @@ async def async_enable_logging(
if log_file is None:
default_log_path = hass.config.path(ERROR_LOG_FILENAME)
if "SUPERVISOR" in os.environ:
_LOGGER.info("Running in Supervisor, not logging to file")
if "SUPERVISOR" in os.environ and "HA_DUPLICATE_LOG_FILE" not in os.environ:
# Rename the default log file if it exists, since previous versions created
# it even on Supervisor
if os.path.isfile(default_log_path):
with contextlib.suppress(OSError):
os.rename(default_log_path, f"{default_log_path}.old")
def rename_old_file() -> None:
"""Rename old log file in executor."""
if os.path.isfile(default_log_path):
with contextlib.suppress(OSError):
os.rename(default_log_path, f"{default_log_path}.old")
await hass.async_add_executor_job(rename_old_file)
err_log_path = None
else:
err_log_path = default_log_path

View File

@@ -6,5 +6,6 @@
"documentation": "https://www.home-assistant.io/integrations/autarco",
"integration_type": "hub",
"iot_class": "cloud_polling",
"quality_scale": "silver",
"requirements": ["autarco==3.2.0"]
}

View File

@@ -130,6 +130,7 @@ _EXPERIMENTAL_TRIGGER_PLATFORMS = {
"cover",
"device_tracker",
"fan",
"input_boolean",
"lawn_mower",
"light",
"media_player",

View File

@@ -108,7 +108,7 @@ class IcloudAccount:
if self.api.requires_2fa:
# Trigger a new log in to ensure the user enters the 2FA code again.
raise PyiCloudFailedLoginException # noqa: TRY301
raise PyiCloudFailedLoginException("2FA Required") # noqa: TRY301
except PyiCloudFailedLoginException:
self.api = None

View File

@@ -20,5 +20,13 @@
"turn_on": {
"service": "mdi:toggle-switch"
}
},
"triggers": {
"turned_off": {
"trigger": "mdi:toggle-switch-off"
},
"turned_on": {
"trigger": "mdi:toggle-switch"
}
}
}

View File

@@ -1,4 +1,8 @@
{
"common": {
"trigger_behavior_description": "The behavior of the targeted input booleans to trigger on.",
"trigger_behavior_name": "Behavior"
},
"entity_component": {
"_": {
"name": "[%key:component::input_boolean::title%]",
@@ -17,6 +21,15 @@
}
}
},
"selector": {
"trigger_behavior": {
"options": {
"any": "Any",
"first": "First",
"last": "Last"
}
}
},
"services": {
"reload": {
"description": "Reloads helpers from the YAML-configuration.",
@@ -35,5 +48,27 @@
"name": "[%key:common::action::turn_on%]"
}
},
"title": "Input boolean"
"title": "Input boolean",
"triggers": {
"turned_off": {
"description": "Triggers after one or more input booleans turn off.",
"fields": {
"behavior": {
"description": "[%key:component::input_boolean::common::trigger_behavior_description%]",
"name": "[%key:component::input_boolean::common::trigger_behavior_name%]"
}
},
"name": "Input boolean turned off"
},
"turned_on": {
"description": "Triggers after one or more input booleans turn on.",
"fields": {
"behavior": {
"description": "[%key:component::input_boolean::common::trigger_behavior_description%]",
"name": "[%key:component::input_boolean::common::trigger_behavior_name%]"
}
},
"name": "Input boolean turned on"
}
}
}

View File

@@ -0,0 +1,17 @@
"""Provides triggers for input booleans."""
from homeassistant.const import STATE_OFF, STATE_ON
from homeassistant.core import HomeAssistant
from homeassistant.helpers.trigger import Trigger, make_entity_target_state_trigger
from . import DOMAIN
TRIGGERS: dict[str, type[Trigger]] = {
"turned_on": make_entity_target_state_trigger(DOMAIN, STATE_ON),
"turned_off": make_entity_target_state_trigger(DOMAIN, STATE_OFF),
}
async def async_get_triggers(hass: HomeAssistant) -> dict[str, type[Trigger]]:
"""Return the triggers for input booleans."""
return TRIGGERS

View File

@@ -0,0 +1,18 @@
.trigger_common: &trigger_common
target:
entity:
domain: input_boolean
fields:
behavior:
required: true
default: any
selector:
select:
options:
- first
- last
- any
translation_key: trigger_behavior
turned_off: *trigger_common
turned_on: *trigger_common

View File

@@ -1,9 +1,10 @@
{
"domain": "meteo_france",
"name": "M\u00e9t\u00e9o-France",
"name": "Météo-France",
"codeowners": ["@hacf-fr", "@oncleben31", "@Quentame"],
"config_flow": true,
"documentation": "https://www.home-assistant.io/integrations/meteo_france",
"integration_type": "service",
"iot_class": "cloud_polling",
"loggers": ["meteofrance_api"],
"requirements": ["meteofrance-api==1.4.0"]

View File

@@ -20,7 +20,7 @@
"loggers": ["roborock"],
"quality_scale": "silver",
"requirements": [
"python-roborock==3.12.2",
"python-roborock==3.18.0",
"vacuum-map-parser-roborock==0.1.4"
]
}

View File

@@ -63,6 +63,7 @@ from .const import (
ATTR_PASSWORD,
ATTR_QUESTION,
ATTR_REACTION,
ATTR_REPLY_TO_MSGID,
ATTR_RESIZE_KEYBOARD,
ATTR_SHOW_ALERT,
ATTR_STICKER_ID,
@@ -126,21 +127,26 @@ BASE_SERVICE_SCHEMA = vol.Schema(
vol.Optional(ATTR_TIMEOUT): cv.positive_int,
vol.Optional(ATTR_MESSAGE_TAG): cv.string,
vol.Optional(ATTR_MESSAGE_THREAD_ID): vol.Coerce(int),
},
extra=vol.ALLOW_EXTRA,
}
)
SERVICE_SCHEMA_SEND_MESSAGE = vol.All(
cv.deprecated(ATTR_TIMEOUT),
BASE_SERVICE_SCHEMA.extend(
{vol.Required(ATTR_MESSAGE): cv.string, vol.Optional(ATTR_TITLE): cv.string}
{
vol.Required(ATTR_MESSAGE): cv.string,
vol.Optional(ATTR_TITLE): cv.string,
vol.Optional(ATTR_REPLY_TO_MSGID): vol.Coerce(int),
}
),
)
SERVICE_SCHEMA_SEND_CHAT_ACTION = vol.All(
cv.deprecated(ATTR_TIMEOUT),
BASE_SERVICE_SCHEMA.extend(
vol.Schema(
{
vol.Optional(CONF_CONFIG_ENTRY_ID): cv.string,
vol.Optional(ATTR_TARGET): vol.All(cv.ensure_list, [vol.Coerce(int)]),
vol.Required(ATTR_CHAT_ACTION): vol.In(
(
CHAT_ACTION_TYPING,
@@ -156,6 +162,7 @@ SERVICE_SCHEMA_SEND_CHAT_ACTION = vol.All(
CHAT_ACTION_UPLOAD_VIDEO_NOTE,
)
),
vol.Optional(ATTR_MESSAGE_THREAD_ID): vol.Coerce(int),
}
),
)
@@ -169,6 +176,7 @@ SERVICE_SCHEMA_BASE_SEND_FILE = BASE_SERVICE_SCHEMA.extend(
vol.Optional(ATTR_PASSWORD): cv.string,
vol.Optional(ATTR_AUTHENTICATION): cv.string,
vol.Optional(ATTR_VERIFY_SSL): cv.boolean,
vol.Optional(ATTR_REPLY_TO_MSGID): vol.Coerce(int),
}
)
@@ -188,6 +196,7 @@ SERVICE_SCHEMA_SEND_LOCATION = vol.All(
{
vol.Required(ATTR_LONGITUDE): cv.string,
vol.Required(ATTR_LATITUDE): cv.string,
vol.Optional(ATTR_REPLY_TO_MSGID): vol.Coerce(int),
}
),
)
@@ -205,18 +214,25 @@ SERVICE_SCHEMA_SEND_POLL = vol.All(
vol.Optional(ATTR_ALLOWS_MULTIPLE_ANSWERS, default=False): cv.boolean,
vol.Optional(ATTR_DISABLE_NOTIF): cv.boolean,
vol.Optional(ATTR_MESSAGE_THREAD_ID): vol.Coerce(int),
vol.Optional(ATTR_REPLY_TO_MSGID): vol.Coerce(int),
}
),
)
SERVICE_SCHEMA_EDIT_MESSAGE = vol.All(
cv.deprecated(ATTR_TIMEOUT),
SERVICE_SCHEMA_BASE_SEND_FILE.extend(
vol.Schema(
{
vol.Optional(CONF_CONFIG_ENTRY_ID): cv.string,
vol.Optional(ATTR_TITLE): cv.string,
vol.Required(ATTR_MESSAGE): cv.string,
vol.Required(ATTR_MESSAGEID): vol.Any(
cv.positive_int, vol.All(cv.string, "last")
),
vol.Required(ATTR_CHAT_ID): vol.Coerce(int),
vol.Optional(ATTR_PARSER): cv.string,
vol.Optional(ATTR_KEYBOARD_INLINE): cv.ensure_list,
vol.Optional(ATTR_DISABLE_WEB_PREV): cv.boolean,
}
),
)

View File

@@ -783,6 +783,7 @@ class TelegramNotificationService:
None,
chat_id=chat_id,
action=chat_action,
message_thread_id=kwargs.get(ATTR_MESSAGE_THREAD_ID),
context=context,
)
result[chat_id] = is_successful

View File

@@ -2,7 +2,7 @@
from __future__ import annotations
from typing import Any, Self
from typing import Any
from tuya_sharing import CustomerDevice, Manager
@@ -18,140 +18,34 @@ from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
from . import TuyaConfigEntry
from .const import TUYA_DISCOVERY_NEW, DeviceCategory, DPCode
from .entity import TuyaEntity
from .models import DeviceWrapper, DPCodeBooleanWrapper, DPCodeEnumWrapper
from .models import DPCodeBooleanWrapper, DPCodeEnumWrapper
class _VacuumActivityWrapper(DeviceWrapper):
"""Wrapper for the state of a device."""
_TUYA_STATUS_TO_HA = {
"charge_done": VacuumActivity.DOCKED,
"chargecompleted": VacuumActivity.DOCKED,
"chargego": VacuumActivity.DOCKED,
"charging": VacuumActivity.DOCKED,
"cleaning": VacuumActivity.CLEANING,
"docking": VacuumActivity.RETURNING,
"goto_charge": VacuumActivity.RETURNING,
"goto_pos": VacuumActivity.CLEANING,
"mop_clean": VacuumActivity.CLEANING,
"part_clean": VacuumActivity.CLEANING,
"paused": VacuumActivity.PAUSED,
"pick_zone_clean": VacuumActivity.CLEANING,
"pos_arrived": VacuumActivity.CLEANING,
"pos_unarrive": VacuumActivity.CLEANING,
"random": VacuumActivity.CLEANING,
"sleep": VacuumActivity.IDLE,
"smart_clean": VacuumActivity.CLEANING,
"smart": VacuumActivity.CLEANING,
"spot_clean": VacuumActivity.CLEANING,
"standby": VacuumActivity.IDLE,
"wall_clean": VacuumActivity.CLEANING,
"wall_follow": VacuumActivity.CLEANING,
"zone_clean": VacuumActivity.CLEANING,
}
def __init__(
self,
pause_wrapper: DPCodeBooleanWrapper | None = None,
status_wrapper: DPCodeEnumWrapper | None = None,
) -> None:
"""Init DeviceWrapper."""
self._pause_wrapper = pause_wrapper
self._status_wrapper = status_wrapper
@classmethod
def find_dpcode(cls, device: CustomerDevice) -> Self | None:
"""Find and return a _VacuumActivityWrapper for the given DP codes."""
pause_wrapper = DPCodeBooleanWrapper.find_dpcode(device, DPCode.PAUSE)
status_wrapper = DPCodeEnumWrapper.find_dpcode(device, DPCode.STATUS)
if pause_wrapper or status_wrapper:
return cls(pause_wrapper=pause_wrapper, status_wrapper=status_wrapper)
return None
def read_device_status(self, device: CustomerDevice) -> VacuumActivity | None:
"""Read the device status."""
if (
self._status_wrapper
and (status := self._status_wrapper.read_device_status(device)) is not None
):
return self._TUYA_STATUS_TO_HA.get(status)
if self._pause_wrapper and self._pause_wrapper.read_device_status(device):
return VacuumActivity.PAUSED
return None
class _VacuumActionWrapper(DeviceWrapper):
"""Wrapper for sending actions to a vacuum."""
_TUYA_MODE_RETURN_HOME = "chargego"
def __init__(
self,
charge_wrapper: DPCodeBooleanWrapper | None,
locate_wrapper: DPCodeBooleanWrapper | None,
pause_wrapper: DPCodeBooleanWrapper | None,
mode_wrapper: DPCodeEnumWrapper | None,
switch_wrapper: DPCodeBooleanWrapper | None,
) -> None:
"""Init DeviceWrapper."""
self._charge_wrapper = charge_wrapper
self._locate_wrapper = locate_wrapper
self._mode_wrapper = mode_wrapper
self._switch_wrapper = switch_wrapper
self.options = []
if charge_wrapper or (
mode_wrapper and self._TUYA_MODE_RETURN_HOME in mode_wrapper.options
):
self.options.append("return_to_base")
if locate_wrapper:
self.options.append("locate")
if pause_wrapper:
self.options.append("pause")
if switch_wrapper:
self.options.append("start")
self.options.append("stop")
@classmethod
def find_dpcode(cls, device: CustomerDevice) -> Self:
"""Find and return a DPCodeTypeInformationWrapper for the given DP codes."""
return cls(
charge_wrapper=DPCodeBooleanWrapper.find_dpcode(
device, DPCode.SWITCH_CHARGE, prefer_function=True
),
locate_wrapper=DPCodeBooleanWrapper.find_dpcode(
device, DPCode.SEEK, prefer_function=True
),
mode_wrapper=DPCodeEnumWrapper.find_dpcode(
device, DPCode.MODE, prefer_function=True
),
pause_wrapper=DPCodeBooleanWrapper.find_dpcode(device, DPCode.PAUSE),
switch_wrapper=DPCodeBooleanWrapper.find_dpcode(
device, DPCode.POWER_GO, prefer_function=True
),
)
def get_update_commands(
self, device: CustomerDevice, value: Any
) -> list[dict[str, Any]]:
"""Get the commands for the action wrapper."""
if value == "locate" and self._locate_wrapper:
return self._locate_wrapper.get_update_commands(device, True)
if value == "pause" and self._switch_wrapper:
return self._switch_wrapper.get_update_commands(device, False)
if value == "return_to_base":
if self._charge_wrapper:
return self._charge_wrapper.get_update_commands(device, True)
if self._mode_wrapper:
return self._mode_wrapper.get_update_commands(
device, self._TUYA_MODE_RETURN_HOME
)
if value == "start" and self._switch_wrapper:
return self._switch_wrapper.get_update_commands(device, True)
if value == "stop" and self._switch_wrapper:
return self._switch_wrapper.get_update_commands(device, False)
return []
TUYA_MODE_RETURN_HOME = "chargego"
TUYA_STATUS_TO_HA = {
"charge_done": VacuumActivity.DOCKED,
"chargecompleted": VacuumActivity.DOCKED,
"chargego": VacuumActivity.DOCKED,
"charging": VacuumActivity.DOCKED,
"cleaning": VacuumActivity.CLEANING,
"docking": VacuumActivity.RETURNING,
"goto_charge": VacuumActivity.RETURNING,
"goto_pos": VacuumActivity.CLEANING,
"mop_clean": VacuumActivity.CLEANING,
"part_clean": VacuumActivity.CLEANING,
"paused": VacuumActivity.PAUSED,
"pick_zone_clean": VacuumActivity.CLEANING,
"pos_arrived": VacuumActivity.CLEANING,
"pos_unarrive": VacuumActivity.CLEANING,
"random": VacuumActivity.CLEANING,
"sleep": VacuumActivity.IDLE,
"smart_clean": VacuumActivity.CLEANING,
"smart": VacuumActivity.CLEANING,
"spot_clean": VacuumActivity.CLEANING,
"standby": VacuumActivity.IDLE,
"wall_clean": VacuumActivity.CLEANING,
"wall_follow": VacuumActivity.CLEANING,
"zone_clean": VacuumActivity.CLEANING,
}
async def async_setup_entry(
@@ -173,11 +67,27 @@ async def async_setup_entry(
TuyaVacuumEntity(
device,
manager,
action_wrapper=_VacuumActionWrapper.find_dpcode(device),
activity_wrapper=_VacuumActivityWrapper.find_dpcode(device),
charge_wrapper=DPCodeBooleanWrapper.find_dpcode(
device, DPCode.SWITCH_CHARGE, prefer_function=True
),
fan_speed_wrapper=DPCodeEnumWrapper.find_dpcode(
device, DPCode.SUCTION, prefer_function=True
),
locate_wrapper=DPCodeBooleanWrapper.find_dpcode(
device, DPCode.SEEK, prefer_function=True
),
mode_wrapper=DPCodeEnumWrapper.find_dpcode(
device, DPCode.MODE, prefer_function=True
),
pause_wrapper=DPCodeBooleanWrapper.find_dpcode(
device, DPCode.PAUSE
),
status_wrapper=DPCodeEnumWrapper.find_dpcode(
device, DPCode.STATUS
),
switch_wrapper=DPCodeBooleanWrapper.find_dpcode(
device, DPCode.POWER_GO, prefer_function=True
),
)
)
async_add_entities(entities)
@@ -199,33 +109,43 @@ class TuyaVacuumEntity(TuyaEntity, StateVacuumEntity):
device: CustomerDevice,
device_manager: Manager,
*,
action_wrapper: _VacuumActionWrapper | None,
activity_wrapper: _VacuumActivityWrapper | None,
charge_wrapper: DPCodeBooleanWrapper | None,
fan_speed_wrapper: DPCodeEnumWrapper | None,
locate_wrapper: DPCodeBooleanWrapper | None,
mode_wrapper: DPCodeEnumWrapper | None,
pause_wrapper: DPCodeBooleanWrapper | None,
status_wrapper: DPCodeEnumWrapper | None,
switch_wrapper: DPCodeBooleanWrapper | None,
) -> None:
"""Init Tuya vacuum."""
super().__init__(device, device_manager)
self._action_wrapper = action_wrapper
self._activity_wrapper = activity_wrapper
self._charge_wrapper = charge_wrapper
self._fan_speed_wrapper = fan_speed_wrapper
self._locate_wrapper = locate_wrapper
self._mode_wrapper = mode_wrapper
self._pause_wrapper = pause_wrapper
self._status_wrapper = status_wrapper
self._switch_wrapper = switch_wrapper
self._attr_fan_speed_list = []
self._attr_supported_features = VacuumEntityFeature.SEND_COMMAND
self._attr_supported_features = (
VacuumEntityFeature.SEND_COMMAND | VacuumEntityFeature.STATE
)
if pause_wrapper:
self._attr_supported_features |= VacuumEntityFeature.PAUSE
if action_wrapper and action_wrapper.options:
if "pause" in action_wrapper.options:
self._attr_supported_features |= VacuumEntityFeature.PAUSE
if "return_to_base" in action_wrapper.options:
self._attr_supported_features |= VacuumEntityFeature.RETURN_HOME
if "locate" in action_wrapper.options:
self._attr_supported_features |= VacuumEntityFeature.LOCATE
if "start" in action_wrapper.options:
self._attr_supported_features |= VacuumEntityFeature.START
if "stop" in action_wrapper.options:
self._attr_supported_features |= VacuumEntityFeature.STOP
if charge_wrapper or (
mode_wrapper and TUYA_MODE_RETURN_HOME in mode_wrapper.options
):
self._attr_supported_features |= VacuumEntityFeature.RETURN_HOME
if activity_wrapper:
self._attr_supported_features |= VacuumEntityFeature.STATE
if locate_wrapper:
self._attr_supported_features |= VacuumEntityFeature.LOCATE
if switch_wrapper:
self._attr_supported_features |= (
VacuumEntityFeature.STOP | VacuumEntityFeature.START
)
if fan_speed_wrapper:
self._attr_fan_speed_list = fan_speed_wrapper.options
@@ -239,27 +159,37 @@ class TuyaVacuumEntity(TuyaEntity, StateVacuumEntity):
@property
def activity(self) -> VacuumActivity | None:
"""Return Tuya vacuum device state."""
return self._read_wrapper(self._activity_wrapper)
if (status := self._read_wrapper(self._status_wrapper)) is not None:
return TUYA_STATUS_TO_HA.get(status)
if self._read_wrapper(self._pause_wrapper):
return VacuumActivity.PAUSED
return None
async def async_start(self, **kwargs: Any) -> None:
"""Start the device."""
await self._async_send_wrapper_updates(self._action_wrapper, "start")
await self._async_send_wrapper_updates(self._switch_wrapper, True)
async def async_stop(self, **kwargs: Any) -> None:
"""Stop the device."""
await self._async_send_wrapper_updates(self._action_wrapper, "stop")
await self._async_send_wrapper_updates(self._switch_wrapper, False)
async def async_pause(self, **kwargs: Any) -> None:
"""Pause the device."""
await self._async_send_wrapper_updates(self._action_wrapper, "pause")
await self.async_stop(**kwargs)
async def async_return_to_base(self, **kwargs: Any) -> None:
"""Return device to dock."""
await self._async_send_wrapper_updates(self._action_wrapper, "return_to_base")
if self._charge_wrapper:
await self._async_send_wrapper_updates(self._charge_wrapper, True)
else:
await self._async_send_wrapper_updates(
self._mode_wrapper, TUYA_MODE_RETURN_HOME
)
async def async_locate(self, **kwargs: Any) -> None:
"""Locate the device."""
await self._async_send_wrapper_updates(self._action_wrapper, "locate")
await self._async_send_wrapper_updates(self._locate_wrapper, True)
async def async_set_fan_speed(self, fan_speed: str, **kwargs: Any) -> None:
"""Set fan speed."""

View File

@@ -3938,7 +3938,7 @@
},
"meteo_france": {
"name": "M\u00e9t\u00e9o-France",
"integration_type": "hub",
"integration_type": "service",
"config_flow": true,
"iot_class": "cloud_polling"
},

2
requirements_all.txt generated
View File

@@ -2575,7 +2575,7 @@ python-rabbitair==0.0.8
python-ripple-api==0.0.3
# homeassistant.components.roborock
python-roborock==3.12.2
python-roborock==3.18.0
# homeassistant.components.smarttub
python-smarttub==0.0.46

View File

@@ -2159,7 +2159,7 @@ python-pooldose==0.8.1
python-rabbitair==0.0.8
# homeassistant.components.roborock
python-roborock==3.12.2
python-roborock==3.18.0
# homeassistant.components.smarttub
python-smarttub==0.0.46

View File

@@ -1176,7 +1176,6 @@ INTEGRATIONS_WITHOUT_SCALE = [
"aten_pe",
"atome",
"august",
"autarco",
"aurora",
"aurora_abb_powerone",
"aussie_broadband",

View File

@@ -148,6 +148,22 @@ def mock_stream_source_fixture() -> Generator[AsyncMock]:
yield mock_stream_source
@pytest.fixture(name="mock_create_stream")
def mock_create_stream_fixture() -> Generator[Mock]:
"""Fixture to mock create_stream and prevent real stream threads."""
mock_stream = Mock()
mock_stream.add_provider = Mock()
mock_stream.start = AsyncMock()
mock_stream.endpoint_url = Mock(return_value="http://home.assistant/playlist.m3u8")
mock_stream.set_update_callback = Mock()
mock_stream.available = True
with patch(
"homeassistant.components.camera.create_stream",
return_value=mock_stream,
):
yield mock_stream
@pytest.fixture
async def mock_test_webrtc_cameras(hass: HomeAssistant) -> None:
"""Initialize test WebRTC cameras with native RTC support."""

View File

@@ -346,20 +346,14 @@ async def test_websocket_stream_no_source(
@pytest.mark.usefixtures("mock_camera", "mock_stream")
async def test_websocket_camera_stream(
hass: HomeAssistant, hass_ws_client: WebSocketGenerator
hass: HomeAssistant, hass_ws_client: WebSocketGenerator, mock_create_stream: Mock
) -> None:
"""Test camera/stream websocket command."""
await async_setup_component(hass, "camera", {})
with (
patch(
"homeassistant.components.camera.Stream.endpoint_url",
return_value="http://home.assistant/playlist.m3u8",
) as mock_stream_view_url,
patch(
"homeassistant.components.demo.camera.DemoCamera.stream_source",
return_value="http://example.com",
),
with patch(
"homeassistant.components.demo.camera.DemoCamera.stream_source",
return_value="http://example.com",
):
# Request playlist through WebSocket
client = await hass_ws_client(hass)
@@ -369,7 +363,7 @@ async def test_websocket_camera_stream(
msg = await client.receive_json()
# Assert WebSocket response
assert mock_stream_view_url.called
assert mock_create_stream.endpoint_url.called
assert msg["id"] == 6
assert msg["type"] == TYPE_RESULT
assert msg["success"]
@@ -505,21 +499,18 @@ async def test_play_stream_service_no_source(hass: HomeAssistant) -> None:
@pytest.mark.usefixtures("mock_camera", "mock_stream")
async def test_handle_play_stream_service(hass: HomeAssistant) -> None:
async def test_handle_play_stream_service(
hass: HomeAssistant, mock_create_stream: Mock
) -> None:
"""Test camera play_stream service."""
await async_process_ha_core_config(
hass,
{"external_url": "https://example.com"},
)
await async_setup_component(hass, "media_player", {})
with (
patch(
"homeassistant.components.camera.Stream.endpoint_url",
) as mock_request_stream,
patch(
"homeassistant.components.demo.camera.DemoCamera.stream_source",
return_value="http://example.com",
),
with patch(
"homeassistant.components.demo.camera.DemoCamera.stream_source",
return_value="http://example.com",
):
# Call service
await hass.services.async_call(
@@ -533,17 +524,14 @@ async def test_handle_play_stream_service(hass: HomeAssistant) -> None:
)
# So long as we request the stream, the rest should be covered
# by the play_media service tests.
assert mock_request_stream.called
assert mock_create_stream.endpoint_url.called
@pytest.mark.usefixtures("mock_stream")
async def test_no_preload_stream(hass: HomeAssistant) -> None:
async def test_no_preload_stream(hass: HomeAssistant, mock_create_stream: Mock) -> None:
"""Test camera preload preference."""
demo_settings = camera.DynamicStreamSettings()
with (
patch(
"homeassistant.components.camera.Stream.endpoint_url",
) as mock_request_stream,
patch(
"homeassistant.components.camera.prefs.CameraPreferences.get_dynamic_stream_settings",
return_value=demo_settings,
@@ -557,15 +545,14 @@ async def test_no_preload_stream(hass: HomeAssistant) -> None:
await async_setup_component(hass, "camera", {DOMAIN: {"platform": "demo"}})
hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
await hass.async_block_till_done()
assert not mock_request_stream.called
assert not mock_create_stream.endpoint_url.called
@pytest.mark.usefixtures("mock_stream")
async def test_preload_stream(hass: HomeAssistant) -> None:
async def test_preload_stream(hass: HomeAssistant, mock_create_stream: Mock) -> None:
"""Test camera preload preference."""
demo_settings = camera.DynamicStreamSettings(preload_stream=True)
with (
patch("homeassistant.components.camera.create_stream") as mock_create_stream,
patch(
"homeassistant.components.camera.prefs.CameraPreferences.get_dynamic_stream_settings",
return_value=demo_settings,
@@ -575,14 +562,13 @@ async def test_preload_stream(hass: HomeAssistant) -> None:
return_value="http://example.com",
),
):
mock_create_stream.return_value.start = AsyncMock()
assert await async_setup_component(
hass, "camera", {DOMAIN: {"platform": "demo"}}
)
await hass.async_block_till_done()
hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
await hass.async_block_till_done()
assert mock_create_stream.called
assert mock_create_stream.start.called
@pytest.mark.usefixtures("mock_camera")
@@ -694,25 +680,16 @@ async def test_state_streaming(hass: HomeAssistant) -> None:
assert demo_camera.state == camera.CameraState.STREAMING
@pytest.mark.usefixtures("mock_camera", "mock_stream")
@pytest.mark.usefixtures("mock_camera", "mock_stream", "mock_create_stream")
async def test_stream_unavailable(
hass: HomeAssistant, hass_ws_client: WebSocketGenerator
hass: HomeAssistant, hass_ws_client: WebSocketGenerator, mock_create_stream: Mock
) -> None:
"""Camera state."""
await async_setup_component(hass, "camera", {})
with (
patch(
"homeassistant.components.camera.Stream.endpoint_url",
return_value="http://home.assistant/playlist.m3u8",
),
patch(
"homeassistant.components.demo.camera.DemoCamera.stream_source",
return_value="http://example.com",
),
patch(
"homeassistant.components.camera.Stream.set_update_callback",
) as mock_update_callback,
with patch(
"homeassistant.components.demo.camera.DemoCamera.stream_source",
return_value="http://example.com",
):
# Request playlist through WebSocket. We just want to create the stream
# but don't care about the result.
@@ -721,26 +698,22 @@ async def test_stream_unavailable(
{"id": 10, "type": "camera/stream", "entity_id": "camera.demo_camera"}
)
await client.receive_json()
assert mock_update_callback.called
assert mock_create_stream.set_update_callback.called
# Simulate the stream going unavailable
callback = mock_update_callback.call_args.args[0]
with patch(
"homeassistant.components.camera.Stream.available", new_callable=lambda: False
):
callback()
await hass.async_block_till_done()
callback = mock_create_stream.set_update_callback.call_args.args[0]
mock_create_stream.available = False
callback()
await hass.async_block_till_done()
demo_camera = hass.states.get("camera.demo_camera")
assert demo_camera is not None
assert demo_camera.state == STATE_UNAVAILABLE
# Simulate stream becomes available
with patch(
"homeassistant.components.camera.Stream.available", new_callable=lambda: True
):
callback()
await hass.async_block_till_done()
mock_create_stream.available = True
callback()
await hass.async_block_till_done()
demo_camera = hass.states.get("camera.demo_camera")
assert demo_camera is not None

View File

@@ -0,0 +1,228 @@
"""Test input boolean triggers."""
from collections.abc import Generator
from unittest.mock import patch
import pytest
from homeassistant.components.input_boolean import DOMAIN
from homeassistant.const import ATTR_LABEL_ID, CONF_ENTITY_ID, STATE_OFF, STATE_ON
from homeassistant.core import HomeAssistant, ServiceCall
from tests.components import (
StateDescription,
arm_trigger,
parametrize_target_entities,
parametrize_trigger_states,
set_or_remove_state,
target_entities,
)
@pytest.fixture(autouse=True, name="stub_blueprint_populate")
def stub_blueprint_populate_autouse(stub_blueprint_populate: None) -> None:
"""Stub copying the blueprints to the config folder."""
@pytest.fixture(name="enable_experimental_triggers_conditions")
def enable_experimental_triggers_conditions() -> Generator[None]:
"""Enable experimental triggers and conditions."""
with patch(
"homeassistant.components.labs.async_is_preview_feature_enabled",
return_value=True,
):
yield
@pytest.fixture
async def target_input_booleans(hass: HomeAssistant) -> list[str]:
"""Create multiple input_boolean entities associated with different targets."""
return (await target_entities(hass, DOMAIN))["included"]
@pytest.mark.parametrize(
"trigger_key",
[
"input_boolean.turned_off",
"input_boolean.turned_on",
],
)
async def test_input_boolean_triggers_gated_by_labs_flag(
hass: HomeAssistant, caplog: pytest.LogCaptureFixture, trigger_key: str
) -> None:
"""Test the input_boolean triggers are gated by the labs flag."""
await arm_trigger(hass, trigger_key, None, {ATTR_LABEL_ID: "test_label"})
assert (
"Unnamed automation failed to setup triggers and has been disabled: Trigger "
f"'{trigger_key}' requires the experimental 'New triggers and conditions' "
"feature to be enabled in Home Assistant Labs settings (feature flag: "
"'new_triggers_conditions')"
) in caplog.text
@pytest.mark.usefixtures("enable_experimental_triggers_conditions")
@pytest.mark.parametrize(
("trigger_target_config", "entity_id", "entities_in_target"),
parametrize_target_entities(DOMAIN),
)
@pytest.mark.parametrize(
("trigger", "states"),
[
*parametrize_trigger_states(
trigger="input_boolean.turned_off",
target_states=[STATE_OFF],
other_states=[STATE_ON],
),
*parametrize_trigger_states(
trigger="input_boolean.turned_on",
target_states=[STATE_ON],
other_states=[STATE_OFF],
),
],
)
async def test_input_boolean_state_trigger_behavior_any(
hass: HomeAssistant,
service_calls: list[ServiceCall],
target_input_booleans: list[str],
trigger_target_config: dict,
entity_id: str,
entities_in_target: int,
trigger: str,
states: list[StateDescription],
) -> None:
"""Test that the input_boolean state trigger fires when any input_boolean state changes to a specific state."""
other_entity_ids = set(target_input_booleans) - {entity_id}
# Set all input_booleans, including the tested one, to the initial state
for eid in target_input_booleans:
set_or_remove_state(hass, eid, states[0]["included"])
await hass.async_block_till_done()
await arm_trigger(hass, trigger, {}, trigger_target_config)
for state in states[1:]:
included_state = state["included"]
set_or_remove_state(hass, entity_id, included_state)
await hass.async_block_till_done()
assert len(service_calls) == state["count"]
for service_call in service_calls:
assert service_call.data[CONF_ENTITY_ID] == entity_id
service_calls.clear()
# Check if changing other input_booleans also triggers
for other_entity_id in other_entity_ids:
set_or_remove_state(hass, other_entity_id, included_state)
await hass.async_block_till_done()
assert len(service_calls) == (entities_in_target - 1) * state["count"]
service_calls.clear()
@pytest.mark.usefixtures("enable_experimental_triggers_conditions")
@pytest.mark.parametrize(
("trigger_target_config", "entity_id", "entities_in_target"),
parametrize_target_entities(DOMAIN),
)
@pytest.mark.parametrize(
("trigger", "states"),
[
*parametrize_trigger_states(
trigger="input_boolean.turned_off",
target_states=[STATE_OFF],
other_states=[STATE_ON],
),
*parametrize_trigger_states(
trigger="input_boolean.turned_on",
target_states=[STATE_ON],
other_states=[STATE_OFF],
),
],
)
async def test_input_boolean_state_trigger_behavior_first(
hass: HomeAssistant,
service_calls: list[ServiceCall],
target_input_booleans: list[str],
trigger_target_config: dict,
entity_id: str,
entities_in_target: int,
trigger: str,
states: list[StateDescription],
) -> None:
"""Test that the input_boolean state trigger fires when the first input_boolean changes to a specific state."""
other_entity_ids = set(target_input_booleans) - {entity_id}
# Set all input_booleans, including the tested one, to the initial state
for eid in target_input_booleans:
set_or_remove_state(hass, eid, states[0]["included"])
await hass.async_block_till_done()
await arm_trigger(hass, trigger, {"behavior": "first"}, trigger_target_config)
for state in states[1:]:
included_state = state["included"]
set_or_remove_state(hass, entity_id, included_state)
await hass.async_block_till_done()
assert len(service_calls) == state["count"]
for service_call in service_calls:
assert service_call.data[CONF_ENTITY_ID] == entity_id
service_calls.clear()
# Triggering other input_booleans should not cause the trigger to fire again
for other_entity_id in other_entity_ids:
set_or_remove_state(hass, other_entity_id, included_state)
await hass.async_block_till_done()
assert len(service_calls) == 0
@pytest.mark.usefixtures("enable_experimental_triggers_conditions")
@pytest.mark.parametrize(
("trigger_target_config", "entity_id", "entities_in_target"),
parametrize_target_entities(DOMAIN),
)
@pytest.mark.parametrize(
("trigger", "states"),
[
*parametrize_trigger_states(
trigger="input_boolean.turned_off",
target_states=[STATE_OFF],
other_states=[STATE_ON],
),
*parametrize_trigger_states(
trigger="input_boolean.turned_on",
target_states=[STATE_ON],
other_states=[STATE_OFF],
),
],
)
async def test_input_boolean_state_trigger_behavior_last(
hass: HomeAssistant,
service_calls: list[ServiceCall],
target_input_booleans: list[str],
trigger_target_config: dict,
entity_id: str,
entities_in_target: int,
trigger: str,
states: list[StateDescription],
) -> None:
"""Test that the input_boolean state trigger fires when the last input_boolean changes to a specific state."""
other_entity_ids = set(target_input_booleans) - {entity_id}
# Set all input_booleans, including the tested one, to the initial state
for eid in target_input_booleans:
set_or_remove_state(hass, eid, states[0]["included"])
await hass.async_block_till_done()
await arm_trigger(hass, trigger, {"behavior": "last"}, trigger_target_config)
for state in states[1:]:
included_state = state["included"]
for other_entity_id in other_entity_ids:
set_or_remove_state(hass, other_entity_id, included_state)
await hass.async_block_till_done()
assert len(service_calls) == 0
set_or_remove_state(hass, entity_id, included_state)
await hass.async_block_till_done()
assert len(service_calls) == state["count"]
for service_call in service_calls:
assert service_call.data[CONF_ENTITY_ID] == entity_id
service_calls.clear()

View File

@@ -158,7 +158,6 @@ async def test_polling_platform_init(
(
SERVICE_SEND_LOCATION,
{
ATTR_MESSAGE: "test_message",
ATTR_MESSAGE_THREAD_ID: "123",
ATTR_LONGITUDE: "1.123",
ATTR_LATITUDE: "1.123",
@@ -414,6 +413,7 @@ async def test_send_chat_action(
CONF_CONFIG_ENTRY_ID: mock_broadcast_config_entry.entry_id,
ATTR_TARGET: [123456],
ATTR_CHAT_ACTION: CHAT_ACTION_TYPING,
ATTR_MESSAGE_THREAD_ID: 123,
},
blocking=True,
return_response=True,
@@ -421,7 +421,9 @@ async def test_send_chat_action(
await hass.async_block_till_done()
mock.assert_called_once()
mock.assert_called_with(chat_id=123456, action=CHAT_ACTION_TYPING)
mock.assert_called_with(
chat_id=123456, action=CHAT_ACTION_TYPING, message_thread_id=123
)
@pytest.mark.parametrize(
@@ -1505,7 +1507,6 @@ async def test_set_message_reaction(
SERVICE_SEND_LOCATION,
{
ATTR_TARGET: 654321,
ATTR_MESSAGE: "test_message",
ATTR_MESSAGE_THREAD_ID: "123",
ATTR_LONGITUDE: "1.123",
ATTR_LATITUDE: "1.123",

View File

@@ -28,7 +28,7 @@
'platform': 'tuya',
'previous_unique_id': None,
'suggested_object_id': None,
'supported_features': <VacuumEntityFeature: 8984>,
'supported_features': <VacuumEntityFeature: 13080>,
'translation_key': None,
'unique_id': 'tuya.mwsaod7fa3gjyh6ids',
'unit_of_measurement': None,
@@ -38,7 +38,7 @@
StateSnapshot({
'attributes': ReadOnlyDict({
'friendly_name': 'Hoover',
'supported_features': <VacuumEntityFeature: 8984>,
'supported_features': <VacuumEntityFeature: 13080>,
}),
'context': <ANY>,
'entity_id': 'vacuum.hoover',

View File

@@ -130,8 +130,16 @@ async def test_async_enable_logging(
cleanup_log_files()
@pytest.mark.parametrize(
("extra_env", "log_file_count", "old_log_file_count"),
[({}, 0, 1), ({"HA_DUPLICATE_LOG_FILE": "1"}, 1, 0)],
)
async def test_async_enable_logging_supervisor(
hass: HomeAssistant, caplog: pytest.LogCaptureFixture
hass: HomeAssistant,
caplog: pytest.LogCaptureFixture,
extra_env: dict[str, str],
log_file_count: int,
old_log_file_count: int,
) -> None:
"""Test to ensure the default log file is not created on Supervisor installations."""
@@ -141,14 +149,14 @@ async def test_async_enable_logging_supervisor(
assert len(glob.glob(ARG_LOG_FILE)) == 0
with (
patch.dict(os.environ, {"SUPERVISOR": "1"}),
patch.dict(os.environ, {"SUPERVISOR": "1", **extra_env}),
patch(
"homeassistant.bootstrap.async_activate_log_queue_handler"
) as mock_async_activate_log_queue_handler,
patch("logging.getLogger"),
):
await bootstrap.async_enable_logging(hass)
assert len(glob.glob(CONFIG_LOG_FILE)) == 0
assert len(glob.glob(CONFIG_LOG_FILE)) == log_file_count
mock_async_activate_log_queue_handler.assert_called_once()
mock_async_activate_log_queue_handler.reset_mock()
@@ -162,9 +170,10 @@ async def test_async_enable_logging_supervisor(
await hass.async_add_executor_job(write_log_file)
assert len(glob.glob(CONFIG_LOG_FILE)) == 1
assert len(glob.glob(f"{CONFIG_LOG_FILE}.old")) == 0
await bootstrap.async_enable_logging(hass)
assert len(glob.glob(CONFIG_LOG_FILE)) == 0
assert len(glob.glob(f"{CONFIG_LOG_FILE}.old")) == 1
assert len(glob.glob(CONFIG_LOG_FILE)) == log_file_count
assert len(glob.glob(f"{CONFIG_LOG_FILE}.old")) == old_log_file_count
mock_async_activate_log_queue_handler.assert_called_once()
mock_async_activate_log_queue_handler.reset_mock()