Compare commits

..

2 Commits

Author SHA1 Message Date
Abílio Costa
937ccc7867 Merge branch 'dev' into scene_triggers 2025-12-16 18:50:53 +00:00
abmantis
385ab8f5d3 Add scene activated trigger 2025-12-16 18:45:56 +00:00
35 changed files with 397 additions and 654 deletions

View File

@@ -624,16 +624,13 @@ async def async_enable_logging(
if log_file is None:
default_log_path = hass.config.path(ERROR_LOG_FILENAME)
if "SUPERVISOR" in os.environ and "HA_DUPLICATE_LOG_FILE" not in os.environ:
if "SUPERVISOR" in os.environ:
_LOGGER.info("Running in Supervisor, not logging to file")
# Rename the default log file if it exists, since previous versions created
# it even on Supervisor
def rename_old_file() -> None:
"""Rename old log file in executor."""
if os.path.isfile(default_log_path):
with contextlib.suppress(OSError):
os.rename(default_log_path, f"{default_log_path}.old")
await hass.async_add_executor_job(rename_old_file)
if os.path.isfile(default_log_path):
with contextlib.suppress(OSError):
os.rename(default_log_path, f"{default_log_path}.old")
err_log_path = None
else:
err_log_path = default_log_path

View File

@@ -6,6 +6,5 @@
"documentation": "https://www.home-assistant.io/integrations/autarco",
"integration_type": "hub",
"iot_class": "cloud_polling",
"quality_scale": "silver",
"requirements": ["autarco==3.2.0"]
}

View File

@@ -132,8 +132,8 @@ _EXPERIMENTAL_TRIGGER_PLATFORMS = {
"fan",
"lawn_mower",
"light",
"lock",
"media_player",
"scene",
"switch",
"text",
"update",

View File

@@ -108,7 +108,7 @@ class IcloudAccount:
if self.api.requires_2fa:
# Trigger a new log in to ensure the user enters the 2FA code again.
raise PyiCloudFailedLoginException("2FA Required") # noqa: TRY301
raise PyiCloudFailedLoginException # noqa: TRY301
except PyiCloudFailedLoginException:
self.api = None

View File

@@ -261,8 +261,7 @@ class ImprovBLEConfigFlow(ConfigFlow, domain=DOMAIN):
if self._can_identify is None:
try:
await self._try_call(device.ensure_connected())
self._can_identify = device.can_identify
self._can_identify = await self._try_call(device.can_identify())
except AbortFlow as err:
return self.async_abort(reason=err.reason)
if self._can_identify:

View File

@@ -13,5 +13,5 @@
"documentation": "https://www.home-assistant.io/integrations/improv_ble",
"integration_type": "device",
"iot_class": "local_polling",
"requirements": ["py-improv-ble-client==2.0.1"]
"requirements": ["py-improv-ble-client==1.0.3"]
}

View File

@@ -22,19 +22,5 @@
"unlock": {
"service": "mdi:lock-open-variant"
}
},
"triggers": {
"jammed": {
"trigger": "mdi:lock-alert"
},
"locked": {
"trigger": "mdi:lock"
},
"opened": {
"trigger": "mdi:lock-open-variant"
},
"unlocked": {
"trigger": "mdi:lock-open-variant"
}
}
}

View File

@@ -1,8 +1,4 @@
{
"common": {
"trigger_behavior_description": "The behavior of the targeted locks to trigger on.",
"trigger_behavior_name": "Behavior"
},
"device_automation": {
"action_type": {
"lock": "Lock {entity_name}",
@@ -54,15 +50,6 @@
"message": "The code for {entity_id} doesn't match pattern {code_format}."
}
},
"selector": {
"trigger_behavior": {
"options": {
"any": "Any",
"first": "First",
"last": "Last"
}
}
},
"services": {
"lock": {
"description": "Locks a lock.",
@@ -95,47 +82,5 @@
"name": "Unlock"
}
},
"title": "Lock",
"triggers": {
"jammed": {
"description": "Triggers after one or more locks jam.",
"fields": {
"behavior": {
"description": "[%key:component::lock::common::trigger_behavior_description%]",
"name": "[%key:component::lock::common::trigger_behavior_name%]"
}
},
"name": "Lock jammed"
},
"locked": {
"description": "Triggers after one or more locks lock.",
"fields": {
"behavior": {
"description": "[%key:component::lock::common::trigger_behavior_description%]",
"name": "[%key:component::lock::common::trigger_behavior_name%]"
}
},
"name": "Lock locked"
},
"opened": {
"description": "Triggers after one or more locks open.",
"fields": {
"behavior": {
"description": "[%key:component::lock::common::trigger_behavior_description%]",
"name": "[%key:component::lock::common::trigger_behavior_name%]"
}
},
"name": "Lock opened"
},
"unlocked": {
"description": "Triggers after one or more locks unlock.",
"fields": {
"behavior": {
"description": "[%key:component::lock::common::trigger_behavior_description%]",
"name": "[%key:component::lock::common::trigger_behavior_name%]"
}
},
"name": "Lock unlocked"
}
}
"title": "Lock"
}

View File

@@ -1,18 +0,0 @@
"""Provides triggers for locks."""
from homeassistant.core import HomeAssistant
from homeassistant.helpers.trigger import Trigger, make_entity_target_state_trigger
from .const import DOMAIN, LockState
TRIGGERS: dict[str, type[Trigger]] = {
"jammed": make_entity_target_state_trigger(DOMAIN, LockState.JAMMED),
"locked": make_entity_target_state_trigger(DOMAIN, LockState.LOCKED),
"opened": make_entity_target_state_trigger(DOMAIN, LockState.OPEN),
"unlocked": make_entity_target_state_trigger(DOMAIN, LockState.UNLOCKED),
}
async def async_get_triggers(hass: HomeAssistant) -> dict[str, type[Trigger]]:
"""Return the triggers for locks."""
return TRIGGERS

View File

@@ -1,20 +0,0 @@
.trigger_common: &trigger_common
target:
entity:
domain: lock
fields:
behavior:
required: true
default: any
selector:
select:
options:
- first
- last
- any
translation_key: trigger_behavior
jammed: *trigger_common
locked: *trigger_common
opened: *trigger_common
unlocked: *trigger_common

View File

