Compare commits

..

5 Commits

Author SHA1 Message Date
Joakim Sørensen
4b8e267282 Merge branch 'dev' into handle-timeoutu-in-subinfo-call 2025-09-10 09:32:47 +02:00
Joost Lekkerkerker
fa3eb1b3fe Merge branch 'dev' into handle-timeoutu-in-subinfo-call 2025-08-11 22:52:37 +02:00
Joakim Sørensen
a306114855 Merge branch 'dev' into handle-timeoutu-in-subinfo-call 2025-07-08 13:47:34 +01:00
Joakim Sørensen
3cafe318c1 Update tests/components/cloud/test_subscription.py 2025-07-08 13:19:02 +02:00
ludeeus
33ac13185a Fix error handling in subscription info retrieval and update tests 2025-07-08 11:15:38 +00:00
63 changed files with 236 additions and 2500 deletions

View File

@@ -169,7 +169,6 @@ homeassistant.components.dnsip.*
homeassistant.components.doorbird.*
homeassistant.components.dormakaba_dkey.*
homeassistant.components.downloader.*
homeassistant.components.droplet.*
homeassistant.components.dsmr.*
homeassistant.components.duckdns.*
homeassistant.components.dunehd.*

2
CODEOWNERS generated
View File

@@ -377,8 +377,6 @@ build.json @home-assistant/supervisor
/tests/components/dremel_3d_printer/ @tkdrob
/homeassistant/components/drop_connect/ @ChandlerSystems @pfrazer
/tests/components/drop_connect/ @ChandlerSystems @pfrazer
/homeassistant/components/droplet/ @sarahseidman
/tests/components/droplet/ @sarahseidman
/homeassistant/components/dsmr/ @Robbie1221
/tests/components/dsmr/ @Robbie1221
/homeassistant/components/dsmr_reader/ @sorted-bits @glodenox @erwindouna

View File

@@ -7,6 +7,6 @@
"integration_type": "service",
"iot_class": "cloud_polling",
"loggers": ["accuweather"],
"requirements": ["accuweather==4.2.1"],
"requirements": ["accuweather==4.2.0"],
"single_config_entry": true
}

View File

@@ -14,7 +14,6 @@ from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_PASSWORD, CONF_USERNAME
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryAuthFailed
from homeassistant.helpers import device_registry as dr
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import _LOGGER, CONF_LOGIN_DATA, DOMAIN
@@ -49,13 +48,12 @@ class AmazonDevicesCoordinator(DataUpdateCoordinator[dict[str, AmazonDevice]]):
entry.data[CONF_PASSWORD],
entry.data[CONF_LOGIN_DATA],
)
self.previous_devices: set[str] = set()
async def _async_update_data(self) -> dict[str, AmazonDevice]:
"""Update device data."""
try:
await self.api.login_mode_stored_data()
data = await self.api.get_devices_data()
return await self.api.get_devices_data()
except CannotConnect as err:
raise UpdateFailed(
translation_domain=DOMAIN,
@@ -74,31 +72,3 @@ class AmazonDevicesCoordinator(DataUpdateCoordinator[dict[str, AmazonDevice]]):
translation_key="invalid_auth",
translation_placeholders={"error": repr(err)},
) from err
else:
current_devices = set(data.keys())
if stale_devices := self.previous_devices - current_devices:
await self._async_remove_device_stale(stale_devices)
self.previous_devices = current_devices
return data
async def _async_remove_device_stale(
self,
stale_devices: set[str],
) -> None:
"""Remove stale device."""
device_registry = dr.async_get(self.hass)
for serial_num in stale_devices:
_LOGGER.debug(
"Detected change in devices: serial %s removed",
serial_num,
)
device = device_registry.async_get_device(
identifiers={(DOMAIN, serial_num)}
)
if device:
device_registry.async_update_device(
device_id=device.id,
remove_config_entry_id=self.config_entry.entry_id,
)

View File

@@ -64,7 +64,9 @@ rules:
repair-issues:
status: exempt
comment: no known use cases for repair issues or flows, yet
stale-devices: done
stale-devices:
status: todo
comment: automate the cleanup process
# Platinum
async-dependency: done

View File

@@ -57,7 +57,6 @@ from .api import (
_get_manager,
async_address_present,
async_ble_device_from_address,
async_current_scanners,
async_discovered_service_info,
async_get_advertisement_callback,
async_get_fallback_availability_interval,
@@ -115,7 +114,6 @@ __all__ = [
"HomeAssistantRemoteScanner",
"async_address_present",
"async_ble_device_from_address",
"async_current_scanners",
"async_discovered_service_info",
"async_get_advertisement_callback",
"async_get_fallback_availability_interval",

View File

@@ -66,22 +66,6 @@ def async_scanner_count(hass: HomeAssistant, connectable: bool = True) -> int:
return _get_manager(hass).async_scanner_count(connectable)
@hass_callback
def async_current_scanners(hass: HomeAssistant) -> list[BaseHaScanner]:
"""Return the list of currently active scanners.
This method returns a list of all active Bluetooth scanners registered
with Home Assistant, including both connectable and non-connectable scanners.
Args:
hass: Home Assistant instance
Returns:
List of all active scanner instances
"""
return _get_manager(hass).async_current_scanners()
@hass_callback
def async_discovered_service_info(
hass: HomeAssistant, connectable: bool = True

View File

@@ -25,7 +25,11 @@ async def async_subscription_info(cloud: Cloud[CloudClient]) -> SubscriptionInfo
return await cloud.payments.subscription_info()
except PaymentsApiError as exception:
_LOGGER.error("Failed to fetch subscription information - %s", exception)
except TimeoutError:
_LOGGER.error(
"A timeout of %s was reached while trying to fetch subscription information",
REQUEST_TIMEOUT,
)
return None

View File

@@ -1,37 +0,0 @@
"""The Droplet integration."""
from __future__ import annotations
import logging
from homeassistant.const import Platform
from homeassistant.core import HomeAssistant
from .coordinator import DropletConfigEntry, DropletDataCoordinator
PLATFORMS: list[Platform] = [
Platform.SENSOR,
]
logger = logging.getLogger(__name__)
async def async_setup_entry(
hass: HomeAssistant, config_entry: DropletConfigEntry
) -> bool:
"""Set up Droplet from a config entry."""
droplet_coordinator = DropletDataCoordinator(hass, config_entry)
await droplet_coordinator.async_config_entry_first_refresh()
config_entry.runtime_data = droplet_coordinator
await hass.config_entries.async_forward_entry_setups(config_entry, PLATFORMS)
return True
async def async_unload_entry(
hass: HomeAssistant, config_entry: DropletConfigEntry
) -> bool:
"""Unload a config entry."""
return await hass.config_entries.async_unload_platforms(config_entry, PLATFORMS)

View File

@@ -1,118 +0,0 @@
"""Config flow for Droplet integration."""
from __future__ import annotations
from typing import Any
from pydroplet.droplet import DropletConnection, DropletDiscovery
import voluptuous as vol
from homeassistant.config_entries import ConfigFlow, ConfigFlowResult
from homeassistant.const import CONF_CODE, CONF_DEVICE_ID, CONF_IP_ADDRESS, CONF_PORT
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.service_info.zeroconf import ZeroconfServiceInfo
from .const import DOMAIN
class DropletConfigFlow(ConfigFlow, domain=DOMAIN):
"""Handle Droplet config flow."""
_droplet_discovery: DropletDiscovery
async def async_step_zeroconf(
self, discovery_info: ZeroconfServiceInfo
) -> ConfigFlowResult:
"""Handle zeroconf discovery."""
self._droplet_discovery = DropletDiscovery(
discovery_info.host,
discovery_info.port,
discovery_info.name,
)
if not self._droplet_discovery.is_valid():
return self.async_abort(reason="invalid_discovery_info")
# In this case, device ID was part of the zeroconf discovery info
device_id: str = await self._droplet_discovery.get_device_id()
await self.async_set_unique_id(device_id)
self._abort_if_unique_id_configured(
updates={CONF_IP_ADDRESS: self._droplet_discovery.host},
)
self.context.update({"title_placeholders": {"name": device_id}})
return await self.async_step_confirm()
async def async_step_confirm(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Confirm the setup."""
errors: dict[str, str] = {}
device_id: str = await self._droplet_discovery.get_device_id()
if user_input is not None:
# Test if we can connect before returning
session = async_get_clientsession(self.hass)
if await self._droplet_discovery.try_connect(
session, user_input[CONF_CODE]
):
device_data = {
CONF_IP_ADDRESS: self._droplet_discovery.host,
CONF_PORT: self._droplet_discovery.port,
CONF_DEVICE_ID: device_id,
CONF_CODE: user_input[CONF_CODE],
}
return self.async_create_entry(
title=device_id,
data=device_data,
)
errors["base"] = "cannot_connect"
return self.async_show_form(
step_id="confirm",
data_schema=vol.Schema(
{
vol.Required(CONF_CODE): str,
}
),
description_placeholders={
"device_name": device_id,
},
errors=errors,
)
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle a flow initialized by the user."""
errors: dict[str, str] = {}
if user_input is not None:
self._droplet_discovery = DropletDiscovery(
user_input[CONF_IP_ADDRESS], DropletConnection.DEFAULT_PORT, ""
)
session = async_get_clientsession(self.hass)
if await self._droplet_discovery.try_connect(
session, user_input[CONF_CODE]
) and (device_id := await self._droplet_discovery.get_device_id()):
device_data = {
CONF_IP_ADDRESS: self._droplet_discovery.host,
CONF_PORT: self._droplet_discovery.port,
CONF_DEVICE_ID: device_id,
CONF_CODE: user_input[CONF_CODE],
}
await self.async_set_unique_id(device_id, raise_on_progress=False)
self._abort_if_unique_id_configured(
description_placeholders={CONF_DEVICE_ID: device_id},
)
return self.async_create_entry(
title=device_id,
data=device_data,
)
errors["base"] = "cannot_connect"
return self.async_show_form(
step_id="user",
data_schema=vol.Schema(
{vol.Required(CONF_IP_ADDRESS): str, vol.Required(CONF_CODE): str}
),
errors=errors,
)

View File

@@ -1,11 +0,0 @@
"""Constants for the droplet integration."""
CONNECT_DELAY = 5
DOMAIN = "droplet"
DEVICE_NAME = "Droplet"
KEY_CURRENT_FLOW_RATE = "current_flow_rate"
KEY_VOLUME = "volume"
KEY_SIGNAL_QUALITY = "signal_quality"
KEY_SERVER_CONNECTIVITY = "server_connectivity"

View File

@@ -1,84 +0,0 @@
"""Droplet device data update coordinator object."""
from __future__ import annotations
import asyncio
import logging
import time
from pydroplet.droplet import Droplet
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_CODE, CONF_IP_ADDRESS, CONF_PORT
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryNotReady
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import CONNECT_DELAY, DOMAIN
VERSION_TIMEOUT = 5
_LOGGER = logging.getLogger(__name__)
TIMEOUT = 1
type DropletConfigEntry = ConfigEntry[DropletDataCoordinator]
class DropletDataCoordinator(DataUpdateCoordinator[None]):
"""Droplet device object."""
config_entry: DropletConfigEntry
def __init__(self, hass: HomeAssistant, entry: DropletConfigEntry) -> None:
"""Initialize the device."""
super().__init__(
hass, _LOGGER, config_entry=entry, name=f"{DOMAIN}-{entry.unique_id}"
)
self.droplet = Droplet(
host=entry.data[CONF_IP_ADDRESS],
port=entry.data[CONF_PORT],
token=entry.data[CONF_CODE],
session=async_get_clientsession(self.hass),
logger=_LOGGER,
)
assert entry.unique_id is not None
self.unique_id = entry.unique_id
async def _async_setup(self) -> None:
if not await self.setup():
raise ConfigEntryNotReady("Device is offline")
# Droplet should send its metadata within 5 seconds
end = time.time() + VERSION_TIMEOUT
while not self.droplet.version_info_available():
await asyncio.sleep(TIMEOUT)
if time.time() > end:
_LOGGER.warning("Failed to get version info from Droplet")
return
async def _async_update_data(self) -> None:
if not self.droplet.connected:
raise UpdateFailed(
translation_domain=DOMAIN, translation_key="connection_error"
)
async def setup(self) -> bool:
"""Set up droplet client."""
self.config_entry.async_on_unload(self.droplet.stop_listening)
self.config_entry.async_create_background_task(
self.hass,
self.droplet.listen_forever(CONNECT_DELAY, self.async_set_updated_data),
"droplet-listen",
)
end = time.time() + CONNECT_DELAY
while time.time() < end:
if self.droplet.connected:
return True
await asyncio.sleep(TIMEOUT)
return False
def get_availability(self) -> bool:
"""Retrieve Droplet's availability status."""
return self.droplet.get_availability()

View File

@@ -1,15 +0,0 @@
{
"entity": {
"sensor": {
"current_flow_rate": {
"default": "mdi:chart-line"
},
"server_connectivity": {
"default": "mdi:web"
},
"signal_quality": {
"default": "mdi:waveform"
}
}
}
}

View File

@@ -1,11 +0,0 @@
{
"domain": "droplet",
"name": "Droplet",
"codeowners": ["@sarahseidman"],
"config_flow": true,
"documentation": "https://www.home-assistant.io/integrations/droplet",
"iot_class": "local_push",
"quality_scale": "bronze",
"requirements": ["pydroplet==2.3.2"],
"zeroconf": ["_droplet._tcp.local."]
}

View File

@@ -1,72 +0,0 @@
rules:
# Bronze
action-setup:
status: exempt
comment: |
No custom actions defined
appropriate-polling:
status: exempt
comment: |
No polling
brands: done
common-modules: done
config-flow-test-coverage: done
config-flow: done
dependency-transparency: done
docs-actions:
status: exempt
comment: |
No custom actions are defined.
docs-high-level-description: done
docs-installation-instructions: done
docs-removal-instructions: done
entity-event-setup: done
entity-unique-id: done
has-entity-name: done
runtime-data: done
test-before-configure: done
test-before-setup: done
unique-config-entry: done
# Silver
action-exceptions:
status: exempt
comment: |
No custom actions are defined.
config-entry-unloading: done
docs-configuration-parameters: todo
docs-installation-parameters: done
entity-unavailable: done
integration-owner: done
log-when-unavailable: todo
parallel-updates: todo
reauthentication-flow: todo
test-coverage: todo
# Gold
devices: done
diagnostics: todo
discovery-update-info: done
discovery: done
docs-data-update: done
docs-examples: todo
docs-known-limitations: todo
docs-supported-devices: todo
docs-supported-functions: done
docs-troubleshooting: todo
docs-use-cases: done
dynamic-devices: todo
entity-category: done
entity-device-class: done
entity-disabled-by-default: todo
entity-translations: done
exception-translations: todo
icon-translations: done
reconfiguration-flow: todo
repair-issues: todo
stale-devices: todo
# Platinum
async-dependency: todo
inject-websession: done
strict-typing: done

View File

@@ -1,131 +0,0 @@
"""Support for Droplet."""
from __future__ import annotations
from collections.abc import Callable
from dataclasses import dataclass
from datetime import datetime
from pydroplet.droplet import Droplet
from homeassistant.components.sensor import (
SensorDeviceClass,
SensorEntity,
SensorEntityDescription,
SensorStateClass,
)
from homeassistant.const import EntityCategory, UnitOfVolume, UnitOfVolumeFlowRate
from homeassistant.core import HomeAssistant
from homeassistant.helpers.device_registry import DeviceInfo
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from .const import (
DOMAIN,
KEY_CURRENT_FLOW_RATE,
KEY_SERVER_CONNECTIVITY,
KEY_SIGNAL_QUALITY,
KEY_VOLUME,
)
from .coordinator import DropletConfigEntry, DropletDataCoordinator
ML_L_CONVERSION = 1000
@dataclass(kw_only=True, frozen=True)
class DropletSensorEntityDescription(SensorEntityDescription):
"""Describes Droplet sensor entity."""
value_fn: Callable[[Droplet], float | str | None]
last_reset_fn: Callable[[Droplet], datetime | None] = lambda _: None
SENSORS: list[DropletSensorEntityDescription] = [
DropletSensorEntityDescription(
key=KEY_CURRENT_FLOW_RATE,
translation_key=KEY_CURRENT_FLOW_RATE,
device_class=SensorDeviceClass.VOLUME_FLOW_RATE,
native_unit_of_measurement=UnitOfVolumeFlowRate.LITERS_PER_MINUTE,
suggested_unit_of_measurement=UnitOfVolumeFlowRate.GALLONS_PER_MINUTE,
suggested_display_precision=2,
state_class=SensorStateClass.MEASUREMENT,
value_fn=lambda device: device.get_flow_rate(),
),
DropletSensorEntityDescription(
key=KEY_VOLUME,
device_class=SensorDeviceClass.WATER,
native_unit_of_measurement=UnitOfVolume.LITERS,
suggested_unit_of_measurement=UnitOfVolume.GALLONS,
suggested_display_precision=2,
state_class=SensorStateClass.TOTAL,
value_fn=lambda device: device.get_volume_delta() / ML_L_CONVERSION,
last_reset_fn=lambda device: device.get_volume_last_fetched(),
),
DropletSensorEntityDescription(
key=KEY_SERVER_CONNECTIVITY,
translation_key=KEY_SERVER_CONNECTIVITY,
device_class=SensorDeviceClass.ENUM,
options=["connected", "connecting", "disconnected"],
value_fn=lambda device: device.get_server_status(),
entity_category=EntityCategory.DIAGNOSTIC,
),
DropletSensorEntityDescription(
key=KEY_SIGNAL_QUALITY,
translation_key=KEY_SIGNAL_QUALITY,
device_class=SensorDeviceClass.ENUM,
options=["no_signal", "weak_signal", "strong_signal"],
value_fn=lambda device: device.get_signal_quality(),
entity_category=EntityCategory.DIAGNOSTIC,
),
]
async def async_setup_entry(
hass: HomeAssistant,
config_entry: DropletConfigEntry,
async_add_entities: AddConfigEntryEntitiesCallback,
) -> None:
"""Set up the Droplet sensors from config entry."""
coordinator = config_entry.runtime_data
async_add_entities([DropletSensor(coordinator, sensor) for sensor in SENSORS])
class DropletSensor(CoordinatorEntity[DropletDataCoordinator], SensorEntity):
"""Representation of a Droplet."""
entity_description: DropletSensorEntityDescription
_attr_has_entity_name = True
def __init__(
self,
coordinator: DropletDataCoordinator,
entity_description: DropletSensorEntityDescription,
) -> None:
"""Initialize the sensor."""
super().__init__(coordinator)
self.entity_description = entity_description
unique_id = coordinator.config_entry.unique_id
self._attr_unique_id = f"{unique_id}_{entity_description.key}"
self._attr_device_info = DeviceInfo(
identifiers={(DOMAIN, self.coordinator.unique_id)},
manufacturer=self.coordinator.droplet.get_manufacturer(),
model=self.coordinator.droplet.get_model(),
sw_version=self.coordinator.droplet.get_fw_version(),
serial_number=self.coordinator.droplet.get_sn(),
)
@property
def available(self) -> bool:
"""Get Droplet's availability."""
return self.coordinator.get_availability()
@property
def native_value(self) -> float | str | None:
"""Return the value reported by the sensor."""
return self.entity_description.value_fn(self.coordinator.droplet)
@property
def last_reset(self) -> datetime | None:
"""Return the last reset of the sensor, if applicable."""
return self.entity_description.last_reset_fn(self.coordinator.droplet)

View File

@@ -1,46 +0,0 @@
{
"config": {
"step": {
"user": {
"title": "Configure Droplet integration",
"description": "Manually enter Droplet's connection details.",
"data": {
"ip_address": "[%key:common::config_flow::data::ip%]",
"code": "Pairing code"
},
"data_description": {
"ip_address": "Droplet's IP address",
"code": "Code from the Droplet app"
}
},
"confirm": {
"title": "Confirm association",
"description": "Enter pairing code to connect to {device_name}.",
"data": {
"code": "[%key:component::droplet::config::step::user::data::code%]"
},
"data_description": {
"code": "[%key:component::droplet::config::step::user::data_description::code%]"
}
}
},
"abort": {
"already_configured": "[%key:common::config_flow::abort::already_configured_device%]"
},
"error": {
"cannot_connect": "[%key:common::config_flow::error::cannot_connect%]"
}
},
"entity": {
"sensor": {
"server_connectivity": { "name": "Server status" },
"signal_quality": { "name": "Signal quality" },
"current_flow_rate": { "name": "Flow rate" }
}
},
"exceptions": {
"connection_error": {
"message": "Disconnected from Droplet"
}
}
}

View File

@@ -6,5 +6,5 @@
"documentation": "https://www.home-assistant.io/integrations/feedreader",
"iot_class": "cloud_polling",
"loggers": ["feedparser", "sgmllib3k"],
"requirements": ["feedparser==6.0.12"]
"requirements": ["feedparser==6.0.11"]
}

View File

@@ -139,27 +139,21 @@ class HassIOIngress(HomeAssistantView):
url = url.with_query(request.query_string)
# Start proxy
try:
_LOGGER.debug(
"Proxying WebSocket to %s / %s, upstream url: %s", token, path, url
async with self._websession.ws_connect(
url,
headers=source_header,
protocols=req_protocols,
autoclose=False,
autoping=False,
) as ws_client:
# Proxy requests
await asyncio.wait(
[
create_eager_task(_websocket_forward(ws_server, ws_client)),
create_eager_task(_websocket_forward(ws_client, ws_server)),
],
return_when=asyncio.FIRST_COMPLETED,
)
async with self._websession.ws_connect(
url,
headers=source_header,
protocols=req_protocols,
autoclose=False,
autoping=False,
) as ws_client:
# Proxy requests
await asyncio.wait(
[
create_eager_task(_websocket_forward(ws_server, ws_client)),
create_eager_task(_websocket_forward(ws_client, ws_server)),
],
return_when=asyncio.FIRST_COMPLETED,
)
except TimeoutError:
_LOGGER.warning("WebSocket proxy to %s / %s timed out", token, path)
return ws_server
@@ -232,7 +226,6 @@ class HassIOIngress(HomeAssistantView):
aiohttp.ClientError,
aiohttp.ClientPayloadError,
ConnectionResetError,
ConnectionError,
) as err:
_LOGGER.debug("Stream error %s / %s: %s", token, path, err)

View File

@@ -6,5 +6,5 @@
"integration_type": "system",
"iot_class": "local_polling",
"quality_scale": "internal",
"requirements": ["isal==1.8.0"]
"requirements": ["isal==1.7.1"]
}

View File

@@ -634,8 +634,8 @@ DISCOVERY_SCHEMAS = [
platform=Platform.SENSOR,
entity_description=MatterSensorEntityDescription(
key="NitrogenDioxideSensor",
translation_key="nitrogen_dioxide",
native_unit_of_measurement=CONCENTRATION_PARTS_PER_MILLION,
device_class=SensorDeviceClass.NITROGEN_DIOXIDE,
state_class=SensorStateClass.MEASUREMENT,
),
entity_class=MatterSensor,

View File

@@ -435,9 +435,6 @@
"evse_soc": {
"name": "State of charge"
},
"nitrogen_dioxide": {
"name": "[%key:component::sensor::entity_component::nitrogen_dioxide::name%]"
},
"pump_control_mode": {
"name": "Control mode",
"state": {

View File

@@ -43,9 +43,6 @@
},
"valve_status": {
"default": "mdi:valve"
},
"illuminance_level": {
"default": "mdi:brightness-5"
}
},
"switch": {

View File

@@ -1421,14 +1421,6 @@ RPC_SENSORS: Final = {
entity_category=EntityCategory.DIAGNOSTIC,
entity_class=RpcBluTrvSensor,
),
"illuminance_illumination": RpcSensorDescription(
key="illuminance",
sub_key="illumination",
name="Illuminance Level",
translation_key="illuminance_level",
device_class=SensorDeviceClass.ENUM,
options=["dark", "twilight", "bright"],
),
}

View File

@@ -193,13 +193,6 @@
"opened": "Opened",
"opening": "[%key:common::state::opening%]"
}
},
"illuminance_level": {
"state": {
"dark": "Dark",
"twilight": "Twilight",
"bright": "Bright"
}
}
}
},