@@ -1,10 +1,9 @@
{
"domain": "meteo_france",
"name": "Météo-France",
"name": "M\u00e9t\u00e9o-France",
"codeowners": ["@hacf-fr", "@oncleben31", "@Quentame"],
"config_flow": true,
"documentation": "https://www.home-assistant.io/integrations/meteo_france",
"integration_type": "service",
"iot_class": "cloud_polling",
"loggers": ["meteofrance_api"],
"requirements": ["meteofrance-api==1.4.0"]

View File

@@ -62,7 +62,7 @@ SENSOR_TYPES: tuple[MeteoFranceSensorEntityDescription, ...] = (
key="pressure",
name="Pressure",
native_unit_of_measurement=UnitOfPressure.HPA,
device_class=SensorDeviceClass.ATMOSPHERIC_PRESSURE,
device_class=SensorDeviceClass.PRESSURE,
state_class=SensorStateClass.MEASUREMENT,
entity_registry_enabled_default=False,
data_path="current_forecast:sea_level",

View File

@@ -7,5 +7,5 @@
"documentation": "https://www.home-assistant.io/integrations/mill",
"iot_class": "local_polling",
"loggers": ["mill", "mill_local"],
"requirements": ["millheater==0.14.1", "mill-local==0.5.0"]
"requirements": ["millheater==0.14.1", "mill-local==0.3.0"]
}

View File

@@ -5,18 +5,14 @@ from __future__ import annotations
from datetime import timedelta
import logging
from pynintendoauth.exceptions import (
HttpException,
InvalidOAuthConfigurationException,
InvalidSessionTokenException,
)
from pynintendoauth.exceptions import InvalidOAuthConfigurationException
from pynintendoparental import Authenticator, NintendoParental
from pynintendoparental.exceptions import NoDevicesFoundException
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryError
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
from .const import DOMAIN
@@ -62,13 +58,3 @@ class NintendoUpdateCoordinator(DataUpdateCoordinator[None]):
translation_domain=DOMAIN,
translation_key="no_devices_found",
) from err
except InvalidSessionTokenException as err:
_LOGGER.debug("Session token invalid, will renew on next update")
raise UpdateFailed from err
except HttpException as err:
if err.error_code == "update_required":
raise ConfigEntryError(
translation_domain=DOMAIN,
translation_key="update_required",
) from err
raise UpdateFailed(retry_after=900) from err

View File

@@ -83,9 +83,6 @@
},
"no_devices_found": {
"message": "No Nintendo devices found for this account."
},
"update_required": {
"message": "The Nintendo Switch parental controls integration requires an update due to changes in Nintendo's API."
}
},
"services": {

View File

@@ -20,7 +20,7 @@
"loggers": ["roborock"],
"quality_scale": "silver",
"requirements": [
"python-roborock==3.18.0",
"python-roborock==3.12.2",
"vacuum-map-parser-roborock==0.1.4"
]
}

View File

@@ -20,5 +20,10 @@
"turn_on": {
"service": "mdi:power"
}
},
"triggers": {
"activated": {
"trigger": "mdi:palette"
}
}
}

View File

@@ -59,5 +59,11 @@
"name": "Activate"
}
},
"title": "Scene"
"title": "Scene",
"triggers": {
"activated": {
"description": "Triggers when a scene was activated",
"name": "Scene activated"
}
}
}

View File

@@ -0,0 +1,42 @@
"""Provides triggers for scenes."""
from homeassistant.const import STATE_UNAVAILABLE, STATE_UNKNOWN
from homeassistant.core import HomeAssistant, State
from homeassistant.helpers.trigger import (
ENTITY_STATE_TRIGGER_SCHEMA,
EntityTriggerBase,
Trigger,
)
from . import DOMAIN
class SceneActivatedTrigger(EntityTriggerBase):
"""Trigger for scene entity activations."""
_domain = DOMAIN
_schema = ENTITY_STATE_TRIGGER_SCHEMA
def is_valid_transition(self, from_state: State, to_state: State) -> bool:
"""Check if the origin state is valid and different from the current state."""
# UNKNOWN is a valid from_state, otherwise the first time the scene is activated
# it would not trigger
if from_state.state == STATE_UNAVAILABLE:
return False
return from_state.state != to_state.state
def is_valid_state(self, state: State) -> bool:
"""Check if the new state is not invalid."""
return state.state not in (STATE_UNAVAILABLE, STATE_UNKNOWN)
TRIGGERS: dict[str, type[Trigger]] = {
"activated": SceneActivatedTrigger,
}
async def async_get_triggers(hass: HomeAssistant) -> dict[str, type[Trigger]]:
"""Return the triggers for scenes."""
return TRIGGERS

View File

@@ -0,0 +1,4 @@
activated:
target:
entity:
domain: scene

View File

@@ -63,7 +63,6 @@ from .const import (
ATTR_PASSWORD,
ATTR_QUESTION,
ATTR_REACTION,
ATTR_REPLY_TO_MSGID,
ATTR_RESIZE_KEYBOARD,
ATTR_SHOW_ALERT,
ATTR_STICKER_ID,
@@ -127,26 +126,21 @@ BASE_SERVICE_SCHEMA = vol.Schema(
vol.Optional(ATTR_TIMEOUT): cv.positive_int,
vol.Optional(ATTR_MESSAGE_TAG): cv.string,
vol.Optional(ATTR_MESSAGE_THREAD_ID): vol.Coerce(int),
}
},
extra=vol.ALLOW_EXTRA,
)
SERVICE_SCHEMA_SEND_MESSAGE = vol.All(
cv.deprecated(ATTR_TIMEOUT),
BASE_SERVICE_SCHEMA.extend(
{
vol.Required(ATTR_MESSAGE): cv.string,
vol.Optional(ATTR_TITLE): cv.string,
vol.Optional(ATTR_REPLY_TO_MSGID): vol.Coerce(int),
}
{vol.Required(ATTR_MESSAGE): cv.string, vol.Optional(ATTR_TITLE): cv.string}
),
)
SERVICE_SCHEMA_SEND_CHAT_ACTION = vol.All(
cv.deprecated(ATTR_TIMEOUT),
vol.Schema(
BASE_SERVICE_SCHEMA.extend(
{
vol.Optional(CONF_CONFIG_ENTRY_ID): cv.string,
vol.Optional(ATTR_TARGET): vol.All(cv.ensure_list, [vol.Coerce(int)]),
vol.Required(ATTR_CHAT_ACTION): vol.In(
(
CHAT_ACTION_TYPING,
@@ -162,7 +156,6 @@ SERVICE_SCHEMA_SEND_CHAT_ACTION = vol.All(
CHAT_ACTION_UPLOAD_VIDEO_NOTE,
)
),
vol.Optional(ATTR_MESSAGE_THREAD_ID): vol.Coerce(int),
}
),
)
@@ -176,7 +169,6 @@ SERVICE_SCHEMA_BASE_SEND_FILE = BASE_SERVICE_SCHEMA.extend(
vol.Optional(ATTR_PASSWORD): cv.string,
vol.Optional(ATTR_AUTHENTICATION): cv.string,
vol.Optional(ATTR_VERIFY_SSL): cv.boolean,
vol.Optional(ATTR_REPLY_TO_MSGID): vol.Coerce(int),
}
)
@@ -196,7 +188,6 @@ SERVICE_SCHEMA_SEND_LOCATION = vol.All(
{
vol.Required(ATTR_LONGITUDE): cv.string,
vol.Required(ATTR_LATITUDE): cv.string,
vol.Optional(ATTR_REPLY_TO_MSGID): vol.Coerce(int),
}
),
)
@@ -214,25 +205,18 @@ SERVICE_SCHEMA_SEND_POLL = vol.All(
vol.Optional(ATTR_ALLOWS_MULTIPLE_ANSWERS, default=False): cv.boolean,
vol.Optional(ATTR_DISABLE_NOTIF): cv.boolean,
vol.Optional(ATTR_MESSAGE_THREAD_ID): vol.Coerce(int),
vol.Optional(ATTR_REPLY_TO_MSGID): vol.Coerce(int),
}
),
)
SERVICE_SCHEMA_EDIT_MESSAGE = vol.All(
cv.deprecated(ATTR_TIMEOUT),
vol.Schema(
SERVICE_SCHEMA_BASE_SEND_FILE.extend(
{
vol.Optional(CONF_CONFIG_ENTRY_ID): cv.string,
vol.Optional(ATTR_TITLE): cv.string,
vol.Required(ATTR_MESSAGE): cv.string,
vol.Required(ATTR_MESSAGEID): vol.Any(
cv.positive_int, vol.All(cv.string, "last")
),
vol.Required(ATTR_CHAT_ID): vol.Coerce(int),
vol.Optional(ATTR_PARSER): cv.string,
vol.Optional(ATTR_KEYBOARD_INLINE): cv.ensure_list,
vol.Optional(ATTR_DISABLE_WEB_PREV): cv.boolean,
}
),
)

View File

@@ -783,7 +783,6 @@ class TelegramNotificationService:
None,
chat_id=chat_id,
action=chat_action,
message_thread_id=kwargs.get(ATTR_MESSAGE_THREAD_ID),
context=context,
)
result[chat_id] = is_successful

View File

@@ -3938,7 +3938,7 @@
},
"meteo_france": {
"name": "M\u00e9t\u00e9o-France",
"integration_type": "service",
"integration_type": "hub",
"config_flow": true,
"iot_class": "cloud_polling"
},

6
requirements_all.txt generated
View File

@@ -1482,7 +1482,7 @@ micloud==0.5
microBeesPy==0.3.5
# homeassistant.components.mill
mill-local==0.5.0
mill-local==0.3.0
# homeassistant.components.mill
millheater==0.14.1
@@ -1810,7 +1810,7 @@ py-dactyl==2.0.4
py-dormakaba-dkey==1.0.6
# homeassistant.components.improv_ble
py-improv-ble-client==2.0.1
py-improv-ble-client==1.0.3
# homeassistant.components.madvr
py-madvr2==1.6.40
@@ -2575,7 +2575,7 @@ python-rabbitair==0.0.8
python-ripple-api==0.0.3
# homeassistant.components.roborock
python-roborock==3.18.0
python-roborock==3.12.2
# homeassistant.components.smarttub
python-smarttub==0.0.46

View File

@@ -1289,7 +1289,7 @@ micloud==0.5
microBeesPy==0.3.5
# homeassistant.components.mill
mill-local==0.5.0
mill-local==0.3.0
# homeassistant.components.mill
millheater==0.14.1
@@ -1550,7 +1550,7 @@ py-dactyl==2.0.4
py-dormakaba-dkey==1.0.6
# homeassistant.components.improv_ble
py-improv-ble-client==2.0.1
py-improv-ble-client==1.0.3
# homeassistant.components.madvr
py-madvr2==1.6.40
@@ -2159,7 +2159,7 @@ python-pooldose==0.8.1
python-rabbitair==0.0.8
# homeassistant.components.roborock
python-roborock==3.18.0
python-roborock==3.12.2
# homeassistant.components.smarttub
python-smarttub==0.0.46

View File