View File

@@ -5,21 +5,18 @@ from __future__ import annotations
import logging
from typing import Any
from habluetooth import BluetoothScanningMode
from switchbot import (
SwitchbotAccountConnectionError,
SwitchBotAdvertisement,
SwitchbotApiError,
SwitchbotAuthenticationError,
SwitchbotModel,
fetch_cloud_devices,
parse_advertisement_data,
)
import voluptuous as vol
from homeassistant.components.bluetooth import (
BluetoothServiceInfoBleak,
async_current_scanners,
async_discovered_service_info,
)
from homeassistant.config_entries import (
@@ -90,8 +87,6 @@ class SwitchbotConfigFlow(ConfigFlow, domain=DOMAIN):
"""Initialize the config flow."""
self._discovered_adv: SwitchBotAdvertisement | None = None
self._discovered_advs: dict[str, SwitchBotAdvertisement] = {}
self._cloud_username: str | None = None
self._cloud_password: str | None = None
async def async_step_bluetooth(
self, discovery_info: BluetoothServiceInfoBleak
@@ -181,17 +176,9 @@ class SwitchbotConfigFlow(ConfigFlow, domain=DOMAIN):
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle the SwitchBot API auth step."""
errors: dict[str, str] = {}
errors = {}
assert self._discovered_adv is not None
description_placeholders: dict[str, str] = {}
# If we have saved credentials from cloud login, try them first
if user_input is None and self._cloud_username and self._cloud_password:
user_input = {
CONF_USERNAME: self._cloud_username,
CONF_PASSWORD: self._cloud_password,
}
description_placeholders = {}
if user_input is not None:
model: SwitchbotModel = self._discovered_adv.data["modelName"]
cls = ENCRYPTED_SWITCHBOT_MODEL_TO_CLASS[model]
@@ -213,9 +200,6 @@ class SwitchbotConfigFlow(ConfigFlow, domain=DOMAIN):
_LOGGER.debug("Authentication failed: %s", ex, exc_info=True)
errors = {"base": "auth_failed"}
description_placeholders = {"error_detail": str(ex)}
# Clear saved credentials if auth failed
self._cloud_username = None
self._cloud_password = None
else:
return await self.async_step_encrypted_key(key_details)
@@ -255,7 +239,7 @@ class SwitchbotConfigFlow(ConfigFlow, domain=DOMAIN):
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle the encryption key step."""
errors: dict[str, str] = {}
errors = {}
assert self._discovered_adv is not None
if user_input is not None:
model: SwitchbotModel = self._discovered_adv.data["modelName"]
@@ -324,73 +308,7 @@ class SwitchbotConfigFlow(ConfigFlow, domain=DOMAIN):
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle the user step to choose cloud login or direct discovery."""
# Check if all scanners are in active mode
# If so, skip the menu and go directly to device selection
scanners = async_current_scanners(self.hass)
if scanners and all(
scanner.current_mode == BluetoothScanningMode.ACTIVE for scanner in scanners
):
# All scanners are active, skip the menu
return await self.async_step_select_device()
return self.async_show_menu(
step_id="user",
menu_options=["cloud_login", "select_device"],
)
async def async_step_cloud_login(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle the cloud login step."""
errors: dict[str, str] = {}
description_placeholders: dict[str, str] = {}
if user_input is not None:
try:
await fetch_cloud_devices(
async_get_clientsession(self.hass),
user_input[CONF_USERNAME],
user_input[CONF_PASSWORD],
)
except (SwitchbotApiError, SwitchbotAccountConnectionError) as ex:
_LOGGER.debug(
"Failed to connect to SwitchBot API: %s", ex, exc_info=True
)
raise AbortFlow(
"api_error", description_placeholders={"error_detail": str(ex)}
) from ex
except SwitchbotAuthenticationError as ex:
_LOGGER.debug("Authentication failed: %s", ex, exc_info=True)
errors = {"base": "auth_failed"}
description_placeholders = {"error_detail": str(ex)}
else:
# Save credentials temporarily for the duration of this flow
# to avoid re-prompting if encrypted device auth is needed
# These will be discarded when the flow completes
self._cloud_username = user_input[CONF_USERNAME]
self._cloud_password = user_input[CONF_PASSWORD]
return await self.async_step_select_device()
user_input = user_input or {}
return self.async_show_form(
step_id="cloud_login",
errors=errors,
data_schema=vol.Schema(
{
vol.Required(
CONF_USERNAME, default=user_input.get(CONF_USERNAME)
): str,
vol.Required(CONF_PASSWORD): str,
}
),
description_placeholders=description_placeholders,
)
async def async_step_select_device(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle the step to pick discovered device."""
"""Handle the user step to pick discovered device."""
errors: dict[str, str] = {}
device_adv: SwitchBotAdvertisement | None = None
if user_input is not None:
@@ -415,7 +333,7 @@ class SwitchbotConfigFlow(ConfigFlow, domain=DOMAIN):
return await self.async_step_confirm()
return self.async_show_form(
step_id="select_device",
step_id="user",
data_schema=vol.Schema(
{
vol.Required(CONF_ADDRESS): vol.In(

View File

@@ -41,5 +41,5 @@
"iot_class": "local_push",
"loggers": ["switchbot"],
"quality_scale": "gold",
"requirements": ["PySwitchbot==0.70.0"]
"requirements": ["PySwitchbot==0.69.0"]
}

View File

@@ -3,24 +3,6 @@
"flow_title": "{name} ({address})",
"step": {
"user": {
"description": "Would you like to sign in to your SwitchBot account to download device model information? This improves device discovery, especially when using passive Bluetooth scanning. If your Bluetooth adapter uses active scanning, you can skip this step.",
"menu_options": {
"cloud_login": "Sign in to SwitchBot account",
"select_device": "Continue without signing in"
}
},
"cloud_login": {
"description": "Please provide your SwitchBot app username and password. This data won't be saved and is only used to retrieve device model information for better discovery. Usernames and passwords are case-sensitive.",
"data": {
"username": "[%key:common::config_flow::data::username%]",
"password": "[%key:common::config_flow::data::password%]"
},
"data_description": {
"username": "[%key:component::switchbot::config::step::encrypted_auth::data_description::username%]",
"password": "[%key:component::switchbot::config::step::encrypted_auth::data_description::password%]"
}
},
"select_device": {
"data": {
"address": "MAC address"
},

View File

@@ -979,6 +979,7 @@ SENSORS: dict[str, tuple[TuyaSensorEntityDescription, ...]] = {
),
TuyaSensorEntityDescription(
key=DPCode.WINDSPEED_AVG,
translation_key="wind_speed",
device_class=SensorDeviceClass.WIND_SPEED,
state_class=SensorStateClass.MEASUREMENT,
),

View File

@@ -600,9 +600,6 @@
"status": {
"name": "Status"
},
"depth": {
"name": "Depth"
},
"last_amount": {
"name": "Last amount"
},

View File

@@ -17,7 +17,6 @@ from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
from . import WhirlpoolConfigEntry
from .entity import WhirlpoolEntity
PARALLEL_UPDATES = 1
SCAN_INTERVAL = timedelta(minutes=5)

View File

@@ -25,8 +25,6 @@ from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
from . import WhirlpoolConfigEntry
from .entity import WhirlpoolEntity
PARALLEL_UPDATES = 1
AIRCON_MODE_MAP = {
AirconMode.Cool: HVACMode.COOL,
AirconMode.Heat: HVACMode.HEAT,
@@ -45,6 +43,13 @@ AIRCON_FANSPEED_MAP = {
FAN_MODE_TO_AIRCON_FANSPEED = {v: k for k, v in AIRCON_FANSPEED_MAP.items()}
SUPPORTED_FAN_MODES = [FAN_AUTO, FAN_HIGH, FAN_MEDIUM, FAN_LOW, FAN_OFF]
SUPPORTED_HVAC_MODES = [
HVACMode.COOL,
HVACMode.HEAT,
HVACMode.FAN_ONLY,
HVACMode.OFF,
]
SUPPORTED_MAX_TEMP = 30
SUPPORTED_MIN_TEMP = 16
SUPPORTED_SWING_MODES = [SWING_HORIZONTAL, SWING_OFF]
@@ -66,9 +71,9 @@ class AirConEntity(WhirlpoolEntity, ClimateEntity):
_appliance: Aircon
_attr_fan_modes = SUPPORTED_FAN_MODES
_attr_name = None
_attr_fan_modes = [*FAN_MODE_TO_AIRCON_FANSPEED.keys()]
_attr_hvac_modes = [HVACMode.OFF, *HVAC_MODE_TO_AIRCON_MODE.keys()]
_attr_hvac_modes = SUPPORTED_HVAC_MODES
_attr_max_temp = SUPPORTED_MAX_TEMP
_attr_min_temp = SUPPORTED_MIN_TEMP
_attr_supported_features = (
@@ -138,7 +143,8 @@ class AirConEntity(WhirlpoolEntity, ClimateEntity):
async def async_set_fan_mode(self, fan_mode: str) -> None:
"""Set fan mode."""
fanspeed = FAN_MODE_TO_AIRCON_FANSPEED[fan_mode]
if not (fanspeed := FAN_MODE_TO_AIRCON_FANSPEED.get(fan_mode)):
raise ValueError(f"Invalid fan mode {fan_mode}")
await self._appliance.set_fanspeed(fanspeed)
@property

View File

@@ -24,7 +24,6 @@ from homeassistant.util.dt import utcnow
from . import WhirlpoolConfigEntry
from .entity import WhirlpoolEntity
PARALLEL_UPDATES = 1
SCAN_INTERVAL = timedelta(minutes=5)
WASHER_TANK_FILL = {

View File

@@ -149,7 +149,6 @@ FLOWS = {
"downloader",
"dremel_3d_printer",
"drop_connect",
"droplet",
"dsmr",
"dsmr_reader",
"duke_energy",

View File

@@ -1441,12 +1441,6 @@
"config_flow": true,
"iot_class": "local_push"
},
"droplet": {
"name": "Droplet",
"integration_type": "hub",
"config_flow": true,
"iot_class": "local_push"
},
"dsmr": {
"name": "DSMR Smart Meter",
"integration_type": "hub",

View File

@@ -464,11 +464,6 @@ ZEROCONF = {
"domain": "daikin",
},
],
"_droplet._tcp.local.": [
{
"domain": "droplet",
},
],
"_dvl-deviceapi._tcp.local.": [
{
"domain": "devolo_home_control",

View File

@@ -1002,7 +1002,7 @@ class DeviceRegistry(BaseRegistry[dict[str, list[dict[str, Any]]]]):
via_device_id=via_device_id,
)
# This is safe because _async_update_device will always return a device
# This is safe because async_update_device will always return a device
# in this use case.
assert device
return device
@@ -1279,7 +1279,7 @@ class DeviceRegistry(BaseRegistry[dict[str, list[dict[str, Any]]]]):
# Change modified_at if we are changing something that we store
new_values["modified_at"] = utcnow()
self.hass.verify_event_loop_thread("device_registry._async_update_device")
self.hass.verify_event_loop_thread("device_registry.async_update_device")
new = attr.evolve(old, **new_values)
self.devices[device_id] = new
@@ -1451,7 +1451,7 @@ class DeviceRegistry(BaseRegistry[dict[str, list[dict[str, Any]]]]):
)
for other_device in list(self.devices.values()):
if other_device.via_device_id == device_id:
self._async_update_device(other_device.id, via_device_id=None)
self.async_update_device(other_device.id, via_device_id=None)
self.hass.bus.async_fire_internal(
EVENT_DEVICE_REGISTRY_UPDATED,
_EventDeviceRegistryUpdatedData_Remove(
@@ -1574,7 +1574,7 @@ class DeviceRegistry(BaseRegistry[dict[str, list[dict[str, Any]]]]):
"""Clear config entry from registry entries."""
now_time = time.time()
for device in self.devices.get_devices_for_config_entry_id(config_entry_id):
self._async_update_device(device.id, remove_config_entry_id=config_entry_id)
self.async_update_device(device.id, remove_config_entry_id=config_entry_id)
for deleted_device in list(self.deleted_devices.values()):
config_entries = deleted_device.config_entries
if config_entry_id not in config_entries:
@@ -1608,8 +1608,9 @@ class DeviceRegistry(BaseRegistry[dict[str, list[dict[str, Any]]]]):
) -> None:
"""Clear config entry from registry entries."""
now_time = time.time()
now_time = time.time()
for device in self.devices.get_devices_for_config_entry_id(config_entry_id):
self._async_update_device(
self.async_update_device(
device.id,
remove_config_entry_id=config_entry_id,
remove_config_subentry_id=config_subentry_id,
@@ -1670,7 +1671,7 @@ class DeviceRegistry(BaseRegistry[dict[str, list[dict[str, Any]]]]):
def async_clear_area_id(self, area_id: str) -> None:
"""Clear area id from registry entries."""
for device in self.devices.get_devices_for_area_id(area_id):
self._async_update_device(device.id, area_id=None)
self.async_update_device(device.id, area_id=None)
for deleted_device in list(self.deleted_devices.values()):
if deleted_device.area_id != area_id:
continue
@@ -1683,7 +1684,7 @@ class DeviceRegistry(BaseRegistry[dict[str, list[dict[str, Any]]]]):
def async_clear_label_id(self, label_id: str) -> None:
"""Clear label from registry entries."""
for device in self.devices.get_devices_for_label(label_id):
self._async_update_device(device.id, labels=device.labels - {label_id})
self.async_update_device(device.id, labels=device.labels - {label_id})
for deleted_device in list(self.deleted_devices.values()):
if label_id not in deleted_device.labels:
continue
@@ -1747,7 +1748,7 @@ def async_config_entry_disabled_by_changed(
for device in devices:
if device.disabled_by is not DeviceEntryDisabler.CONFIG_ENTRY:
continue
registry._async_update_device(device.id, disabled_by=None) # noqa: SLF001
registry.async_update_device(device.id, disabled_by=None)
return
enabled_config_entries = {
@@ -1764,7 +1765,7 @@ def async_config_entry_disabled_by_changed(
enabled_config_entries
):
continue
registry._async_update_device( # noqa: SLF001
registry.async_update_device(
device.id, disabled_by=DeviceEntryDisabler.CONFIG_ENTRY
)
@@ -1802,7 +1803,7 @@ def async_cleanup(
for device in list(dev_reg.devices.values()):
for config_entry_id in device.config_entries:
if config_entry_id not in config_entry_ids:
dev_reg._async_update_device( # noqa: SLF001
dev_reg.async_update_device(
device.id, remove_config_entry_id=config_entry_id
)

View File

@@ -760,7 +760,7 @@ def _get_permissible_entity_candidates(
@bind_hass
async def entity_service_call(
hass: HomeAssistant,
registered_entities: dict[str, Entity] | Callable[[], dict[str, Entity]],
registered_entities: dict[str, Entity],
func: str | HassJob,
call: ServiceCall,
required_features: Iterable[int] | None = None,
@@ -799,15 +799,10 @@ async def entity_service_call(
else:
data = call
if callable(registered_entities):
_registered_entities = registered_entities()
else:
_registered_entities = registered_entities
# A list with entities to call the service on.
entity_candidates = _get_permissible_entity_candidates(
call,
_registered_entities,
registered_entities,
entity_perms,
target_all_entities,
all_referenced,
@@ -1117,23 +1112,6 @@ class ReloadServiceHelper[_T]:
self._service_condition.notify_all()
def _validate_entity_service_schema(
schema: VolDictType | VolSchemaType | None,
) -> VolSchemaType:
"""Validate that a schema is an entity service schema."""
if schema is None or isinstance(schema, dict):
return cv.make_entity_service_schema(schema)
if not cv.is_entity_service_schema(schema):
from .frame import ReportBehavior, report_usage # noqa: PLC0415
report_usage(
"registers an entity service with a non entity service schema",
core_behavior=ReportBehavior.LOG,
breaks_in_ha_version="2025.9",
)
return schema
@callback
def async_register_entity_service(
hass: HomeAssistant,
@@ -1153,7 +1131,16 @@ def async_register_entity_service(
EntityPlatform.async_register_entity_service and should not be called
directly by integrations.
"""
schema = _validate_entity_service_schema(schema)
if schema is None or isinstance(schema, dict):
schema = cv.make_entity_service_schema(schema)
elif not cv.is_entity_service_schema(schema):
from .frame import ReportBehavior, report_usage # noqa: PLC0415
report_usage(
"registers an entity service with a non entity service schema",
core_behavior=ReportBehavior.LOG,
breaks_in_ha_version="2025.9",
)
service_func: str | HassJob[..., Any]
service_func = func if isinstance(func, str) else HassJob(func)
@@ -1172,47 +1159,3 @@ def async_register_entity_service(
supports_response,
job_type=job_type,
)
@callback
def async_register_platform_entity_service(
hass: HomeAssistant,
service_domain: str,
service_name: str,
*,
entity_domain: str,
func: str | Callable[..., Any],
required_features: Iterable[int] | None = None,
schema: VolDictType | VolSchemaType | None,
supports_response: SupportsResponse = SupportsResponse.NONE,
) -> None:
"""Help registering a platform entity service."""
from .entity_platform import DATA_DOMAIN_PLATFORM_ENTITIES # noqa: PLC0415
schema = _validate_entity_service_schema(schema)
service_func: str | HassJob[..., Any]
service_func = func if isinstance(func, str) else HassJob(func)
def get_entities() -> dict[str, Entity]:
entities = hass.data.get(DATA_DOMAIN_PLATFORM_ENTITIES, {}).get(
(entity_domain, service_domain)
)
if entities is None:
return {}
return entities
hass.services.async_register(
service_domain,
service_name,
partial(
entity_service_call,
hass,
get_entities,
service_func,
required_features=required_features,
),
schema,
supports_response,
job_type=HassJobType.Coroutinefunction,
)

10
mypy.ini generated
View File

@@ -1446,16 +1446,6 @@ disallow_untyped_defs = true
warn_return_any = true
warn_unreachable = true
[mypy-homeassistant.components.droplet.*]
check_untyped_defs = true
disallow_incomplete_defs = true
disallow_subclassing_any = true
disallow_untyped_calls = true
disallow_untyped_decorators = true
disallow_untyped_defs = true
warn_return_any = true
warn_unreachable = true
[mypy-homeassistant.components.dsmr.*]
check_untyped_defs = true
disallow_incomplete_defs = true

View File

@@ -468,7 +468,7 @@ filterwarnings = [
"ignore:Unknown pytest.mark.dataset:pytest.PytestUnknownMarkWarning:tests.components.screenlogic",
# -- DeprecationWarning already fixed in our codebase
# https://github.com/kurtmckee/feedparser/ - 6.0.12
# https://github.com/kurtmckee/feedparser/pull/389 - 6.0.11
"ignore:.*a temporary mapping .* from `updated_parsed` to `published_parsed` if `updated_parsed` doesn't exist:DeprecationWarning:feedparser.util",
# -- design choice 3rd party
@@ -569,6 +569,9 @@ filterwarnings = [
"ignore:\"is.*\" with '.*' literal:SyntaxWarning:importlib._bootstrap",
# -- New in Python 3.13
# https://github.com/kurtmckee/feedparser/pull/389 - >6.0.11
# https://github.com/kurtmckee/feedparser/issues/481
"ignore:'count' is passed as positional argument:DeprecationWarning:feedparser.html",
# https://github.com/youknowone/python-deadlib - Backports for aifc, telnetlib
"ignore:aifc was removed in Python 3.13.*'standard-aifc':DeprecationWarning:speech_recognition",
"ignore:telnetlib was removed in Python 3.13.*'standard-telnetlib':DeprecationWarning:homeassistant.components.hddtemp.sensor",

11
requirements_all.txt generated
View File

@@ -84,7 +84,7 @@ PyQRCode==1.2.1
PyRMVtransport==0.3.3
# homeassistant.components.switchbot
PySwitchbot==0.70.0
PySwitchbot==0.69.0
# homeassistant.components.switchmate
PySwitchmate==0.5.1
@@ -131,7 +131,7 @@ TwitterAPI==2.7.12
WSDiscovery==2.1.2
# homeassistant.components.accuweather
accuweather==4.2.1
accuweather==4.2.0
# homeassistant.components.adax
adax==0.4.0
@@ -936,7 +936,7 @@ faadelays==2023.9.1
fastdotcom==0.0.3
# homeassistant.components.feedreader
feedparser==6.0.12
feedparser==6.0.11
# homeassistant.components.file
file-read-backwards==2.0.0
@@ -1277,7 +1277,7 @@ iottycloud==0.3.0
iperf3==0.1.11
# homeassistant.components.isal
isal==1.8.0
isal==1.7.1
# homeassistant.components.gogogate2
ismartgate==5.0.2
@@ -1945,9 +1945,6 @@ pydrawise==2025.9.0
# homeassistant.components.android_ip_webcam
pydroid-ipcam==3.0.0
# homeassistant.components.droplet
pydroplet==2.3.2
# homeassistant.components.ebox
pyebox==1.1.4

View File

@@ -81,7 +81,7 @@ PyQRCode==1.2.1
PyRMVtransport==0.3.3
# homeassistant.components.switchbot
PySwitchbot==0.70.0
PySwitchbot==0.69.0
# homeassistant.components.syncthru
PySyncThru==0.8.0
@@ -119,7 +119,7 @@ Tami4EdgeAPI==3.0
WSDiscovery==2.1.2
# homeassistant.components.accuweather
accuweather==4.2.1
accuweather==4.2.0
# homeassistant.components.adax
adax==0.4.0
@@ -815,7 +815,7 @@ faadelays==2023.9.1
fastdotcom==0.0.3
# homeassistant.components.feedreader
feedparser==6.0.12
feedparser==6.0.11
# homeassistant.components.file
file-read-backwards==2.0.0
@@ -1108,7 +1108,7 @@ iometer==0.1.0
iottycloud==0.3.0
# homeassistant.components.isal
isal==1.8.0
isal==1.7.1
# homeassistant.components.gogogate2
ismartgate==5.0.2
@@ -1626,9 +1626,6 @@ pydrawise==2025.9.0
# homeassistant.components.android_ip_webcam
pydroid-ipcam==3.0.0
# homeassistant.components.droplet
pydroplet==2.3.2
# homeassistant.components.ecoforest
pyecoforest==0.4.0

View File

@@ -30,6 +30,24 @@ async def test_show_form(hass: HomeAssistant) -> None:
assert result["step_id"] == "user"
async def test_api_key_too_short(hass: HomeAssistant) -> None:
"""Test that errors are shown when API key is too short."""
# The API key length check is done by the library without polling the AccuWeather
# server so we don't need to patch the library method.
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_USER},
data={
CONF_NAME: "abcd",
CONF_API_KEY: "foo",
CONF_LATITUDE: 55.55,
CONF_LONGITUDE: 122.12,
},
)
assert result["errors"] == {CONF_API_KEY: "invalid_api_key"}
async def test_invalid_api_key(
hass: HomeAssistant, mock_accuweather_client: AsyncMock
) -> None:

View File

@@ -1,9 +1,9 @@
"""Alexa Devices tests configuration."""
from collections.abc import Generator
from copy import deepcopy
from unittest.mock import AsyncMock, patch
from aioamazondevices.api import AmazonDevice, AmazonDeviceSensor
from aioamazondevices.const import DEVICE_TYPE_TO_MODEL
import pytest
@@ -14,7 +14,7 @@ from homeassistant.components.alexa_devices.const import (
)
from homeassistant.const import CONF_PASSWORD, CONF_USERNAME
from .const import TEST_DEVICE, TEST_PASSWORD, TEST_SERIAL_NUMBER, TEST_USERNAME
from .const import TEST_PASSWORD, TEST_SERIAL_NUMBER, TEST_USERNAME
from tests.common import MockConfigEntry
@@ -47,7 +47,27 @@ def mock_amazon_devices_client() -> Generator[AsyncMock]:
"customer_info": {"user_id": TEST_USERNAME},
}
client.get_devices_data.return_value = {
TEST_SERIAL_NUMBER: deepcopy(TEST_DEVICE)
TEST_SERIAL_NUMBER: AmazonDevice(
account_name="Echo Test",
capabilities=["AUDIO_PLAYER", "MICROPHONE"],
device_family="mine",
device_type="echo",
device_owner_customer_id="amazon_ower_id",
device_cluster_members=[TEST_SERIAL_NUMBER],
online=True,
serial_number=TEST_SERIAL_NUMBER,
software_version="echo_test_software_version",
do_not_disturb=False,
response_style=None,
bluetooth_state=True,
entity_id="11111111-2222-3333-4444-555555555555",
appliance_id="G1234567890123456789012345678A",
sensors={
"temperature": AmazonDeviceSensor(
name="temperature", value="22.5", scale="CELSIUS"
)
},
)
}
client.get_model_details = lambda device: DEVICE_TYPE_TO_MODEL.get(
device.device_type

View File

@@ -1,32 +1,8 @@
"""Alexa Devices tests const."""
from aioamazondevices.api import AmazonDevice, AmazonDeviceSensor
TEST_CODE = "023123"
TEST_PASSWORD = "fake_password"
TEST_SERIAL_NUMBER = "echo_test_serial_number"
TEST_USERNAME = "fake_email@gmail.com"
TEST_DEVICE_ID = "echo_test_device_id"
TEST_DEVICE = AmazonDevice(
account_name="Echo Test",
capabilities=["AUDIO_PLAYER", "MICROPHONE"],
device_family="mine",
device_type="echo",
device_owner_customer_id="amazon_ower_id",
device_cluster_members=[TEST_SERIAL_NUMBER],
online=True,
serial_number=TEST_SERIAL_NUMBER,
software_version="echo_test_software_version",
do_not_disturb=False,
response_style=None,
bluetooth_state=True,
entity_id="11111111-2222-3333-4444-555555555555",
appliance_id="G1234567890123456789012345678A",
sensors={
"temperature": AmazonDeviceSensor(
name="temperature", value="22.5", scale="CELSIUS"
)
},
)

View File

@@ -1,73 +0,0 @@
"""Tests for the Alexa Devices coordinator."""
from unittest.mock import AsyncMock
from aioamazondevices.api import AmazonDevice, AmazonDeviceSensor
from freezegun.api import FrozenDateTimeFactory
from homeassistant.components.alexa_devices.coordinator import SCAN_INTERVAL
from homeassistant.const import STATE_ON
from homeassistant.core import HomeAssistant
from . import setup_integration
from .const import TEST_DEVICE, TEST_SERIAL_NUMBER
from tests.common import MockConfigEntry, async_fire_time_changed
async def test_coordinator_stale_device(
hass: HomeAssistant,
freezer: FrozenDateTimeFactory,
mock_amazon_devices_client: AsyncMock,
mock_config_entry: MockConfigEntry,
) -> None:
"""Test coordinator data update removes stale Alexa devices."""
entity_id_0 = "binary_sensor.echo_test_connectivity"
entity_id_1 = "binary_sensor.echo_test_2_connectivity"
mock_amazon_devices_client.get_devices_data.return_value = {
TEST_SERIAL_NUMBER: TEST_DEVICE,
"echo_test_2_serial_number_2": AmazonDevice(
account_name="Echo Test 2",
capabilities=["AUDIO_PLAYER", "MICROPHONE"],
device_family="mine",
device_type="echo",
device_owner_customer_id="amazon_ower_id",
device_cluster_members=["echo_test_2_serial_number_2"],
online=True,
serial_number="echo_test_2_serial_number_2",
software_version="echo_test_2_software_version",
do_not_disturb=False,
response_style=None,
bluetooth_state=True,
entity_id="11111111-2222-3333-4444-555555555555",
appliance_id="G1234567890123456789012345678A",
sensors={
"temperature": AmazonDeviceSensor(
name="temperature", value="22.5", scale="CELSIUS"
)
},
),
}
await setup_integration(hass, mock_config_entry)
assert (state := hass.states.get(entity_id_0))
assert state.state == STATE_ON
assert (state := hass.states.get(entity_id_1))
assert state.state == STATE_ON
mock_amazon_devices_client.get_devices_data.return_value = {
TEST_SERIAL_NUMBER: TEST_DEVICE,
}
freezer.tick(SCAN_INTERVAL)
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert (state := hass.states.get(entity_id_0))
assert state.state == STATE_ON
# Entity is removed
assert not hass.states.get(entity_id_1)

View File

@@ -9,7 +9,6 @@ from homeassistant.components import bluetooth
from homeassistant.components.bluetooth import (
MONOTONIC_TIME,
BaseHaRemoteScanner,
BluetoothScanningMode,
HaBluetoothConnector,
async_scanner_by_source,
async_scanner_devices_by_address,
@@ -17,7 +16,6 @@ from homeassistant.components.bluetooth import (
from homeassistant.core import HomeAssistant
from . import (
FakeRemoteScanner,
FakeScanner,
MockBleakClient,
_get_manager,
@@ -163,68 +161,3 @@ async def test_async_scanner_devices_by_address_non_connectable(
assert devices[0].ble_device.name == switchbot_device.name
assert devices[0].advertisement.local_name == switchbot_device_adv.local_name
cancel()
@pytest.mark.usefixtures("enable_bluetooth")
async def test_async_current_scanners(hass: HomeAssistant) -> None:
"""Test getting the list of current scanners."""
# The enable_bluetooth fixture registers one scanner
initial_scanners = bluetooth.async_current_scanners(hass)
assert len(initial_scanners) == 1
initial_scanner_count = len(initial_scanners)
# Verify current_mode is accessible on the initial scanner
for scanner in initial_scanners:
assert hasattr(scanner, "current_mode")
# The mode might be None or a BluetoothScanningMode enum value
# Register additional connectable scanners
hci0_scanner = FakeScanner("hci0", "hci0")
hci1_scanner = FakeScanner("hci1", "hci1")
cancel_hci0 = bluetooth.async_register_scanner(hass, hci0_scanner)
cancel_hci1 = bluetooth.async_register_scanner(hass, hci1_scanner)
# Test that the new scanners are added
scanners = bluetooth.async_current_scanners(hass)
assert len(scanners) == initial_scanner_count + 2
assert hci0_scanner in scanners
assert hci1_scanner in scanners
# Verify current_mode is accessible on all scanners
for scanner in scanners:
assert hasattr(scanner, "current_mode")
# Verify it's None or the correct type (BluetoothScanningMode)
assert scanner.current_mode is None or isinstance(
scanner.current_mode, BluetoothScanningMode
)
# Register non-connectable scanner
connector = HaBluetoothConnector(
MockBleakClient, "mock_bleak_client", lambda: False
)
hci2_scanner = FakeRemoteScanner("hci2", "hci2", connector, False)
cancel_hci2 = bluetooth.async_register_scanner(hass, hci2_scanner)
# Test that all scanners are returned (both connectable and non-connectable)
all_scanners = bluetooth.async_current_scanners(hass)
assert len(all_scanners) == initial_scanner_count + 3
assert hci0_scanner in all_scanners
assert hci1_scanner in all_scanners
assert hci2_scanner in all_scanners
# Verify current_mode is accessible on all scanners including non-connectable
for scanner in all_scanners:
assert hasattr(scanner, "current_mode")
# The mode should be None or a BluetoothScanningMode instance
assert scanner.current_mode is None or isinstance(
scanner.current_mode, BluetoothScanningMode
)
# Clean up our scanners
cancel_hci0()
cancel_hci1()
cancel_hci2()
# Verify we're back to the initial scanner
final_scanners = bluetooth.async_current_scanners(hass)
assert len(final_scanners) == initial_scanner_count

View File

@@ -30,19 +30,34 @@ async def mocked_cloud_object(hass: HomeAssistant) -> Cloud:
)
async def test_fetching_subscription_with_api_error(
aioclient_mock: AiohttpClientMocker,
caplog: pytest.LogCaptureFixture,
mocked_cloud: Cloud,
) -> None:
"""Test that we handle API errors."""
mocked_cloud.payments.subscription_info.side_effect = payments_api.PaymentsApiError(
"There was an error with the API"
)
assert await async_subscription_info(mocked_cloud) is None
assert (
"Failed to fetch subscription information - There was an error with the API"
in caplog.text
)
async def test_fetching_subscription_with_timeout_error(
aioclient_mock: AiohttpClientMocker,
caplog: pytest.LogCaptureFixture,
mocked_cloud: Cloud,
) -> None:
"""Test that we handle timeout error."""
mocked_cloud.payments.subscription_info.side_effect = payments_api.PaymentsApiError(
"Timeout reached while calling API"
)
mocked_cloud.payments.subscription_info.side_effect = TimeoutError()
assert await async_subscription_info(mocked_cloud) is None
assert (
"Failed to fetch subscription information - Timeout reached while calling API"
"A timeout of 10 was reached while trying to fetch subscription information"
in caplog.text
)

View File

@@ -1,13 +0,0 @@
"""Tests for the Droplet integration."""
from homeassistant.core import HomeAssistant
from tests.common import MockConfigEntry
async def setup_integration(hass: HomeAssistant, config_entry: MockConfigEntry) -> None:
"""Fixture for setting up the component."""
config_entry.add_to_hass(hass)
await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()

View File

@@ -1,108 +0,0 @@
"""Common fixtures for the Droplet tests."""
from collections.abc import Generator
from datetime import datetime
from unittest.mock import AsyncMock, patch
import pytest
from homeassistant.components.droplet.const import DOMAIN
from homeassistant.const import CONF_CODE, CONF_IP_ADDRESS, CONF_PORT
from tests.common import MockConfigEntry
MOCK_CODE = "11223"
MOCK_HOST = "192.168.1.2"
MOCK_PORT = 443
MOCK_DEVICE_ID = "Droplet-1234"
MOCK_MANUFACTURER = "Hydrific, part of LIXIL"
MOCK_SN = "1234"
MOCK_SW_VERSION = "v1.0.0"
MOCK_MODEL = "Droplet 1.0"
@pytest.fixture
def mock_config_entry() -> MockConfigEntry:
"""Return the default mocked config entry."""
return MockConfigEntry(
domain=DOMAIN,
data={CONF_IP_ADDRESS: MOCK_HOST, CONF_PORT: MOCK_PORT, CONF_CODE: MOCK_CODE},
unique_id=MOCK_DEVICE_ID,
)
@pytest.fixture
def mock_droplet() -> Generator[AsyncMock]:
"""Mock a Droplet client."""
with (
patch(
"homeassistant.components.droplet.coordinator.Droplet",
autospec=True,
) as mock_client,
):
client = mock_client.return_value
client.get_signal_quality.return_value = "strong_signal"
client.get_server_status.return_value = "connected"
client.get_flow_rate.return_value = 0.1
client.get_manufacturer.return_value = MOCK_MANUFACTURER
client.get_model.return_value = MOCK_MODEL
client.get_fw_version.return_value = MOCK_SW_VERSION
client.get_sn.return_value = MOCK_SN
client.get_volume_last_fetched.return_value = datetime(
year=2020, month=1, day=1
)
yield client
@pytest.fixture(autouse=True)
def mock_timeout() -> Generator[None]:
"""Mock the timeout."""
with (
patch(
"homeassistant.components.droplet.coordinator.TIMEOUT",
0.05,
),
patch(
"homeassistant.components.droplet.coordinator.VERSION_TIMEOUT",
0.1,
),
patch(
"homeassistant.components.droplet.coordinator.CONNECT_DELAY",
0.1,
),
):
yield
@pytest.fixture
def mock_droplet_connection() -> Generator[AsyncMock]:
"""Mock a Droplet connection."""
with (
patch(
"homeassistant.components.droplet.config_flow.DropletConnection",
autospec=True,
) as mock_client,
):
client = mock_client.return_value
yield client
@pytest.fixture
def mock_droplet_discovery(request: pytest.FixtureRequest) -> Generator[AsyncMock]:
"""Mock a DropletDiscovery."""
with (
patch(
"homeassistant.components.droplet.config_flow.DropletDiscovery",
autospec=True,
) as mock_client,
):
client = mock_client.return_value
# Not all tests set this value
try:
client.host = request.param
except AttributeError:
client.host = MOCK_HOST
client.port = MOCK_PORT
client.try_connect.return_value = True
client.get_device_id.return_value = MOCK_DEVICE_ID
yield client

View File

@@ -1,240 +0,0 @@
# serializer version: 1
# name: test_sensors[sensor.mock_title_flow_rate-entry]
EntityRegistryEntrySnapshot({
'aliases': set({
}),
'area_id': None,
'capabilities': dict({
'state_class': <SensorStateClass.MEASUREMENT: 'measurement'>,
}),
'config_entry_id': <ANY>,
'config_subentry_id': <ANY>,
'device_class': None,
'device_id': <ANY>,
'disabled_by': None,
'domain': 'sensor',
'entity_category': None,
'entity_id': 'sensor.mock_title_flow_rate',
'has_entity_name': True,
'hidden_by': None,
'icon': None,
'id': <ANY>,
'labels': set({
}),
'name': None,
'options': dict({
'sensor': dict({
'suggested_display_precision': 2,
}),
'sensor.private': dict({
'suggested_unit_of_measurement': <UnitOfVolumeFlowRate.GALLONS_PER_MINUTE: 'gal/min'>,
}),
}),
'original_device_class': <SensorDeviceClass.VOLUME_FLOW_RATE: 'volume_flow_rate'>,
'original_icon': None,
'original_name': 'Flow rate',
'platform': 'droplet',
'previous_unique_id': None,
'suggested_object_id': None,
'supported_features': 0,
'translation_key': 'current_flow_rate',
'unique_id': 'Droplet-1234_current_flow_rate',
'unit_of_measurement': <UnitOfVolumeFlowRate.GALLONS_PER_MINUTE: 'gal/min'>,
})
# ---
# name: test_sensors[sensor.mock_title_flow_rate-state]
StateSnapshot({
'attributes': ReadOnlyDict({
'device_class': 'volume_flow_rate',
'friendly_name': 'Mock Title Flow rate',
'state_class': <SensorStateClass.MEASUREMENT: 'measurement'>,
'unit_of_measurement': <UnitOfVolumeFlowRate.GALLONS_PER_MINUTE: 'gal/min'>,
}),
'context': <ANY>,
'entity_id': 'sensor.mock_title_flow_rate',
'last_changed': <ANY>,
'last_reported': <ANY>,
'last_updated': <ANY>,
'state': '0.0264172052358148',
})
# ---
# name: test_sensors[sensor.mock_title_server_status-entry]
EntityRegistryEntrySnapshot({
'aliases': set({
}),
'area_id': None,
'capabilities': dict({
'options': list([
'connected',
'connecting',
'disconnected',
]),
}),
'config_entry_id': <ANY>,
'config_subentry_id': <ANY>,
'device_class': None,
'device_id': <ANY>,
'disabled_by': None,
'domain': 'sensor',
'entity_category': <EntityCategory.DIAGNOSTIC: 'diagnostic'>,
'entity_id': 'sensor.mock_title_server_status',
'has_entity_name': True,
'hidden_by': None,
'icon': None,
'id': <ANY>,
'labels': set({
}),
'name': None,
'options': dict({
}),
'original_device_class': <SensorDeviceClass.ENUM: 'enum'>,
'original_icon': None,
'original_name': 'Server status',
'platform': 'droplet',
'previous_unique_id': None,
'suggested_object_id': None,
'supported_features': 0,
'translation_key': 'server_connectivity',
'unique_id': 'Droplet-1234_server_connectivity',
'unit_of_measurement': None,
})
# ---
# name: test_sensors[sensor.mock_title_server_status-state]
StateSnapshot({
'attributes': ReadOnlyDict({
'device_class': 'enum',
'friendly_name': 'Mock Title Server status',
'options': list([
'connected',
'connecting',
'disconnected',
]),
}),
'context': <ANY>,
'entity_id': 'sensor.mock_title_server_status',
'last_changed': <ANY>,
'last_reported': <ANY>,
'last_updated': <ANY>,
'state': 'connected',
})
# ---
# name: test_sensors[sensor.mock_title_signal_quality-entry]
EntityRegistryEntrySnapshot({
'aliases': set({
}),
'area_id': None,
'capabilities': dict({
'options': list([
'no_signal',
'weak_signal',
'strong_signal',
]),
}),
'config_entry_id': <ANY>,
'config_subentry_id': <ANY>,
'device_class': None,
'device_id': <ANY>,
'disabled_by': None,
'domain': 'sensor',
'entity_category': <EntityCategory.DIAGNOSTIC: 'diagnostic'>,
'entity_id': 'sensor.mock_title_signal_quality',
'has_entity_name': True,
'hidden_by': None,
'icon': None,
'id': <ANY>,
'labels': set({
}),
'name': None,
'options': dict({
}),
'original_device_class': <SensorDeviceClass.ENUM: 'enum'>,
'original_icon': None,
'original_name': 'Signal quality',
'platform': 'droplet',
'previous_unique_id': None,
'suggested_object_id': None,
'supported_features': 0,
'translation_key': 'signal_quality',
'unique_id': 'Droplet-1234_signal_quality',
'unit_of_measurement': None,
})
# ---
# name: test_sensors[sensor.mock_title_signal_quality-state]
StateSnapshot({
'attributes': ReadOnlyDict({
'device_class': 'enum',
'friendly_name': 'Mock Title Signal quality',
'options': list([
'no_signal',
'weak_signal',
'strong_signal',
]),
}),
'context': <ANY>,
'entity_id': 'sensor.mock_title_signal_quality',
'last_changed': <ANY>,
'last_reported': <ANY>,
'last_updated': <ANY>,
'state': 'strong_signal',
})
# ---
# name: test_sensors[sensor.mock_title_water-entry]
EntityRegistryEntrySnapshot({
'aliases': set({
}),
'area_id': None,
'capabilities': dict({
'state_class': <SensorStateClass.TOTAL: 'total'>,
}),
'config_entry_id': <ANY>,
'config_subentry_id': <ANY>,
'device_class': None,
'device_id': <ANY>,
'disabled_by': None,
'domain': 'sensor',
'entity_category': None,
'entity_id': 'sensor.mock_title_water',
'has_entity_name': True,
'hidden_by': None,
'icon': None,
'id': <ANY>,
'labels': set({
}),
'name': None,
'options': dict({
'sensor': dict({
'suggested_display_precision': 2,
}),
'sensor.private': dict({
'suggested_unit_of_measurement': <UnitOfVolume.GALLONS: 'gal'>,
}),
}),
'original_device_class': <SensorDeviceClass.WATER: 'water'>,
'original_icon': None,
'original_name': 'Water',
'platform': 'droplet',
'previous_unique_id': None,
'suggested_object_id': None,
'supported_features': 0,
'translation_key': None,
'unique_id': 'Droplet-1234_volume',
'unit_of_measurement': <UnitOfVolume.GALLONS: 'gal'>,
})
# ---
# name: test_sensors[sensor.mock_title_water-state]
StateSnapshot({
'attributes': ReadOnlyDict({
'device_class': 'water',
'friendly_name': 'Mock Title Water',
'last_reset': '2020-01-01T00:00:00',
'state_class': <SensorStateClass.TOTAL: 'total'>,
'unit_of_measurement': <UnitOfVolume.GALLONS: 'gal'>,
}),
'context': <ANY>,
'entity_id': 'sensor.mock_title_water',
'last_changed': <ANY>,
'last_reported': <ANY>,
'last_updated': <ANY>,
'state': '0.264172052358148',
})
# ---

View File

@@ -1,271 +0,0 @@
"""Test Droplet config flow."""
from ipaddress import IPv4Address
from unittest.mock import AsyncMock
import pytest
from homeassistant.components.droplet.const import DOMAIN
from homeassistant.config_entries import SOURCE_USER, SOURCE_ZEROCONF
from homeassistant.const import (
ATTR_CODE,
CONF_CODE,
CONF_DEVICE_ID,
CONF_IP_ADDRESS,
CONF_PORT,
)
from homeassistant.core import HomeAssistant
from homeassistant.data_entry_flow import FlowResultType
from homeassistant.helpers.service_info.zeroconf import ZeroconfServiceInfo
from .conftest import MOCK_CODE, MOCK_DEVICE_ID, MOCK_HOST, MOCK_PORT
from tests.common import MockConfigEntry
async def test_user_setup(
hass: HomeAssistant,
mock_droplet_discovery: AsyncMock,
mock_droplet_connection: AsyncMock,
mock_droplet: AsyncMock,
) -> None:
"""Test successful Droplet user setup."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
assert result is not None
assert result.get("type") is FlowResultType.FORM
assert result.get("step_id") == "user"
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
user_input={CONF_CODE: MOCK_CODE, CONF_IP_ADDRESS: "192.168.1.2"},
)
assert result is not None
assert result.get("type") is FlowResultType.CREATE_ENTRY
assert result.get("data") == {
CONF_CODE: MOCK_CODE,
CONF_DEVICE_ID: MOCK_DEVICE_ID,
CONF_IP_ADDRESS: MOCK_HOST,
CONF_PORT: MOCK_PORT,
}
assert result.get("context") is not None
assert result.get("context", {}).get("unique_id") == MOCK_DEVICE_ID
@pytest.mark.parametrize(
("device_id", "connect_res"),
[
(
"",
True,
),
(MOCK_DEVICE_ID, False),
],
ids=["no_device_id", "cannot_connect"],
)
async def test_user_setup_fail(
hass: HomeAssistant,
device_id: str,
connect_res: bool,
mock_droplet_discovery: AsyncMock,
mock_droplet_connection: AsyncMock,
mock_droplet: AsyncMock,
) -> None:
"""Test user setup failing due to no device ID or failed connection."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
assert result is not None
assert result.get("type") is FlowResultType.FORM
assert result.get("step_id") == "user"
attrs = {
"get_device_id.return_value": device_id,
"try_connect.return_value": connect_res,
}
mock_droplet_discovery.configure_mock(**attrs)
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
user_input={CONF_CODE: MOCK_CODE, CONF_IP_ADDRESS: MOCK_HOST},
)
assert result is not None
assert result.get("type") is FlowResultType.FORM
assert result.get("errors") == {"base": "cannot_connect"}
# The user should be able to try again. Maybe the droplet was disconnected from the network or something
attrs = {
"get_device_id.return_value": MOCK_DEVICE_ID,
"try_connect.return_value": True,
}
mock_droplet_discovery.configure_mock(**attrs)
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
user_input={CONF_CODE: MOCK_CODE, CONF_IP_ADDRESS: MOCK_HOST},
)
assert result is not None
assert result.get("type") is FlowResultType.CREATE_ENTRY
async def test_user_setup_already_configured(
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,
mock_droplet_discovery: AsyncMock,
mock_droplet: AsyncMock,
mock_droplet_connection: AsyncMock,
) -> None:
"""Test user setup of an already-configured device."""
mock_config_entry.add_to_hass(hass)
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
assert result is not None
assert result.get("type") is FlowResultType.FORM
assert result.get("step_id") == "user"
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
user_input={CONF_CODE: MOCK_CODE, CONF_IP_ADDRESS: MOCK_HOST},
)
assert result is not None
assert result.get("type") is FlowResultType.ABORT
assert result.get("reason") == "already_configured"
async def test_zeroconf_setup(
hass: HomeAssistant,
mock_droplet_discovery: AsyncMock,
mock_droplet: AsyncMock,
mock_droplet_connection: AsyncMock,
) -> None:
"""Test successful setup of Droplet via zeroconf."""
discovery_info = ZeroconfServiceInfo(
ip_address=IPv4Address(MOCK_HOST),
ip_addresses=[IPv4Address(MOCK_HOST)],
port=MOCK_PORT,
hostname=MOCK_DEVICE_ID,
type="_droplet._tcp.local.",
name=MOCK_DEVICE_ID,
properties={},
)
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_ZEROCONF},
data=discovery_info,
)
assert result is not None
assert result.get("type") is FlowResultType.FORM
assert result.get("step_id") == "confirm"
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input={CONF_CODE: MOCK_CODE}
)
assert result is not None
assert result.get("type") is FlowResultType.CREATE_ENTRY
assert result.get("data") == {
CONF_DEVICE_ID: MOCK_DEVICE_ID,
CONF_IP_ADDRESS: MOCK_HOST,
CONF_PORT: MOCK_PORT,
CONF_CODE: MOCK_CODE,
}
assert result.get("context") is not None
assert result.get("context", {}).get("unique_id") == MOCK_DEVICE_ID
@pytest.mark.parametrize("mock_droplet_discovery", ["192.168.1.5"], indirect=True)
async def test_zeroconf_update(
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,
mock_droplet_discovery: AsyncMock,
) -> None:
"""Test updating Droplet's host with zeroconf."""
mock_config_entry.add_to_hass(hass)
# We start with a different host
new_host = "192.168.1.5"
assert mock_config_entry.data[CONF_IP_ADDRESS] != new_host
# After this discovery message, host should be updated
discovery_info = ZeroconfServiceInfo(
ip_address=IPv4Address(new_host),
ip_addresses=[IPv4Address(new_host)],
port=MOCK_PORT,
hostname=MOCK_DEVICE_ID,
type="_droplet._tcp.local.",
name=MOCK_DEVICE_ID,
properties={},
)
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_ZEROCONF},
data=discovery_info,
)
assert result is not None
assert result.get("type") is FlowResultType.ABORT
assert result.get("reason") == "already_configured"
assert mock_config_entry.data[CONF_IP_ADDRESS] == new_host
async def test_zeroconf_invalid_discovery(hass: HomeAssistant) -> None:
"""Test that invalid discovery information causes the config flow to abort."""
discovery_info = ZeroconfServiceInfo(
ip_address=IPv4Address(MOCK_HOST),
ip_addresses=[IPv4Address(MOCK_HOST)],
port=-1,
hostname=MOCK_DEVICE_ID,
type="_droplet._tcp.local.",
name=MOCK_DEVICE_ID,
properties={},
)
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_ZEROCONF},
data=discovery_info,
)
assert result is not None
assert result.get("type") is FlowResultType.ABORT
assert result.get("reason") == "invalid_discovery_info"
async def test_confirm_cannot_connect(
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,
mock_droplet: AsyncMock,
mock_droplet_connection: AsyncMock,
mock_droplet_discovery: AsyncMock,
) -> None:
"""Test that config flow fails when Droplet can't connect."""
discovery_info = ZeroconfServiceInfo(
ip_address=IPv4Address(MOCK_HOST),
ip_addresses=[IPv4Address(MOCK_HOST)],
port=MOCK_PORT,
hostname=MOCK_DEVICE_ID,
type="_droplet._tcp.local.",
name=MOCK_DEVICE_ID,
properties={},
)
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_ZEROCONF},
data=discovery_info,
)
assert result.get("type") is FlowResultType.FORM
# Mock the connection failing
mock_droplet_discovery.try_connect.return_value = False
result = await hass.config_entries.flow.async_configure(
result["flow_id"], {ATTR_CODE: MOCK_CODE}
)
assert result.get("type") is FlowResultType.FORM
assert result.get("errors")["base"] == "cannot_connect"
mock_droplet_discovery.try_connect.return_value = True
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input={ATTR_CODE: MOCK_CODE}
)
assert result.get("type") is FlowResultType.CREATE_ENTRY, result