@@ -1176,6 +1176,7 @@ INTEGRATIONS_WITHOUT_SCALE = [
"aten_pe",
"atome",
"august",
"autarco",
"aurora",
"aurora_abb_powerone",
"aussie_broadband",

View File

@@ -148,22 +148,6 @@ def mock_stream_source_fixture() -> Generator[AsyncMock]:
yield mock_stream_source
@pytest.fixture(name="mock_create_stream")
def mock_create_stream_fixture() -> Generator[Mock]:
"""Fixture to mock create_stream and prevent real stream threads."""
mock_stream = Mock()
mock_stream.add_provider = Mock()
mock_stream.start = AsyncMock()
mock_stream.endpoint_url = Mock(return_value="http://home.assistant/playlist.m3u8")
mock_stream.set_update_callback = Mock()
mock_stream.available = True
with patch(
"homeassistant.components.camera.create_stream",
return_value=mock_stream,
):
yield mock_stream
@pytest.fixture
async def mock_test_webrtc_cameras(hass: HomeAssistant) -> None:
"""Initialize test WebRTC cameras with native RTC support."""

View File

@@ -346,14 +346,20 @@ async def test_websocket_stream_no_source(
@pytest.mark.usefixtures("mock_camera", "mock_stream")
async def test_websocket_camera_stream(
hass: HomeAssistant, hass_ws_client: WebSocketGenerator, mock_create_stream: Mock
hass: HomeAssistant, hass_ws_client: WebSocketGenerator
) -> None:
"""Test camera/stream websocket command."""
await async_setup_component(hass, "camera", {})
with patch(
"homeassistant.components.demo.camera.DemoCamera.stream_source",
return_value="http://example.com",
with (
patch(
"homeassistant.components.camera.Stream.endpoint_url",
return_value="http://home.assistant/playlist.m3u8",
) as mock_stream_view_url,
patch(
"homeassistant.components.demo.camera.DemoCamera.stream_source",
return_value="http://example.com",
),
):
# Request playlist through WebSocket
client = await hass_ws_client(hass)
@@ -363,7 +369,7 @@ async def test_websocket_camera_stream(
msg = await client.receive_json()
# Assert WebSocket response
assert mock_create_stream.endpoint_url.called
assert mock_stream_view_url.called
assert msg["id"] == 6
assert msg["type"] == TYPE_RESULT
assert msg["success"]
@@ -499,18 +505,21 @@ async def test_play_stream_service_no_source(hass: HomeAssistant) -> None:
@pytest.mark.usefixtures("mock_camera", "mock_stream")
async def test_handle_play_stream_service(
hass: HomeAssistant, mock_create_stream: Mock
) -> None:
async def test_handle_play_stream_service(hass: HomeAssistant) -> None:
"""Test camera play_stream service."""
await async_process_ha_core_config(
hass,
{"external_url": "https://example.com"},
)
await async_setup_component(hass, "media_player", {})
with patch(
"homeassistant.components.demo.camera.DemoCamera.stream_source",
return_value="http://example.com",
with (
patch(
"homeassistant.components.camera.Stream.endpoint_url",
) as mock_request_stream,
patch(
"homeassistant.components.demo.camera.DemoCamera.stream_source",
return_value="http://example.com",
),
):
# Call service
await hass.services.async_call(
@@ -524,14 +533,17 @@ async def test_handle_play_stream_service(
)
# So long as we request the stream, the rest should be covered
# by the play_media service tests.
assert mock_create_stream.endpoint_url.called
assert mock_request_stream.called
@pytest.mark.usefixtures("mock_stream")
async def test_no_preload_stream(hass: HomeAssistant, mock_create_stream: Mock) -> None:
async def test_no_preload_stream(hass: HomeAssistant) -> None:
"""Test camera preload preference."""
demo_settings = camera.DynamicStreamSettings()
with (
patch(
"homeassistant.components.camera.Stream.endpoint_url",
) as mock_request_stream,
patch(
"homeassistant.components.camera.prefs.CameraPreferences.get_dynamic_stream_settings",
return_value=demo_settings,
@@ -545,14 +557,15 @@ async def test_no_preload_stream(hass: HomeAssistant, mock_create_stream: Mock)
await async_setup_component(hass, "camera", {DOMAIN: {"platform": "demo"}})
hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
await hass.async_block_till_done()
assert not mock_create_stream.endpoint_url.called
assert not mock_request_stream.called
@pytest.mark.usefixtures("mock_stream")
async def test_preload_stream(hass: HomeAssistant, mock_create_stream: Mock) -> None:
async def test_preload_stream(hass: HomeAssistant) -> None:
"""Test camera preload preference."""
demo_settings = camera.DynamicStreamSettings(preload_stream=True)
with (
patch("homeassistant.components.camera.create_stream") as mock_create_stream,
patch(
"homeassistant.components.camera.prefs.CameraPreferences.get_dynamic_stream_settings",
return_value=demo_settings,
@@ -562,13 +575,14 @@ async def test_preload_stream(hass: HomeAssistant, mock_create_stream: Mock) ->
return_value="http://example.com",
),
):
mock_create_stream.return_value.start = AsyncMock()
assert await async_setup_component(
hass, "camera", {DOMAIN: {"platform": "demo"}}
)
await hass.async_block_till_done()
hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
await hass.async_block_till_done()
assert mock_create_stream.start.called
assert mock_create_stream.called
@pytest.mark.usefixtures("mock_camera")
@@ -680,16 +694,25 @@ async def test_state_streaming(hass: HomeAssistant) -> None:
assert demo_camera.state == camera.CameraState.STREAMING
@pytest.mark.usefixtures("mock_camera", "mock_stream", "mock_create_stream")
@pytest.mark.usefixtures("mock_camera", "mock_stream")
async def test_stream_unavailable(
hass: HomeAssistant, hass_ws_client: WebSocketGenerator, mock_create_stream: Mock
hass: HomeAssistant, hass_ws_client: WebSocketGenerator
) -> None:
"""Camera state."""
await async_setup_component(hass, "camera", {})
with patch(
"homeassistant.components.demo.camera.DemoCamera.stream_source",
return_value="http://example.com",
with (
patch(
"homeassistant.components.camera.Stream.endpoint_url",
return_value="http://home.assistant/playlist.m3u8",
),
patch(
"homeassistant.components.demo.camera.DemoCamera.stream_source",
return_value="http://example.com",
),
patch(
"homeassistant.components.camera.Stream.set_update_callback",
) as mock_update_callback,
):
# Request playlist through WebSocket. We just want to create the stream
# but don't care about the result.
@@ -698,22 +721,26 @@ async def test_stream_unavailable(
{"id": 10, "type": "camera/stream", "entity_id": "camera.demo_camera"}
)
await client.receive_json()
assert mock_create_stream.set_update_callback.called
assert mock_update_callback.called
# Simulate the stream going unavailable
callback = mock_create_stream.set_update_callback.call_args.args[0]
mock_create_stream.available = False
callback()
await hass.async_block_till_done()
callback = mock_update_callback.call_args.args[0]
with patch(
"homeassistant.components.camera.Stream.available", new_callable=lambda: False
):
callback()
await hass.async_block_till_done()
demo_camera = hass.states.get("camera.demo_camera")
assert demo_camera is not None
assert demo_camera.state == STATE_UNAVAILABLE
# Simulate stream becomes available
mock_create_stream.available = True
callback()
await hass.async_block_till_done()
with patch(
"homeassistant.components.camera.Stream.available", new_callable=lambda: True
):
callback()
await hass.async_block_till_done()
demo_camera = hass.states.get("camera.demo_camera")
assert demo_camera is not None

View File

@@ -2,7 +2,7 @@
import asyncio
from collections.abc import Callable
from unittest.mock import PropertyMock, patch
from unittest.mock import patch
from bleak.exc import BleakError
from improv_ble_client import (
@@ -294,13 +294,8 @@ async def test_bluetooth_rediscovery_after_successful_provision(
assert result["step_id"] == "bluetooth_confirm"
# Start provisioning
with (
patch(
f"{IMPROV_BLE}.config_flow.ImprovBLEClient.can_identify",
return_value=False,
new_callable=PropertyMock,
),
patch(f"{IMPROV_BLE}.config_flow.ImprovBLEClient.ensure_connected"),
with patch(
f"{IMPROV_BLE}.config_flow.ImprovBLEClient.can_identify", return_value=False
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
@@ -380,13 +375,8 @@ async def _test_common_success_with_identify(
hass: HomeAssistant, result: FlowResult, address: str
) -> None:
"""Test bluetooth and user flow success paths."""
with (
patch(
f"{IMPROV_BLE}.config_flow.ImprovBLEClient.can_identify",
return_value=True,
new_callable=PropertyMock,
),
patch(f"{IMPROV_BLE}.config_flow.ImprovBLEClient.ensure_connected"),
with patch(
f"{IMPROV_BLE}.config_flow.ImprovBLEClient.can_identify", return_value=True
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
@@ -430,13 +420,8 @@ async def _test_common_success_wo_identify(
placeholders: dict[str, str] | None = None,
) -> None:
"""Test bluetooth and user flow success paths."""
with (
patch(
f"{IMPROV_BLE}.config_flow.ImprovBLEClient.can_identify",
return_value=False,
new_callable=PropertyMock,
),
patch(f"{IMPROV_BLE}.config_flow.ImprovBLEClient.ensure_connected"),
with patch(
f"{IMPROV_BLE}.config_flow.ImprovBLEClient.can_identify", return_value=False
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
@@ -490,13 +475,8 @@ async def _test_common_success_wo_identify_w_authorize(
hass: HomeAssistant, result: FlowResult, address: str
) -> None:
"""Test bluetooth and user flow success paths."""
with (
patch(
f"{IMPROV_BLE}.config_flow.ImprovBLEClient.can_identify",
return_value=False,
new_callable=PropertyMock,
),
patch(f"{IMPROV_BLE}.config_flow.ImprovBLEClient.ensure_connected"),
with patch(
f"{IMPROV_BLE}.config_flow.ImprovBLEClient.can_identify", return_value=False
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
@@ -591,7 +571,7 @@ async def test_bluetooth_step_already_in_progress(hass: HomeAssistant) -> None:
(improv_ble_errors.CharacteristicMissingError, "characteristic_missing"),
],
)
async def test_ensure_connected_fails(hass: HomeAssistant, exc, error) -> None:
async def test_can_identify_fails(hass: HomeAssistant, exc, error) -> None:
"""Test bluetooth flow with error."""
result = await hass.config_entries.flow.async_init(
DOMAIN,
@@ -608,8 +588,7 @@ async def test_ensure_connected_fails(hass: HomeAssistant, exc, error) -> None:
assert result["errors"] is None
with patch(
f"{IMPROV_BLE}.config_flow.ImprovBLEClient.ensure_connected",
side_effect=exc,
f"{IMPROV_BLE}.config_flow.ImprovBLEClient.can_identify", side_effect=exc
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
@@ -643,13 +622,8 @@ async def test_identify_fails(hass: HomeAssistant, exc, error) -> None:
assert result["step_id"] == "bluetooth_confirm"
assert result["errors"] is None
with (
patch(
f"{IMPROV_BLE}.config_flow.ImprovBLEClient.can_identify",
return_value=True,
new_callable=PropertyMock,
),
patch(f"{IMPROV_BLE}.config_flow.ImprovBLEClient.ensure_connected"),
with patch(
f"{IMPROV_BLE}.config_flow.ImprovBLEClient.can_identify", return_value=True
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
@@ -691,13 +665,8 @@ async def test_need_authorization_fails(hass: HomeAssistant, exc, error) -> None
assert result["step_id"] == "bluetooth_confirm"
assert result["errors"] is None
with (
patch(
f"{IMPROV_BLE}.config_flow.ImprovBLEClient.can_identify",
return_value=False,
new_callable=PropertyMock,
),
patch(f"{IMPROV_BLE}.config_flow.ImprovBLEClient.ensure_connected"),
with patch(
f"{IMPROV_BLE}.config_flow.ImprovBLEClient.can_identify", return_value=False
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
@@ -740,13 +709,8 @@ async def test_authorize_fails(hass: HomeAssistant, exc, error) -> None:
assert result["step_id"] == "bluetooth_confirm"
assert result["errors"] is None
with (
patch(
f"{IMPROV_BLE}.config_flow.ImprovBLEClient.can_identify",
return_value=False,
new_callable=PropertyMock,
),
patch(f"{IMPROV_BLE}.config_flow.ImprovBLEClient.ensure_connected"),
with patch(
f"{IMPROV_BLE}.config_flow.ImprovBLEClient.can_identify", return_value=False
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
@@ -788,13 +752,8 @@ async def _test_provision_error(hass: HomeAssistant, exc) -> str:
assert result["step_id"] == "bluetooth_confirm"
assert result["errors"] is None
with (
patch(
f"{IMPROV_BLE}.config_flow.ImprovBLEClient.can_identify",
return_value=False,
new_callable=PropertyMock,
),
patch(f"{IMPROV_BLE}.config_flow.ImprovBLEClient.ensure_connected"),
with patch(
f"{IMPROV_BLE}.config_flow.ImprovBLEClient.can_identify", return_value=False
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
@@ -919,13 +878,8 @@ async def test_flow_chaining_with_next_flow(hass: HomeAssistant) -> None:
assert result["step_id"] == "bluetooth_confirm"
# Start provisioning
with (
patch(
f"{IMPROV_BLE}.config_flow.ImprovBLEClient.can_identify",
return_value=False,
new_callable=PropertyMock,
),
patch(f"{IMPROV_BLE}.config_flow.ImprovBLEClient.ensure_connected"),
with patch(
f"{IMPROV_BLE}.config_flow.ImprovBLEClient.can_identify", return_value=False
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
@@ -992,13 +946,8 @@ async def test_flow_chaining_timeout(hass: HomeAssistant) -> None:
assert result["step_id"] == "bluetooth_confirm"
# Start provisioning
with (
patch(
f"{IMPROV_BLE}.config_flow.ImprovBLEClient.can_identify",
return_value=False,
new_callable=PropertyMock,
),
patch(f"{IMPROV_BLE}.config_flow.ImprovBLEClient.ensure_connected"),
with patch(
f"{IMPROV_BLE}.config_flow.ImprovBLEClient.can_identify", return_value=False
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
@@ -1049,13 +998,8 @@ async def test_flow_chaining_with_redirect_url(hass: HomeAssistant) -> None:
assert result["step_id"] == "bluetooth_confirm"
# Start provisioning
with (
patch(
f"{IMPROV_BLE}.config_flow.ImprovBLEClient.can_identify",
return_value=False,
new_callable=PropertyMock,
),
patch(f"{IMPROV_BLE}.config_flow.ImprovBLEClient.ensure_connected"),
with patch(
f"{IMPROV_BLE}.config_flow.ImprovBLEClient.can_identify", return_value=False
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
@@ -1125,13 +1069,8 @@ async def test_flow_chaining_future_already_done(
assert result["step_id"] == "bluetooth_confirm"
# Start provisioning
with (
patch(
f"{IMPROV_BLE}.config_flow.ImprovBLEClient.can_identify",
return_value=False,
new_callable=PropertyMock,
),
patch(f"{IMPROV_BLE}.config_flow.ImprovBLEClient.ensure_connected"),
with patch(
f"{IMPROV_BLE}.config_flow.ImprovBLEClient.can_identify", return_value=False
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"],

View File

@@ -1,261 +0,0 @@
"""Test lock triggers."""
from collections.abc import Generator
from unittest.mock import patch
import pytest
from homeassistant.components.lock import DOMAIN, LockState
from homeassistant.const import ATTR_LABEL_ID, CONF_ENTITY_ID
from homeassistant.core import HomeAssistant, ServiceCall
from tests.components import (
StateDescription,
arm_trigger,
other_states,
parametrize_target_entities,
parametrize_trigger_states,
set_or_remove_state,
target_entities,
)
@pytest.fixture(autouse=True, name="stub_blueprint_populate")
def stub_blueprint_populate_autouse(stub_blueprint_populate: None) -> None:
"""Stub copying the blueprints to the config folder."""
@pytest.fixture(name="enable_experimental_triggers_conditions")
def enable_experimental_triggers_conditions() -> Generator[None]:
"""Enable experimental triggers and conditions."""
with patch(
"homeassistant.components.labs.async_is_preview_feature_enabled",
return_value=True,
):
yield
@pytest.fixture
async def target_locks(hass: HomeAssistant) -> list[str]:
"""Create multiple lock entities associated with different targets."""
return (await target_entities(hass, DOMAIN))["included"]
@pytest.mark.parametrize(
"trigger_key",
[
"lock.jammed",
"lock.locked",
"lock.opened",
"lock.unlocked",
],
)
async def test_lock_triggers_gated_by_labs_flag(
hass: HomeAssistant, caplog: pytest.LogCaptureFixture, trigger_key: str
) -> None:
"""Test the lock triggers are gated by the labs flag."""
await arm_trigger(hass, trigger_key, None, {ATTR_LABEL_ID: "test_label"})
assert (
"Unnamed automation failed to setup triggers and has been disabled: Trigger "
f"'{trigger_key}' requires the experimental 'New triggers and conditions' "
"feature to be enabled in Home Assistant Labs settings (feature flag: "
"'new_triggers_conditions')"
) in caplog.text
@pytest.mark.usefixtures("enable_experimental_triggers_conditions")
@pytest.mark.parametrize(
("trigger_target_config", "entity_id", "entities_in_target"),
parametrize_target_entities(DOMAIN),
)
@pytest.mark.parametrize(
("trigger", "states"),
[
*parametrize_trigger_states(
trigger="lock.jammed",
target_states=[LockState.JAMMED],
other_states=other_states(LockState.JAMMED),
),
*parametrize_trigger_states(
trigger="lock.locked",
target_states=[LockState.LOCKED],
other_states=other_states(LockState.LOCKED),
),
*parametrize_trigger_states(
trigger="lock.opened",
target_states=[LockState.OPEN],
other_states=other_states(LockState.OPEN),
),
*parametrize_trigger_states(
trigger="lock.unlocked",
target_states=[LockState.UNLOCKED],
other_states=other_states(LockState.UNLOCKED),
),
],
)
async def test_lock_state_trigger_behavior_any(
hass: HomeAssistant,
service_calls: list[ServiceCall],
target_locks: list[str],
trigger_target_config: dict,
entity_id: str,
entities_in_target: int,
trigger: str,
states: list[StateDescription],
) -> None:
"""Test that the lock state trigger fires when any lock state changes to a specific state."""
other_entity_ids = set(target_locks) - {entity_id}
# Set all locks, including the tested one, to the initial state
for eid in target_locks:
set_or_remove_state(hass, eid, states[0]["included"])
await hass.async_block_till_done()
await arm_trigger(hass, trigger, {}, trigger_target_config)
for state in states[1:]:
included_state = state["included"]
set_or_remove_state(hass, entity_id, included_state)
await hass.async_block_till_done()
assert len(service_calls) == state["count"]
for service_call in service_calls:
assert service_call.data[CONF_ENTITY_ID] == entity_id
service_calls.clear()
# Check if changing other locks also triggers
for other_entity_id in other_entity_ids:
set_or_remove_state(hass, other_entity_id, included_state)
await hass.async_block_till_done()
assert len(service_calls) == (entities_in_target - 1) * state["count"]
service_calls.clear()
@pytest.mark.usefixtures("enable_experimental_triggers_conditions")
@pytest.mark.parametrize(
("trigger_target_config", "entity_id", "entities_in_target"),
parametrize_target_entities(DOMAIN),
)
@pytest.mark.parametrize(
("trigger", "states"),
[
*parametrize_trigger_states(
trigger="lock.jammed",
target_states=[LockState.JAMMED],
other_states=other_states(LockState.JAMMED),
),
*parametrize_trigger_states(
trigger="lock.locked",
target_states=[LockState.LOCKED],
other_states=other_states(LockState.LOCKED),
),
*parametrize_trigger_states(
trigger="lock.opened",
target_states=[LockState.OPEN],
other_states=other_states(LockState.OPEN),
),
*parametrize_trigger_states(
trigger="lock.unlocked",
target_states=[LockState.UNLOCKED],
other_states=other_states(LockState.UNLOCKED),
),
],
)
async def test_lock_state_trigger_behavior_first(
hass: HomeAssistant,
service_calls: list[ServiceCall],
target_locks: list[str],
trigger_target_config: dict,
entity_id: str,
entities_in_target: int,
trigger: str,
states: list[StateDescription],
) -> None:
"""Test that the lock state trigger fires when the first lock changes to a specific state."""
other_entity_ids = set(target_locks) - {entity_id}
# Set all locks, including the tested one, to the initial state
for eid in target_locks:
set_or_remove_state(hass, eid, states[0]["included"])
await hass.async_block_till_done()
await arm_trigger(hass, trigger, {"behavior": "first"}, trigger_target_config)
for state in states[1:]:
included_state = state["included"]
set_or_remove_state(hass, entity_id, included_state)
await hass.async_block_till_done()
assert len(service_calls) == state["count"]
for service_call in service_calls:
assert service_call.data[CONF_ENTITY_ID] == entity_id
service_calls.clear()
# Triggering other locks should not cause the trigger to fire again
for other_entity_id in other_entity_ids:
set_or_remove_state(hass, other_entity_id, included_state)
await hass.async_block_till_done()
assert len(service_calls) == 0
@pytest.mark.usefixtures("enable_experimental_triggers_conditions")
@pytest.mark.parametrize(
("trigger_target_config", "entity_id", "entities_in_target"),
parametrize_target_entities(DOMAIN),
)
@pytest.mark.parametrize(
("trigger", "states"),
[
*parametrize_trigger_states(
trigger="lock.jammed",
target_states=[LockState.JAMMED],
other_states=other_states(LockState.JAMMED),
),
*parametrize_trigger_states(
trigger="lock.locked",
target_states=[LockState.LOCKED],
other_states=other_states(LockState.LOCKED),
),
*parametrize_trigger_states(
trigger="lock.opened",
target_states=[LockState.OPEN],
other_states=other_states(LockState.OPEN),
),
*parametrize_trigger_states(
trigger="lock.unlocked",
target_states=[LockState.UNLOCKED],
other_states=other_states(LockState.UNLOCKED),
),
],
)
async def test_lock_state_trigger_behavior_last(
hass: HomeAssistant,
service_calls: list[ServiceCall],
target_locks: list[str],
trigger_target_config: dict,
entity_id: str,
entities_in_target: int,
trigger: str,
states: list[StateDescription],
) -> None:
"""Test that the lock state trigger fires when the last lock changes to a specific state."""
other_entity_ids = set(target_locks) - {entity_id}
# Set all locks, including the tested one, to the initial state
for eid in target_locks:
set_or_remove_state(hass, eid, states[0]["included"])
await hass.async_block_till_done()
await arm_trigger(hass, trigger, {"behavior": "last"}, trigger_target_config)
for state in states[1:]:
included_state = state["included"]
for other_entity_id in other_entity_ids:
set_or_remove_state(hass, other_entity_id, included_state)
await hass.async_block_till_done()
assert len(service_calls) == 0
set_or_remove_state(hass, entity_id, included_state)
await hass.async_block_till_done()
assert len(service_calls) == state["count"]
for service_call in service_calls:
assert service_call.data[CONF_ENTITY_ID] == entity_id
service_calls.clear()

View File

@@ -391,7 +391,7 @@
'suggested_display_precision': 2,
}),
}),
'original_device_class': <SensorDeviceClass.ATMOSPHERIC_PRESSURE: 'atmospheric_pressure'>,
'original_device_class': <SensorDeviceClass.PRESSURE: 'pressure'>,
'original_icon': None,
'original_name': 'La Clusaz Pressure',
'platform': 'meteo_france',
@@ -407,7 +407,7 @@
StateSnapshot({
'attributes': ReadOnlyDict({
'attribution': 'Data provided by Météo-France',
'device_class': 'atmospheric_pressure',
'device_class': 'pressure',
'friendly_name': 'La Clusaz Pressure',
'state_class': <SensorStateClass.MEASUREMENT: 'measurement'>,
'unit_of_measurement': <UnitOfPressure.HPA: 'hPa'>,

View File

@@ -2,13 +2,8 @@
from unittest.mock import AsyncMock
from pynintendoauth.exceptions import (
HttpException,
InvalidOAuthConfigurationException,
InvalidSessionTokenException,
)
from pynintendoauth.exceptions import InvalidOAuthConfigurationException
from pynintendoparental.exceptions import NoDevicesFoundException
import pytest
from homeassistant.config_entries import ConfigEntryState
from homeassistant.core import HomeAssistant
@@ -19,62 +14,16 @@ from . import setup_integration
from tests.common import MockConfigEntry
@pytest.mark.parametrize(
("exception", "translation_key", "expected_state", "expected_log_message"),
[
(
InvalidOAuthConfigurationException(
status_code=401, message="Authentication failed"
),
"invalid_auth",
ConfigEntryState.SETUP_ERROR,
None,
),
(
NoDevicesFoundException(),
"no_devices_found",
ConfigEntryState.SETUP_ERROR,
None,
),
(
HttpException(
status_code=400, error_code="update_required", message="Update required"
),
"update_required",
ConfigEntryState.SETUP_ERROR,
None,
),
(
HttpException(
status_code=500, error_code="unknown", message="Unknown error"
),
None,
ConfigEntryState.SETUP_RETRY,
None,
),
(
InvalidSessionTokenException(
status_code=403, error_code="invalid_token", message="Invalid token"
),
None,
ConfigEntryState.SETUP_RETRY,
"Session token invalid, will renew on next update",
),
],
)
async def test_update_errors(
async def test_invalid_authentication(
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,
mock_nintendo_client: AsyncMock,
entity_registry: er.EntityRegistry,
caplog: pytest.LogCaptureFixture,
exception: Exception,
translation_key: str,
expected_state: ConfigEntryState,
expected_log_message: str | None,
) -> None:
"""Test handling of update errors."""
mock_nintendo_client.update.side_effect = exception
"""Test handling of invalid authentication."""
mock_nintendo_client.update.side_effect = InvalidOAuthConfigurationException(
status_code=401, message="Authentication failed"
)
await setup_integration(hass, mock_config_entry)
@@ -83,13 +32,25 @@ async def test_update_errors(
entity_registry, mock_config_entry.entry_id
)
assert len(entries) == 0
# Ensure the config entry is marked as error
assert mock_config_entry.state is ConfigEntryState.SETUP_ERROR
# Ensure the config entry is marked as expected state
assert mock_config_entry.state is expected_state
# Ensure the correct translation key is used in the error
assert mock_config_entry.error_reason_translation_key == translation_key
async def test_no_devices(
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,
mock_nintendo_client: AsyncMock,
entity_registry: er.EntityRegistry,
) -> None:
"""Test handling of invalid authentication."""
mock_nintendo_client.update.side_effect = NoDevicesFoundException()
# If there's an expected log message, check that it was logged
if expected_log_message:
assert expected_log_message in caplog.text
await setup_integration(hass, mock_config_entry)
# Ensure no entities are created
entries = er.async_entries_for_config_entry(
entity_registry, mock_config_entry.entry_id
)
assert len(entries) == 0
# Ensure the config entry is marked as error
assert mock_config_entry.state is ConfigEntryState.SETUP_ERROR

View File

@@ -0,0 +1,192 @@
"""Test scene trigger."""
from collections.abc import Generator
from unittest.mock import patch
import pytest
from homeassistant.const import (
ATTR_LABEL_ID,
CONF_ENTITY_ID,
STATE_UNAVAILABLE,
STATE_UNKNOWN,
)
from homeassistant.core import HomeAssistant, ServiceCall
from tests.components import (
StateDescription,
arm_trigger,
parametrize_target_entities,
set_or_remove_state,
target_entities,
)
@pytest.fixture(autouse=True, name="stub_blueprint_populate")
def stub_blueprint_populate_autouse(stub_blueprint_populate: None) -> None:
"""Stub copying the blueprints to the config folder."""
@pytest.fixture(name="enable_experimental_triggers_conditions")
def enable_experimental_triggers_conditions() -> Generator[None]:
"""Enable experimental triggers and conditions."""
with patch(
"homeassistant.components.labs.async_is_preview_feature_enabled",
return_value=True,
):
yield
@pytest.fixture
async def target_scenes(hass: HomeAssistant) -> list[str]:
"""Create multiple scene entities associated with different targets."""
return (await target_entities(hass, "scene"))["included"]
@pytest.mark.parametrize("trigger_key", ["scene.activated"])
async def test_scene_triggers_gated_by_labs_flag(
hass: HomeAssistant, caplog: pytest.LogCaptureFixture, trigger_key: str
) -> None:
"""Test the scene triggers are gated by the labs flag."""
await arm_trigger(hass, trigger_key, None, {ATTR_LABEL_ID: "test_label"})
assert (
"Unnamed automation failed to setup triggers and has been disabled: Trigger "
f"'{trigger_key}' requires the experimental 'New triggers and conditions' "
"feature to be enabled in Home Assistant Labs settings (feature flag: "
"'new_triggers_conditions')"
) in caplog.text
@pytest.mark.usefixtures("enable_experimental_triggers_conditions")
@pytest.mark.parametrize(
("trigger_target_config", "entity_id", "entities_in_target"),
parametrize_target_entities("scene"),
)
@pytest.mark.parametrize(
("trigger", "states"),
[
(
"scene.activated",
[
{"included": {"state": None, "attributes": {}}, "count": 0},
{
"included": {
"state": "2021-01-01T23:59:59+00:00",
"attributes": {},
},
"count": 0,
},
{
"included": {
"state": "2022-01-01T23:59:59+00:00",
"attributes": {},
},
"count": 1,
},
],
),
(
"scene.activated",
[
{"included": {"state": "foo", "attributes": {}}, "count": 0},
{
"included": {
"state": "2021-01-01T23:59:59+00:00",
"attributes": {},
},
"count": 1,
},
{
"included": {
"state": "2022-01-01T23:59:59+00:00",
"attributes": {},
},
"count": 1,
},
],
),
(
"scene.activated",
[
{
"included": {"state": STATE_UNAVAILABLE, "attributes": {}},
"count": 0,
},
{
"included": {
"state": "2021-01-01T23:59:59+00:00",
"attributes": {},
},
"count": 0,
},
{
"included": {
"state": "2022-01-01T23:59:59+00:00",
"attributes": {},
},
"count": 1,
},
{
"included": {"state": STATE_UNAVAILABLE, "attributes": {}},
"count": 0,
},
],
),
(
"scene.activated",
[
{"included": {"state": STATE_UNKNOWN, "attributes": {}}, "count": 0},
{
"included": {
"state": "2021-01-01T23:59:59+00:00",
"attributes": {},
},
"count": 1,
},
{
"included": {
"state": "2022-01-01T23:59:59+00:00",
"attributes": {},
},
"count": 1,
},
{"included": {"state": STATE_UNKNOWN, "attributes": {}}, "count": 0},
],
),
],
)
async def test_scene_state_trigger_behavior_any(
hass: HomeAssistant,
service_calls: list[ServiceCall],
target_scenes: list[str],
trigger_target_config: dict,
entity_id: str,
entities_in_target: int,
trigger: str,
states: list[StateDescription],
) -> None:
"""Test that the scene state trigger fires when any scene state changes to a specific state."""
other_entity_ids = set(target_scenes) - {entity_id}
# Set all scenes, including the tested scene, to the initial state
for eid in target_scenes:
set_or_remove_state(hass, eid, states[0]["included"])
await hass.async_block_till_done()
await arm_trigger(hass, trigger, None, trigger_target_config)
for state in states[1:]:
included_state = state["included"]
set_or_remove_state(hass, entity_id, included_state)
await hass.async_block_till_done()
assert len(service_calls) == state["count"]
for service_call in service_calls:
assert service_call.data[CONF_ENTITY_ID] == entity_id
service_calls.clear()
# Check if changing other scenes also triggers
for other_entity_id in other_entity_ids:
set_or_remove_state(hass, other_entity_id, included_state)
await hass.async_block_till_done()
assert len(service_calls) == (entities_in_target - 1) * state["count"]
service_calls.clear()

View File

@@ -158,6 +158,7 @@ async def test_polling_platform_init(
(
SERVICE_SEND_LOCATION,
{
ATTR_MESSAGE: "test_message",
ATTR_MESSAGE_THREAD_ID: "123",
ATTR_LONGITUDE: "1.123",
ATTR_LATITUDE: "1.123",
@@ -413,7 +414,6 @@ async def test_send_chat_action(
CONF_CONFIG_ENTRY_ID: mock_broadcast_config_entry.entry_id,
ATTR_TARGET: [123456],
ATTR_CHAT_ACTION: CHAT_ACTION_TYPING,
ATTR_MESSAGE_THREAD_ID: 123,
},
blocking=True,
return_response=True,
@@ -421,9 +421,7 @@ async def test_send_chat_action(
await hass.async_block_till_done()
mock.assert_called_once()
mock.assert_called_with(
chat_id=123456, action=CHAT_ACTION_TYPING, message_thread_id=123
)
mock.assert_called_with(chat_id=123456, action=CHAT_ACTION_TYPING)
@pytest.mark.parametrize(
@@ -1507,6 +1505,7 @@ async def test_set_message_reaction(
SERVICE_SEND_LOCATION,
{
ATTR_TARGET: 654321,
ATTR_MESSAGE: "test_message",
ATTR_MESSAGE_THREAD_ID: "123",
ATTR_LONGITUDE: "1.123",
ATTR_LATITUDE: "1.123",

View File

@@ -130,16 +130,8 @@ async def test_async_enable_logging(
cleanup_log_files()
@pytest.mark.parametrize(
("extra_env", "log_file_count", "old_log_file_count"),
[({}, 0, 1), ({"HA_DUPLICATE_LOG_FILE": "1"}, 1, 0)],
)
async def test_async_enable_logging_supervisor(
hass: HomeAssistant,
caplog: pytest.LogCaptureFixture,
extra_env: dict[str, str],
log_file_count: int,
old_log_file_count: int,
hass: HomeAssistant, caplog: pytest.LogCaptureFixture
) -> None:
"""Test to ensure the default log file is not created on Supervisor installations."""
@@ -149,14 +141,14 @@ async def test_async_enable_logging_supervisor(
assert len(glob.glob(ARG_LOG_FILE)) == 0
with (
patch.dict(os.environ, {"SUPERVISOR": "1", **extra_env}),
patch.dict(os.environ, {"SUPERVISOR": "1"}),
patch(
"homeassistant.bootstrap.async_activate_log_queue_handler"
) as mock_async_activate_log_queue_handler,
patch("logging.getLogger"),
):
await bootstrap.async_enable_logging(hass)
assert len(glob.glob(CONFIG_LOG_FILE)) == log_file_count
assert len(glob.glob(CONFIG_LOG_FILE)) == 0
mock_async_activate_log_queue_handler.assert_called_once()
mock_async_activate_log_queue_handler.reset_mock()
@@ -170,10 +162,9 @@ async def test_async_enable_logging_supervisor(
await hass.async_add_executor_job(write_log_file)
assert len(glob.glob(CONFIG_LOG_FILE)) == 1
assert len(glob.glob(f"{CONFIG_LOG_FILE}.old")) == 0
await bootstrap.async_enable_logging(hass)
assert len(glob.glob(CONFIG_LOG_FILE)) == log_file_count
assert len(glob.glob(f"{CONFIG_LOG_FILE}.old")) == old_log_file_count
assert len(glob.glob(CONFIG_LOG_FILE)) == 0
assert len(glob.glob(f"{CONFIG_LOG_FILE}.old")) == 1
mock_async_activate_log_queue_handler.assert_called_once()
mock_async_activate_log_queue_handler.reset_mock()