View File

@@ -1,41 +0,0 @@
"""Test Droplet initialization."""
from unittest.mock import AsyncMock
import pytest
from homeassistant.config_entries import ConfigEntryState
from homeassistant.core import HomeAssistant
from . import setup_integration
from tests.common import MockConfigEntry
async def test_setup_no_version_info(
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,
mock_droplet_discovery: AsyncMock,
mock_droplet_connection: AsyncMock,
mock_droplet: AsyncMock,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test coordinator setup where Droplet never sends version info."""
mock_droplet.version_info_available.return_value = False
await setup_integration(hass, mock_config_entry)
assert "Failed to get version info from Droplet" in caplog.text
async def test_setup_droplet_offline(
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,
mock_droplet_discovery: AsyncMock,
mock_droplet_connection: AsyncMock,
mock_droplet: AsyncMock,
) -> None:
"""Test integration setup when Droplet is offline."""
mock_droplet.connected = False
await setup_integration(hass, mock_config_entry)
assert mock_config_entry.state is ConfigEntryState.SETUP_RETRY

View File

@@ -1,46 +0,0 @@
"""Test Droplet sensors."""
from unittest.mock import AsyncMock
from syrupy.assertion import SnapshotAssertion
from homeassistant.core import HomeAssistant
from homeassistant.helpers import entity_registry as er
from . import setup_integration
from tests.common import MockConfigEntry, snapshot_platform
async def test_sensors(
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,
mock_droplet_discovery: AsyncMock,
mock_droplet_connection: AsyncMock,
mock_droplet: AsyncMock,
snapshot: SnapshotAssertion,
entity_registry: er.EntityRegistry,
) -> None:
"""Test Droplet sensors."""
await setup_integration(hass, mock_config_entry)
await snapshot_platform(hass, entity_registry, snapshot, mock_config_entry.entry_id)
async def test_sensors_update_data(
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,
mock_droplet_discovery: AsyncMock,
mock_droplet_connection: AsyncMock,
mock_droplet: AsyncMock,
) -> None:
"""Test Droplet async update data."""
await setup_integration(hass, mock_config_entry)
assert hass.states.get("sensor.mock_title_flow_rate").state == "0.0264172052358148"
mock_droplet.get_flow_rate.return_value = 0.5
mock_droplet.listen_forever.call_args_list[0][0][1]({})
assert hass.states.get("sensor.mock_title_flow_rate").state == "0.132086026179074"

View File

@@ -143,11 +143,7 @@ async def test_discovery_with_existing_device_present(hass: HomeAssistant) -> No
)
config_entry.add_to_hass(hass)
with (
_patch_device(),
_patch_discovery(),
_patch_config_flow_try_connect(no_device=True),
):
with _patch_discovery(), _patch_config_flow_try_connect(no_device=True):
await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()

View File

@@ -2142,12 +2142,7 @@ async def test_light_strip_zones_not_populated_yet(hass: HomeAssistant) -> None:
assert bulb.set_power.calls[0][0][0] is True
bulb.set_power.reset_mock()
with (
_patch_discovery(device=bulb),
_patch_config_flow_try_connect(device=bulb),
_patch_device(device=bulb),
):
async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=30))
await hass.async_block_till_done()
async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=30))
await hass.async_block_till_done()
state = hass.states.get(entity_id)
assert state.state == STATE_ON

View File

@@ -57,30 +57,32 @@ async def test_rssi_sensor(
await hass.async_block_till_done()
entity_id = "sensor.my_bulb_rssi"
assert not hass.states.get(entity_id)
entry = entity_registry.entities.get(entity_id)
assert entry
assert entry.disabled
assert entry.disabled_by is er.RegistryEntryDisabler.INTEGRATION
# Test enabling entity, this will trigger a reload of the config entry
# Test enabling entity
updated_entry = entity_registry.async_update_entity(
entry.entity_id, disabled_by=None
)
assert updated_entry != entry
assert updated_entry.disabled is False
assert updated_entry.unit_of_measurement == SIGNAL_STRENGTH_DECIBELS_MILLIWATT
with (
_patch_discovery(device=bulb),
_patch_config_flow_try_connect(device=bulb),
_patch_device(device=bulb),
):
async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=120))
await hass.config_entries.async_reload(config_entry.entry_id)
await hass.async_block_till_done()
assert updated_entry != entry
assert updated_entry.disabled is False
assert updated_entry.unit_of_measurement == SIGNAL_STRENGTH_DECIBELS_MILLIWATT
async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=120))
await hass.async_block_till_done()
rssi = hass.states.get(entity_id)
assert (
rssi.attributes[ATTR_UNIT_OF_MEASUREMENT] == SIGNAL_STRENGTH_DECIBELS_MILLIWATT
@@ -118,23 +120,26 @@ async def test_rssi_sensor_old_firmware(
assert entry.disabled
assert entry.disabled_by is er.RegistryEntryDisabler.INTEGRATION
# Test enabling entity, this will trigger a reload of the config entry
# Test enabling entity
updated_entry = entity_registry.async_update_entity(
entry.entity_id, disabled_by=None
)
assert updated_entry != entry
assert updated_entry.disabled is False
assert updated_entry.unit_of_measurement == SIGNAL_STRENGTH_DECIBELS
with (
_patch_discovery(device=bulb),
_patch_config_flow_try_connect(device=bulb),
_patch_device(device=bulb),
):
async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=120))
await hass.config_entries.async_reload(config_entry.entry_id)
await hass.async_block_till_done()
assert updated_entry != entry
assert updated_entry.disabled is False
assert updated_entry.unit_of_measurement == SIGNAL_STRENGTH_DECIBELS
async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=120))
await hass.async_block_till_done()
rssi = hass.states.get(entity_id)
assert rssi.attributes[ATTR_UNIT_OF_MEASUREMENT] == SIGNAL_STRENGTH_DECIBELS
assert rssi.attributes[ATTR_DEVICE_CLASS] == SensorDeviceClass.SIGNAL_STRENGTH

View File

@@ -353,14 +353,14 @@
'name': None,
'options': dict({
}),
'original_device_class': None,
'original_device_class': <SensorDeviceClass.NITROGEN_DIOXIDE: 'nitrogen_dioxide'>,
'original_icon': None,
'original_name': 'Nitrogen dioxide',
'platform': 'matter',
'previous_unique_id': None,
'suggested_object_id': None,
'supported_features': 0,
'translation_key': 'nitrogen_dioxide',
'translation_key': None,
'unique_id': '00000000000004D2-000000000000008F-MatterNodeDevice-2-NitrogenDioxideSensor-1043-0',
'unit_of_measurement': 'ppm',
})
@@ -368,6 +368,7 @@
# name: test_sensors[air_purifier][sensor.air_purifier_nitrogen_dioxide-state]
StateSnapshot({
'attributes': ReadOnlyDict({
'device_class': 'nitrogen_dioxide',
'friendly_name': 'Air Purifier Nitrogen dioxide',
'state_class': <SensorStateClass.MEASUREMENT: 'measurement'>,
'unit_of_measurement': 'ppm',
@@ -954,14 +955,14 @@
'name': None,
'options': dict({
}),
'original_device_class': None,
'original_device_class': <SensorDeviceClass.NITROGEN_DIOXIDE: 'nitrogen_dioxide'>,
'original_icon': None,
'original_name': 'Nitrogen dioxide',
'platform': 'matter',
'previous_unique_id': None,
'suggested_object_id': None,
'supported_features': 0,
'translation_key': 'nitrogen_dioxide',
'translation_key': None,
'unique_id': '00000000000004D2-0000000000000001-MatterNodeDevice-1-NitrogenDioxideSensor-1043-0',
'unit_of_measurement': 'ppm',
})
@@ -969,6 +970,7 @@
# name: test_sensors[air_quality_sensor][sensor.lightfi_aq1_air_quality_sensor_nitrogen_dioxide-state]
StateSnapshot({
'attributes': ReadOnlyDict({
'device_class': 'nitrogen_dioxide',
'friendly_name': 'lightfi-aq1-air-quality-sensor Nitrogen dioxide',
'state_class': <SensorStateClass.MEASUREMENT: 'measurement'>,
'unit_of_measurement': 'ppm',

View File

@@ -251,19 +251,12 @@ async def test_async_step_bluetooth_not_connectable(hass: HomeAssistant) -> None
async def test_user_setup_wohand(hass: HomeAssistant) -> None:
"""Test the user initiated form with password and valid mac."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
assert result["type"] is FlowResultType.MENU
assert result["step_id"] == "user"
with patch(
"homeassistant.components.switchbot.config_flow.async_discovered_service_info",
return_value=[WOHAND_SERVICE_INFO],
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{"next_step_id": "select_device"},
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "confirm"
@@ -299,20 +292,12 @@ async def test_user_setup_wohand_already_configured(hass: HomeAssistant) -> None
unique_id="aabbccddeeff",
)
entry.add_to_hass(hass)
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
assert result["type"] is FlowResultType.MENU
assert result["step_id"] == "user"
with patch(
"homeassistant.components.switchbot.config_flow.async_discovered_service_info",
return_value=[WOHAND_SERVICE_INFO],
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{"next_step_id": "select_device"},
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "no_devices_found"
@@ -324,20 +309,12 @@ async def test_user_setup_wohand_replaces_ignored(hass: HomeAssistant) -> None:
domain=DOMAIN, data={}, unique_id="aabbccddeeff", source=SOURCE_IGNORE
)
entry.add_to_hass(hass)
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
assert result["type"] is FlowResultType.MENU
assert result["step_id"] == "user"
with patch(
"homeassistant.components.switchbot.config_flow.async_discovered_service_info",
return_value=[WOHAND_SERVICE_INFO],
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{"next_step_id": "select_device"},
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "confirm"
@@ -362,19 +339,12 @@ async def test_user_setup_wohand_replaces_ignored(hass: HomeAssistant) -> None:
async def test_user_setup_wocurtain(hass: HomeAssistant) -> None:
"""Test the user initiated form with password and valid mac."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
assert result["type"] is FlowResultType.MENU
assert result["step_id"] == "user"
with patch(
"homeassistant.components.switchbot.config_flow.async_discovered_service_info",
return_value=[WOCURTAIN_SERVICE_INFO],
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{"next_step_id": "select_device"},
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "confirm"
@@ -400,12 +370,6 @@ async def test_user_setup_wocurtain(hass: HomeAssistant) -> None:
async def test_user_setup_wocurtain_or_bot(hass: HomeAssistant) -> None:
"""Test the user initiated form with valid address."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
assert result["type"] is FlowResultType.MENU
assert result["step_id"] == "user"
with patch(
"homeassistant.components.switchbot.config_flow.async_discovered_service_info",
return_value=[
@@ -415,12 +379,11 @@ async def test_user_setup_wocurtain_or_bot(hass: HomeAssistant) -> None:
WOHAND_SERVICE_INFO_NOT_CONNECTABLE,
],
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{"next_step_id": "select_device"},
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "select_device"
assert result["step_id"] == "user"
assert result["errors"] == {}
with patch_async_setup_entry() as mock_setup_entry:
@@ -443,12 +406,6 @@ async def test_user_setup_wocurtain_or_bot(hass: HomeAssistant) -> None:
async def test_user_setup_wocurtain_or_bot_with_password(hass: HomeAssistant) -> None:
"""Test the user initiated form and valid address and a bot with a password."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
assert result["type"] is FlowResultType.MENU
assert result["step_id"] == "user"
with patch(
"homeassistant.components.switchbot.config_flow.async_discovered_service_info",
return_value=[
@@ -457,12 +414,11 @@ async def test_user_setup_wocurtain_or_bot_with_password(hass: HomeAssistant) ->
WOHAND_SERVICE_INFO_NOT_CONNECTABLE,
],
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{"next_step_id": "select_device"},
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "select_device"
assert result["step_id"] == "user"
assert result["errors"] == {}
result2 = await hass.config_entries.flow.async_configure(
@@ -494,19 +450,12 @@ async def test_user_setup_wocurtain_or_bot_with_password(hass: HomeAssistant) ->
async def test_user_setup_single_bot_with_password(hass: HomeAssistant) -> None:
"""Test the user initiated form for a bot with a password."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
assert result["type"] is FlowResultType.MENU
assert result["step_id"] == "user"
with patch(
"homeassistant.components.switchbot.config_flow.async_discovered_service_info",
return_value=[WOHAND_ENCRYPTED_SERVICE_INFO],
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{"next_step_id": "select_device"},
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "password"
@@ -533,19 +482,12 @@ async def test_user_setup_single_bot_with_password(hass: HomeAssistant) -> None:
async def test_user_setup_woencrypted_key(hass: HomeAssistant) -> None:
"""Test the user initiated form for a lock."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
assert result["type"] is FlowResultType.MENU
assert result["step_id"] == "user"
with patch(
"homeassistant.components.switchbot.config_flow.async_discovered_service_info",
return_value=[WOLOCK_SERVICE_INFO],
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{"next_step_id": "select_device"},
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
assert result["type"] is FlowResultType.MENU
assert result["step_id"] == "encrypted_choose_method"
@@ -606,19 +548,12 @@ async def test_user_setup_woencrypted_key(hass: HomeAssistant) -> None:
async def test_user_setup_woencrypted_auth(hass: HomeAssistant) -> None:
"""Test the user initiated form for a lock."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
assert result["type"] is FlowResultType.MENU
assert result["step_id"] == "user"
with patch(
"homeassistant.components.switchbot.config_flow.async_discovered_service_info",
return_value=[WOLOCK_SERVICE_INFO],
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{"next_step_id": "select_device"},
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
assert result["type"] is FlowResultType.MENU
assert result["step_id"] == "encrypted_choose_method"
@@ -688,19 +623,12 @@ async def test_user_setup_woencrypted_auth_switchbot_api_down(
) -> None:
"""Test the user initiated form for a lock when the switchbot api is down."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
assert result["type"] is FlowResultType.MENU
assert result["step_id"] == "user"
with patch(
"homeassistant.components.switchbot.config_flow.async_discovered_service_info",
return_value=[WOLOCK_SERVICE_INFO],
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{"next_step_id": "select_device"},
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
assert result["type"] is FlowResultType.MENU
assert result["step_id"] == "encrypted_choose_method"
@@ -733,12 +661,6 @@ async def test_user_setup_woencrypted_auth_switchbot_api_down(
async def test_user_setup_wolock_or_bot(hass: HomeAssistant) -> None:
"""Test the user initiated form for a lock."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
assert result["type"] is FlowResultType.MENU
assert result["step_id"] == "user"
with patch(
"homeassistant.components.switchbot.config_flow.async_discovered_service_info",
return_value=[
@@ -746,12 +668,11 @@ async def test_user_setup_wolock_or_bot(hass: HomeAssistant) -> None:
WOHAND_SERVICE_ALT_ADDRESS_INFO,
],
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{"next_step_id": "select_device"},
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "select_device"
assert result["step_id"] == "user"
assert result["errors"] == {}
result = await hass.config_entries.flow.async_configure(
@@ -800,19 +721,12 @@ async def test_user_setup_wolock_or_bot(hass: HomeAssistant) -> None:
async def test_user_setup_wosensor(hass: HomeAssistant) -> None:
"""Test the user initiated form with password and valid mac."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
assert result["type"] is FlowResultType.MENU
assert result["step_id"] == "user"
with patch(
"homeassistant.components.switchbot.config_flow.async_discovered_service_info",
return_value=[WOSENSORTH_SERVICE_INFO],
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{"next_step_id": "select_device"},
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "confirm"
@@ -835,225 +749,14 @@ async def test_user_setup_wosensor(hass: HomeAssistant) -> None:
assert len(mock_setup_entry.mock_calls) == 1
async def test_user_cloud_login(hass: HomeAssistant) -> None:
"""Test the cloud login flow."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
assert result["type"] is FlowResultType.MENU
assert result["step_id"] == "user"
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{"next_step_id": "cloud_login"},
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "cloud_login"
# Test successful cloud login
with (
patch(
"homeassistant.components.switchbot.config_flow.fetch_cloud_devices",
return_value=None,
),
patch(
"homeassistant.components.switchbot.config_flow.async_discovered_service_info",
return_value=[WOHAND_SERVICE_INFO],
),
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_USERNAME: "test@example.com",
CONF_PASSWORD: "testpass",
},
)
# Should proceed to device selection with single device, so go to confirm
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "confirm"
# Confirm device setup
with patch_async_setup_entry():
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{},
)
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["title"] == "Bot EEFF"
assert result["data"] == {
CONF_ADDRESS: "AA:BB:CC:DD:EE:FF",
CONF_SENSOR_TYPE: "bot",
}
async def test_user_cloud_login_auth_failed(hass: HomeAssistant) -> None:
"""Test the cloud login flow with authentication failure."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
assert result["type"] is FlowResultType.MENU
assert result["step_id"] == "user"
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{"next_step_id": "cloud_login"},
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "cloud_login"
# Test authentication failure
with patch(
"homeassistant.components.switchbot.config_flow.fetch_cloud_devices",
side_effect=SwitchbotAuthenticationError("Invalid credentials"),
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_USERNAME: "test@example.com",
CONF_PASSWORD: "wrongpass",
},
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "cloud_login"
assert result["errors"] == {"base": "auth_failed"}
assert "Invalid credentials" in result["description_placeholders"]["error_detail"]
async def test_user_cloud_login_api_error(hass: HomeAssistant) -> None:
"""Test the cloud login flow with API error."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
assert result["type"] is FlowResultType.MENU
assert result["step_id"] == "user"
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{"next_step_id": "cloud_login"},
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "cloud_login"
# Test API connection error
with patch(
"homeassistant.components.switchbot.config_flow.fetch_cloud_devices",
side_effect=SwitchbotAccountConnectionError("API is down"),
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_USERNAME: "test@example.com",
CONF_PASSWORD: "testpass",
},
)
assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "api_error"
assert result["description_placeholders"] == {"error_detail": "API is down"}
async def test_user_cloud_login_then_encrypted_device(hass: HomeAssistant) -> None:
"""Test cloud login followed by encrypted device setup using saved credentials."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
assert result["type"] is FlowResultType.MENU
assert result["step_id"] == "user"
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{"next_step_id": "cloud_login"},
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "cloud_login"
with (
patch(
"homeassistant.components.switchbot.config_flow.fetch_cloud_devices",
return_value=None,
),
patch(
"homeassistant.components.switchbot.config_flow.async_discovered_service_info",
return_value=[WOLOCK_SERVICE_INFO],
),
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_USERNAME: "test@example.com",
CONF_PASSWORD: "testpass",
},
)
# Should go to encrypted device choice menu
assert result["type"] is FlowResultType.MENU
assert result["step_id"] == "encrypted_choose_method"
# Choose encrypted auth
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input={"next_step_id": "encrypted_auth"}
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "encrypted_auth"
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
None,
)
await hass.async_block_till_done()
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "encrypted_auth"
with (
patch_async_setup_entry() as mock_setup_entry,
patch(
"switchbot.SwitchbotLock.async_retrieve_encryption_key",
return_value={
CONF_KEY_ID: "ff",
CONF_ENCRYPTION_KEY: "ffffffffffffffffffffffffffffffff",
},
),
patch("switchbot.SwitchbotLock.verify_encryption_key", return_value=True),
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_USERNAME: "test@example.com",
CONF_PASSWORD: "testpass",
},
)
await hass.async_block_till_done()
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["title"] == "Lock EEFF"
assert result["data"] == {
CONF_ADDRESS: "aa:bb:cc:dd:ee:ff",
CONF_KEY_ID: "ff",
CONF_ENCRYPTION_KEY: "ffffffffffffffffffffffffffffffff",
CONF_SENSOR_TYPE: "lock",
}
assert len(mock_setup_entry.mock_calls) == 1
async def test_user_no_devices(hass: HomeAssistant) -> None:
"""Test the user initiated form with password and valid mac."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
assert result["type"] is FlowResultType.MENU
assert result["step_id"] == "user"
with patch(
"homeassistant.components.switchbot.config_flow.async_discovered_service_info",
return_value=[],
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{"next_step_id": "select_device"},
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "no_devices_found"
@@ -1071,20 +774,13 @@ async def test_async_step_user_takes_precedence_over_discovery(
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "confirm"
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_USER},
)
assert result["type"] is FlowResultType.MENU
assert result["step_id"] == "user"
with patch(
"homeassistant.components.switchbot.config_flow.async_discovered_service_info",
return_value=[WOCURTAIN_SERVICE_INFO],
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{"next_step_id": "select_device"},
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_USER},
)
assert result["type"] is FlowResultType.FORM
@@ -1235,19 +931,12 @@ async def test_options_flow_lock_pro(hass: HomeAssistant) -> None:
async def test_user_setup_worelay_switch_1pm_key(hass: HomeAssistant) -> None:
"""Test the user initiated form for a relay switch 1pm."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
assert result["type"] is FlowResultType.MENU
assert result["step_id"] == "user"
with patch(
"homeassistant.components.switchbot.config_flow.async_discovered_service_info",
return_value=[WORELAY_SWITCH_1PM_SERVICE_INFO],
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{"next_step_id": "select_device"},
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
assert result["type"] is FlowResultType.MENU
assert result["step_id"] == "encrypted_choose_method"
@@ -1290,19 +979,12 @@ async def test_user_setup_worelay_switch_1pm_key(hass: HomeAssistant) -> None:
async def test_user_setup_worelay_switch_1pm_auth(hass: HomeAssistant) -> None:
"""Test the user initiated form for a relay switch 1pm."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
assert result["type"] is FlowResultType.MENU
assert result["step_id"] == "user"
with patch(
"homeassistant.components.switchbot.config_flow.async_discovered_service_info",
return_value=[WORELAY_SWITCH_1PM_SERVICE_INFO],
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{"next_step_id": "select_device"},
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
assert result["type"] is FlowResultType.MENU
assert result["step_id"] == "encrypted_choose_method"
@@ -1371,19 +1053,12 @@ async def test_user_setup_worelay_switch_1pm_auth_switchbot_api_down(
) -> None:
"""Test the user initiated form for a relay switch 1pm when the switchbot api is down."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
assert result["type"] is FlowResultType.MENU
assert result["step_id"] == "user"
with patch(
"homeassistant.components.switchbot.config_flow.async_discovered_service_info",
return_value=[WORELAY_SWITCH_1PM_SERVICE_INFO],
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{"next_step_id": "select_device"},
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
assert result["type"] is FlowResultType.MENU
assert result["step_id"] == "encrypted_choose_method"

View File

@@ -3052,7 +3052,7 @@
'previous_unique_id': None,
'suggested_object_id': None,
'supported_features': 0,
'translation_key': None,
'translation_key': 'wind_speed',
'unique_id': 'tuya.6tbtkuv3tal1aesfjxqwindspeed_avg',
'unit_of_measurement': <UnitOfSpeed.KILOMETERS_PER_HOUR: 'km/h'>,
})
@@ -6731,7 +6731,7 @@
'state': '42.0',
})
# ---
# name: test_platform_setup_and_discovery[sensor.house_water_level_depth-entry]
# name: test_platform_setup_and_discovery[sensor.house_water_level_distance-entry]
EntityRegistryEntrySnapshot({
'aliases': set({
}),
@@ -6746,7 +6746,7 @@
'disabled_by': None,
'domain': 'sensor',
'entity_category': None,
'entity_id': 'sensor.house_water_level_depth',
'entity_id': 'sensor.house_water_level_distance',
'has_entity_name': True,
'hidden_by': None,
'icon': None,
@@ -6761,7 +6761,7 @@
}),
'original_device_class': <SensorDeviceClass.DISTANCE: 'distance'>,
'original_icon': None,
'original_name': 'Depth',
'original_name': 'Distance',
'platform': 'tuya',
'previous_unique_id': None,
'suggested_object_id': None,
@@ -6771,16 +6771,16 @@
'unit_of_measurement': 'm',
})
# ---
# name: test_platform_setup_and_discovery[sensor.house_water_level_depth-state]
# name: test_platform_setup_and_discovery[sensor.house_water_level_distance-state]
StateSnapshot({
'attributes': ReadOnlyDict({
'device_class': 'distance',
'friendly_name': 'House Water Level Depth',
'friendly_name': 'House Water Level Distance',
'state_class': <SensorStateClass.MEASUREMENT: 'measurement'>,
'unit_of_measurement': 'm',
}),
'context': <ANY>,
'entity_id': 'sensor.house_water_level_depth',
'entity_id': 'sensor.house_water_level_distance',
'last_changed': <ANY>,
'last_reported': <ANY>,
'last_updated': <ANY>,
@@ -13042,7 +13042,7 @@
'state': '25.0',
})
# ---
# name: test_platform_setup_and_discovery[sensor.rainwater_tank_level_depth-entry]
# name: test_platform_setup_and_discovery[sensor.rainwater_tank_level_distance-entry]
EntityRegistryEntrySnapshot({
'aliases': set({
}),
@@ -13057,7 +13057,7 @@
'disabled_by': None,
'domain': 'sensor',
'entity_category': None,
'entity_id': 'sensor.rainwater_tank_level_depth',
'entity_id': 'sensor.rainwater_tank_level_distance',
'has_entity_name': True,
'hidden_by': None,
'icon': None,
@@ -13072,7 +13072,7 @@
}),
'original_device_class': <SensorDeviceClass.DISTANCE: 'distance'>,
'original_icon': None,
'original_name': 'Depth',
'original_name': 'Distance',
'platform': 'tuya',
'previous_unique_id': None,
'suggested_object_id': None,
@@ -13082,16 +13082,16 @@
'unit_of_measurement': 'm',
})
# ---
# name: test_platform_setup_and_discovery[sensor.rainwater_tank_level_depth-state]
# name: test_platform_setup_and_discovery[sensor.rainwater_tank_level_distance-state]
StateSnapshot({
'attributes': ReadOnlyDict({
'device_class': 'distance',
'friendly_name': 'Rainwater Tank Level Depth',
'friendly_name': 'Rainwater Tank Level Distance',
'state_class': <SensorStateClass.MEASUREMENT: 'measurement'>,
'unit_of_measurement': 'm',
}),
'context': <ANY>,
'entity_id': 'sensor.rainwater_tank_level_depth',
'entity_id': 'sensor.rainwater_tank_level_distance',
'last_changed': <ANY>,
'last_reported': <ANY>,
'last_updated': <ANY>,
@@ -16374,7 +16374,7 @@
'previous_unique_id': None,
'suggested_object_id': None,
'supported_features': 0,
'translation_key': None,
'translation_key': 'wind_speed',
'unique_id': 'tuya.ai9swgb6tyinbwbxjxqwindspeed_avg',
'unit_of_measurement': 'km/h',
})

View File

@@ -6,17 +6,17 @@
'area_id': None,
'capabilities': dict({
'fan_modes': list([
'off',
'auto',
'low',
'medium',
'high',
'medium',
'low',
'off',
]),
'hvac_modes': list([
<HVACMode.OFF: 'off'>,
<HVACMode.COOL: 'cool'>,
<HVACMode.HEAT: 'heat'>,
<HVACMode.FAN_ONLY: 'fan_only'>,
<HVACMode.OFF: 'off'>,
]),
'max_temp': 30,
'min_temp': 16,
@@ -62,18 +62,18 @@
'current_temperature': 15,
'fan_mode': 'auto',
'fan_modes': list([
'off',
'auto',
'low',
'medium',
'high',
'medium',
'low',
'off',
]),
'friendly_name': 'Aircon said1',
'hvac_modes': list([
<HVACMode.OFF: 'off'>,
<HVACMode.COOL: 'cool'>,
<HVACMode.HEAT: 'heat'>,
<HVACMode.FAN_ONLY: 'fan_only'>,
<HVACMode.OFF: 'off'>,
]),
'max_temp': 30,
'min_temp': 16,
@@ -101,17 +101,17 @@
'area_id': None,
'capabilities': dict({
'fan_modes': list([
'off',
'auto',
'low',
'medium',
'high',
'medium',
'low',
'off',
]),
'hvac_modes': list([
<HVACMode.OFF: 'off'>,
<HVACMode.COOL: 'cool'>,
<HVACMode.HEAT: 'heat'>,
<HVACMode.FAN_ONLY: 'fan_only'>,
<HVACMode.OFF: 'off'>,
]),
'max_temp': 30,
'min_temp': 16,
@@ -157,18 +157,18 @@
'current_temperature': 15,
'fan_mode': 'auto',
'fan_modes': list([
'off',
'auto',
'low',
'medium',
'high',
'medium',
'low',
'off',
]),
'friendly_name': 'Aircon said2',
'hvac_modes': list([
<HVACMode.OFF: 'off'>,
<HVACMode.COOL: 'cool'>,
<HVACMode.HEAT: 'heat'>,
<HVACMode.FAN_ONLY: 'fan_only'>,
<HVACMode.OFF: 'off'>,
]),
'max_temp': 30,
'min_temp': 16,

View File

@@ -5283,7 +5283,7 @@ async def test_async_get_or_create_thread_safety(
with pytest.raises(
RuntimeError,
match="Detected code that calls device_registry._async_update_device from a thread.",
match="Detected code that calls device_registry.async_update_device from a thread.",
):
await hass.async_add_executor_job(
partial(

View File

@@ -5,7 +5,6 @@ from collections.abc import Iterable
from copy import deepcopy
import dataclasses
import io
import logging
from typing import Any
from unittest.mock import AsyncMock, Mock, patch
@@ -37,7 +36,6 @@ from homeassistant.core import (
ServiceCall,
ServiceResponse,
SupportsResponse,
callback,
)
from homeassistant.helpers import (
area_registry as ar,
@@ -57,7 +55,6 @@ from homeassistant.util.yaml.loader import parse_yaml
from tests.common import (
MockEntity,
MockEntityPlatform,
MockModule,
MockUser,
RegistryEntryWithDefaults,
@@ -2464,330 +2461,3 @@ async def test_deprecated_async_extract_referenced_entity_ids(
assert args[0][2] is False
assert dataclasses.asdict(result) == dataclasses.asdict(mock_selected)
async def test_register_platform_entity_service(
hass: HomeAssistant,
) -> None:
"""Test registering a platform entity service."""
entities = []
@callback
def handle_service(entity, *_):
entities.append(entity)
service.async_register_platform_entity_service(
hass,
"mock_platform",
"hello",
entity_domain="mock_integration",
schema={},
func=handle_service,
)
await hass.services.async_call(
"mock_platform", "hello", {"entity_id": "all"}, blocking=True
)
assert entities == []
entity_platform = MockEntityPlatform(
hass, domain="mock_integration", platform_name="mock_platform", platform=None
)
entity1 = MockEntity(entity_id="mock_integration.entity1")
entity2 = MockEntity(entity_id="mock_integration.entity2")
await entity_platform.async_add_entities([entity1, entity2])
await hass.services.async_call(
"mock_platform", "hello", {"entity_id": "all"}, blocking=True
)
assert entities == unordered([entity1, entity2])
async def test_register_platform_entity_service_response_data(
hass: HomeAssistant,
) -> None:
"""Test an entity service that supports response data."""
async def generate_response(
target: MockEntity, call: ServiceCall
) -> ServiceResponse:
assert call.return_response
return {"response-key": "response-value"}
service.async_register_platform_entity_service(
hass,
"mock_platform",
"hello",
entity_domain="mock_integration",
schema={"some": str},
func=generate_response,
supports_response=SupportsResponse.ONLY,
)
entity_platform = MockEntityPlatform(
hass, domain="mock_integration", platform_name="mock_platform", platform=None
)
entity = MockEntity(entity_id="mock_integration.entity")
await entity_platform.async_add_entities([entity])
response_data = await hass.services.async_call(
"mock_platform",
"hello",
service_data={"some": "data"},
target={"entity_id": [entity.entity_id]},
blocking=True,
return_response=True,
)
assert response_data == {
"mock_integration.entity": {"response-key": "response-value"}
}
async def test_register_platform_entity_service_response_data_multiple_matches(
hass: HomeAssistant,
) -> None:
"""Test an entity service with response data and matching many entities."""
async def generate_response(
target: MockEntity, call: ServiceCall
) -> ServiceResponse:
assert call.return_response
return {"response-key": f"response-value-{target.entity_id}"}
service.async_register_platform_entity_service(
hass,
"mock_platform",
"hello",
entity_domain="mock_integration",
schema={"some": str},
func=generate_response,
supports_response=SupportsResponse.ONLY,
)
entity_platform = MockEntityPlatform(
hass, domain="mock_integration", platform_name="mock_platform", platform=None
)
entity1 = MockEntity(entity_id="mock_integration.entity1")
entity2 = MockEntity(entity_id="mock_integration.entity2")
await entity_platform.async_add_entities([entity1, entity2])
response_data = await hass.services.async_call(
"mock_platform",
"hello",
service_data={"some": "data"},
target={"entity_id": [entity1.entity_id, entity2.entity_id]},
blocking=True,
return_response=True,
)
assert response_data == {
"mock_integration.entity1": {
"response-key": "response-value-mock_integration.entity1"
},
"mock_integration.entity2": {
"response-key": "response-value-mock_integration.entity2"
},
}
async def test_register_platform_entity_service_response_data_multiple_matches_raises(
hass: HomeAssistant,
) -> None:
"""Test entity service response matching many entities raises."""
async def generate_response(
target: MockEntity, call: ServiceCall
) -> ServiceResponse:
assert call.return_response
if target.entity_id == "mock_integration.entity1":
raise RuntimeError("Something went wrong")
return {"response-key": f"response-value-{target.entity_id}"}
service.async_register_platform_entity_service(
hass,
"mock_platform",
"hello",
entity_domain="mock_integration",
schema={"some": str},
func=generate_response,
supports_response=SupportsResponse.ONLY,
)
entity_platform = MockEntityPlatform(
hass, domain="mock_integration", platform_name="mock_platform", platform=None
)
entity1 = MockEntity(entity_id="mock_integration.entity1")
entity2 = MockEntity(entity_id="mock_integration.entity2")
await entity_platform.async_add_entities([entity1, entity2])
with pytest.raises(RuntimeError, match="Something went wrong"):
await hass.services.async_call(
"mock_platform",
"hello",
service_data={"some": "data"},
target={"entity_id": [entity1.entity_id, entity2.entity_id]},
blocking=True,
return_response=True,
)
async def test_register_platform_entity_service_limited_to_matching_platforms(
hass: HomeAssistant,
entity_registry: er.EntityRegistry,
area_registry: ar.AreaRegistry,
) -> None:
"""Test entity services only target entities for the platform and domain."""
mock_area = area_registry.async_get_or_create("mock_area")
entity1_entry = entity_registry.async_get_or_create(
"base_platform", "mock_platform", "1234", suggested_object_id="entity1"
)
entity_registry.async_update_entity(entity1_entry.entity_id, area_id=mock_area.id)
entity2_entry = entity_registry.async_get_or_create(
"base_platform", "mock_platform", "5678", suggested_object_id="entity2"
)
entity_registry.async_update_entity(entity2_entry.entity_id, area_id=mock_area.id)
entity3_entry = entity_registry.async_get_or_create(
"base_platform", "other_mock_platform", "7891", suggested_object_id="entity3"
)
entity_registry.async_update_entity(entity3_entry.entity_id, area_id=mock_area.id)
entity4_entry = entity_registry.async_get_or_create(
"base_platform", "other_mock_platform", "1433", suggested_object_id="entity4"
)
entity_registry.async_update_entity(entity4_entry.entity_id, area_id=mock_area.id)
async def generate_response(
target: MockEntity, call: ServiceCall
) -> ServiceResponse:
assert call.return_response
return {"response-key": f"response-value-{target.entity_id}"}
service.async_register_platform_entity_service(
hass,
"mock_platform",
"hello",
entity_domain="base_platform",
schema={"some": str},
func=generate_response,
supports_response=SupportsResponse.ONLY,
)
entity_platform = MockEntityPlatform(
hass, domain="base_platform", platform_name="mock_platform", platform=None
)
entity1 = MockEntity(
entity_id=entity1_entry.entity_id, unique_id=entity1_entry.unique_id
)
entity2 = MockEntity(
entity_id=entity2_entry.entity_id, unique_id=entity2_entry.unique_id
)
await entity_platform.async_add_entities([entity1, entity2])
other_entity_platform = MockEntityPlatform(
hass, domain="base_platform", platform_name="other_mock_platform", platform=None
)
entity3 = MockEntity(
entity_id=entity3_entry.entity_id, unique_id=entity3_entry.unique_id
)
entity4 = MockEntity(
entity_id=entity4_entry.entity_id, unique_id=entity4_entry.unique_id
)
await other_entity_platform.async_add_entities([entity3, entity4])
response_data = await hass.services.async_call(
"mock_platform",
"hello",
service_data={"some": "data"},
target={"area_id": [mock_area.id]},
blocking=True,
return_response=True,
)
# We should not target entity3 and entity4 even though they are in the area
# because they are only part of the domain and not the platform
assert response_data == {
"base_platform.entity1": {
"response-key": "response-value-base_platform.entity1"
},
"base_platform.entity2": {
"response-key": "response-value-base_platform.entity2"
},
}
async def test_register_platform_entity_service_none_schema(
hass: HomeAssistant,
) -> None:
"""Test registering a service with schema set to None."""
entities = []
@callback
def handle_service(entity, *_):
entities.append(entity)
service.async_register_platform_entity_service(
hass,
"mock_platform",
"hello",
entity_domain="mock_integration",
schema=None,
func=handle_service,
)
entity_platform = MockEntityPlatform(
hass, domain="mock_integration", platform_name="mock_platform", platform=None
)
entity1 = MockEntity(name="entity_1")
entity2 = MockEntity(name="entity_1")
await entity_platform.async_add_entities([entity1, entity2])
await hass.services.async_call(
"mock_platform", "hello", {"entity_id": "all"}, blocking=True
)
assert len(entities) == 2
assert entity1 in entities
assert entity2 in entities
async def test_register_platform_entity_service_non_entity_service_schema(
hass: HomeAssistant, caplog: pytest.LogCaptureFixture
) -> None:
"""Test attempting to register a service with a non entity service schema."""
expected_message = "registers an entity service with a non entity service schema"
for idx, schema in enumerate(
(
vol.Schema({"some": str}),
vol.All(vol.Schema({"some": str})),
vol.Any(vol.Schema({"some": str})),
)
):
service.async_register_platform_entity_service(
hass,
"mock_platform",
f"hello_{idx}",
entity_domain="mock_integration",
schema=schema,
func=Mock(),
)
assert expected_message in caplog.text
caplog.clear()
for idx, schema in enumerate(
(
cv.make_entity_service_schema({"some": str}),
vol.Schema(cv.make_entity_service_schema({"some": str})),
vol.All(cv.make_entity_service_schema({"some": str})),
)
):
service.async_register_platform_entity_service(
hass,
"mock_platform",
f"test_service_{idx}",
entity_domain="mock_integration",
schema=schema,
func=Mock(),
)
assert expected_message not in caplog.text
assert not any(x.levelno > logging.DEBUG for x in caplog.records)