516 lines
18 KiB
Python

"""Support for SmartThings Cloud."""
from __future__ import annotations
from collections.abc import Callable
import contextlib
from dataclasses import dataclass
from http import HTTPStatus
import logging
from typing import TYPE_CHECKING, Any, cast
from aiohttp import ClientResponseError
from pysmartthings import (
Attribute,
Capability,
ComponentStatus,
Device,
DeviceEvent,
Lifecycle,
Scene,
SmartThings,
SmartThingsAuthenticationFailedError,
SmartThingsConnectionError,
SmartThingsSinkError,
Status,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import (
ATTR_CONNECTIONS,
ATTR_HW_VERSION,
ATTR_MANUFACTURER,
ATTR_MODEL,
ATTR_SW_VERSION,
ATTR_VIA_DEVICE,
CONF_ACCESS_TOKEN,
CONF_TOKEN,
EVENT_HOMEASSISTANT_STOP,
Platform,
)
from homeassistant.core import Event, HomeAssistant
from homeassistant.exceptions import ConfigEntryAuthFailed, ConfigEntryNotReady
from homeassistant.helpers import device_registry as dr, entity_registry as er
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.config_entry_oauth2_flow import (
OAuth2Session,
async_get_config_entry_implementation,
)
from homeassistant.helpers.entity_registry import RegistryEntry, async_migrate_entries
from .const import (
BINARY_SENSOR_ATTRIBUTES_TO_CAPABILITIES,
CONF_INSTALLED_APP_ID,
CONF_LOCATION_ID,
CONF_SUBSCRIPTION_ID,
DOMAIN,
EVENT_BUTTON,
MAIN,
OLD_DATA,
SENSOR_ATTRIBUTES_TO_CAPABILITIES,
)
_LOGGER = logging.getLogger(__name__)
@dataclass
class SmartThingsData:
"""Define an object to hold SmartThings data."""
devices: dict[str, FullDevice]
scenes: dict[str, Scene]
rooms: dict[str, str]
client: SmartThings
@dataclass
class FullDevice:
"""Define an object to hold device data."""
device: Device
status: dict[str, ComponentStatus]
type SmartThingsConfigEntry = ConfigEntry[SmartThingsData]
PLATFORMS = [
Platform.BINARY_SENSOR,
Platform.BUTTON,
Platform.CLIMATE,
Platform.COVER,
Platform.EVENT,
Platform.FAN,
Platform.LIGHT,
Platform.LOCK,
Platform.MEDIA_PLAYER,
Platform.NUMBER,
Platform.SCENE,
Platform.SELECT,
Platform.SENSOR,
Platform.SWITCH,
Platform.UPDATE,
Platform.VALVE,
]
async def async_setup_entry(hass: HomeAssistant, entry: SmartThingsConfigEntry) -> bool:
"""Initialize config entry which represents an installed SmartApp."""
# The oauth smartthings entry will have a token, older ones are version 3
# after migration but still require reauthentication
if CONF_TOKEN not in entry.data:
raise ConfigEntryAuthFailed("Config entry missing token")
implementation = await async_get_config_entry_implementation(hass, entry)
session = OAuth2Session(hass, entry, implementation)
try:
await session.async_ensure_token_valid()
except ClientResponseError as err:
if err.status == HTTPStatus.BAD_REQUEST:
raise ConfigEntryAuthFailed("Token not valid, trigger renewal") from err
raise ConfigEntryNotReady from err
client = SmartThings(session=async_get_clientsession(hass))
async def _refresh_token() -> str:
await session.async_ensure_token_valid()
token = session.token[CONF_ACCESS_TOKEN]
if TYPE_CHECKING:
assert isinstance(token, str)
return token
client.refresh_token_function = _refresh_token
def _handle_max_connections() -> None:
_LOGGER.debug(
"We hit the limit of max connections or we could not remove the old one, so retrying"
)
hass.config_entries.async_schedule_reload(entry.entry_id)
client.max_connections_reached_callback = _handle_max_connections
def _handle_new_subscription_identifier(identifier: str | None) -> None:
"""Handle a new subscription identifier."""
hass.config_entries.async_update_entry(
entry,
data={
**entry.data,
CONF_SUBSCRIPTION_ID: identifier,
},
)
if identifier is not None:
_LOGGER.debug("Updating subscription ID to %s", identifier)
else:
_LOGGER.debug("Removing subscription ID")
client.new_subscription_id_callback = _handle_new_subscription_identifier
if (old_identifier := entry.data.get(CONF_SUBSCRIPTION_ID)) is not None:
_LOGGER.debug("Trying to delete old subscription %s", old_identifier)
try:
await client.delete_subscription(old_identifier)
except SmartThingsConnectionError as err:
raise ConfigEntryNotReady("Could not delete old subscription") from err
_LOGGER.debug("Trying to create a new subscription")
try:
subscription = await client.create_subscription(
entry.data[CONF_LOCATION_ID],
entry.data[CONF_TOKEN][CONF_INSTALLED_APP_ID],
)
except SmartThingsSinkError as err:
_LOGGER.exception("Couldn't create a new subscription")
raise ConfigEntryNotReady from err
subscription_id = subscription.subscription_id
_handle_new_subscription_identifier(subscription_id)
entry.async_create_background_task(
hass,
client.subscribe(
entry.data[CONF_LOCATION_ID],
entry.data[CONF_TOKEN][CONF_INSTALLED_APP_ID],
subscription,
),
"smartthings_socket",
)
device_status: dict[str, FullDevice] = {}
try:
rooms = {
room.room_id: room.name
for room in await client.get_rooms(location_id=entry.data[CONF_LOCATION_ID])
}
devices = await client.get_devices()
for device in devices:
status = process_status(await client.get_device_status(device.device_id))
device_status[device.device_id] = FullDevice(device=device, status=status)
except SmartThingsAuthenticationFailedError as err:
raise ConfigEntryAuthFailed from err
device_registry = dr.async_get(hass)
create_devices(device_registry, device_status, entry, rooms)
scenes = {
scene.scene_id: scene
for scene in await client.get_scenes(location_id=entry.data[CONF_LOCATION_ID])
}
def handle_deleted_device(device_id: str) -> None:
"""Handle a deleted device."""
dev_entry = device_registry.async_get_device(
identifiers={(DOMAIN, device_id)},
)
if dev_entry is not None:
device_registry.async_update_device(
dev_entry.id, remove_config_entry_id=entry.entry_id
)
entry.async_on_unload(
client.add_device_lifecycle_event_listener(
Lifecycle.DELETE, handle_deleted_device
)
)
entry.runtime_data = SmartThingsData(
devices={
device_id: device
for device_id, device in device_status.items()
if MAIN in device.status
},
client=client,
scenes=scenes,
rooms=rooms,
)
# Events are deprecated and will be removed in 2025.10
def handle_button_press(event: DeviceEvent) -> None:
"""Handle a button press."""
if (
event.capability is Capability.BUTTON
and event.attribute is Attribute.BUTTON
):
hass.bus.async_fire(
EVENT_BUTTON,
{
"component_id": event.component_id,
"device_id": event.device_id,
"location_id": event.location_id,
"value": event.value,
"name": entry.runtime_data.devices[event.device_id].device.label,
"data": event.data,
},
)
entry.async_on_unload(
client.add_unspecified_device_event_listener(handle_button_press)
)
async def _handle_shutdown(_: Event) -> None:
"""Handle shutdown."""
await client.delete_subscription(subscription_id)
entry.async_on_unload(
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, _handle_shutdown)
)
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
device_entries = dr.async_entries_for_config_entry(device_registry, entry.entry_id)
for device_entry in device_entries:
device_id = next(
identifier[1]
for identifier in device_entry.identifiers
if identifier[0] == DOMAIN
)
if device_id in device_status:
continue
device_registry.async_update_device(
device_entry.id, remove_config_entry_id=entry.entry_id
)
return True
async def async_unload_entry(
hass: HomeAssistant, entry: SmartThingsConfigEntry
) -> bool:
"""Unload a config entry."""
client = entry.runtime_data.client
if (subscription_id := entry.data.get(CONF_SUBSCRIPTION_ID)) is not None:
with contextlib.suppress(SmartThingsConnectionError):
await client.delete_subscription(subscription_id)
return await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
async def async_migrate_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Handle config entry migration."""
if entry.version < 3:
# We keep the old data around, so we can use that to clean up the webhook in the future
hass.config_entries.async_update_entry(
entry, version=3, data={OLD_DATA: dict(entry.data)}
)
if entry.minor_version < 2:
def migrate_entities(entity_entry: RegistryEntry) -> dict[str, Any] | None:
if entity_entry.domain == "binary_sensor":
device_id, attribute = entity_entry.unique_id.split(".")
if (
capability := BINARY_SENSOR_ATTRIBUTES_TO_CAPABILITIES.get(
attribute
)
) is None:
return None
new_unique_id = (
f"{device_id}_{MAIN}_{capability}_{attribute}_{attribute}"
)
return {
"new_unique_id": new_unique_id,
}
if entity_entry.domain in {"cover", "climate", "fan", "light", "lock"}:
return {"new_unique_id": f"{entity_entry.unique_id}_{MAIN}"}
if entity_entry.domain == "sensor":
delimiter = "." if " " not in entity_entry.unique_id else " "
if delimiter not in entity_entry.unique_id:
return None
device_id, attribute = entity_entry.unique_id.split(
delimiter, maxsplit=1
)
if (
capability := SENSOR_ATTRIBUTES_TO_CAPABILITIES.get(attribute)
) is None:
if attribute in {
"energy_meter",
"power_meter",
"deltaEnergy_meter",
"powerEnergy_meter",
"energySaved_meter",
}:
return {
"new_unique_id": f"{device_id}_{MAIN}_{Capability.POWER_CONSUMPTION_REPORT}_{Attribute.POWER_CONSUMPTION}_{attribute}",
}
if attribute in {
"X Coordinate",
"Y Coordinate",
"Z Coordinate",
}:
new_attribute = {
"X Coordinate": "x_coordinate",
"Y Coordinate": "y_coordinate",
"Z Coordinate": "z_coordinate",
}[attribute]
return {
"new_unique_id": f"{device_id}_{MAIN}_{Capability.THREE_AXIS}_{Attribute.THREE_AXIS}_{new_attribute}",
}
if attribute in {
Attribute.MACHINE_STATE,
Attribute.COMPLETION_TIME,
}:
capability = determine_machine_type(
hass, entry.entry_id, device_id
)
if capability is None:
return None
return {
"new_unique_id": f"{device_id}_{MAIN}_{capability}_{attribute}_{attribute}",
}
return None
return {
"new_unique_id": f"{device_id}_{MAIN}_{capability}_{attribute}_{attribute}",
}
if entity_entry.domain == "switch":
return {
"new_unique_id": f"{entity_entry.unique_id}_{MAIN}_{Capability.SWITCH}_{Attribute.SWITCH}_{Attribute.SWITCH}",
}
return None
await async_migrate_entries(hass, entry.entry_id, migrate_entities)
hass.config_entries.async_update_entry(
entry,
minor_version=2,
)
return True
def determine_machine_type(
hass: HomeAssistant,
entry_id: str,
device_id: str,
) -> Capability | None:
"""Determine the machine type for a device."""
entity_registry = er.async_get(hass)
entries = er.async_entries_for_config_entry(entity_registry, entry_id)
device_entries = [entry for entry in entries if device_id in entry.unique_id]
for entry in device_entries:
if Attribute.DISHWASHER_JOB_STATE in entry.unique_id:
return Capability.DISHWASHER_OPERATING_STATE
if Attribute.WASHER_JOB_STATE in entry.unique_id:
return Capability.WASHER_OPERATING_STATE
if Attribute.DRYER_JOB_STATE in entry.unique_id:
return Capability.DRYER_OPERATING_STATE
if Attribute.OVEN_JOB_STATE in entry.unique_id:
return Capability.OVEN_OPERATING_STATE
return None
def create_devices(
device_registry: dr.DeviceRegistry,
devices: dict[str, FullDevice],
entry: SmartThingsConfigEntry,
rooms: dict[str, str],
) -> None:
"""Create devices in the device registry."""
for device in sorted(
devices.values(), key=lambda d: d.device.parent_device_id or ""
):
kwargs: dict[str, Any] = {}
if device.device.hub is not None:
kwargs = {
ATTR_SW_VERSION: device.device.hub.firmware_version,
ATTR_MODEL: device.device.hub.hardware_type,
}
if device.device.hub.mac_address:
kwargs[ATTR_CONNECTIONS] = {
(dr.CONNECTION_NETWORK_MAC, device.device.hub.mac_address)
}
if device.device.parent_device_id and device.device.parent_device_id in devices:
kwargs[ATTR_VIA_DEVICE] = (DOMAIN, device.device.parent_device_id)
if (ocf := device.device.ocf) is not None:
kwargs.update(
{
ATTR_MANUFACTURER: ocf.manufacturer_name,
ATTR_MODEL: (
(ocf.model_number.split("|")[0]) if ocf.model_number else None
),
ATTR_HW_VERSION: ocf.hardware_version,
ATTR_SW_VERSION: ocf.firmware_version,
}
)
if (viper := device.device.viper) is not None:
kwargs.update(
{
ATTR_MANUFACTURER: viper.manufacturer_name,
ATTR_MODEL: viper.model_name,
ATTR_HW_VERSION: viper.hardware_version,
ATTR_SW_VERSION: viper.software_version,
}
)
device_registry.async_get_or_create(
config_entry_id=entry.entry_id,
identifiers={(DOMAIN, device.device.device_id)},
configuration_url="https://account.smartthings.com",
name=device.device.label,
suggested_area=(
rooms.get(device.device.room_id) if device.device.room_id else None
),
**kwargs,
)
KEEP_CAPABILITY_QUIRK: dict[
Capability | str, Callable[[dict[Attribute | str, Status]], bool]
] = {
Capability.DRYER_OPERATING_STATE: (
lambda status: status[Attribute.SUPPORTED_MACHINE_STATES].value is not None
),
Capability.WASHER_OPERATING_STATE: (
lambda status: status[Attribute.SUPPORTED_MACHINE_STATES].value is not None
),
Capability.DEMAND_RESPONSE_LOAD_CONTROL: lambda _: True,
}
def process_status(status: dict[str, ComponentStatus]) -> dict[str, ComponentStatus]:
"""Remove disabled capabilities from status."""
if (main_component := status.get(MAIN)) is None:
return status
if (
disabled_components_capability := main_component.get(
Capability.CUSTOM_DISABLED_COMPONENTS
)
) is not None:
disabled_components = cast(
list[str],
disabled_components_capability[Attribute.DISABLED_COMPONENTS].value,
)
if disabled_components is not None:
for component in disabled_components:
if component in status:
del status[component]
for component_status in status.values():
process_component_status(component_status)
return status
def process_component_status(status: ComponentStatus) -> None:
"""Remove disabled capabilities from component status."""
if (
disabled_capabilities_capability := status.get(
Capability.CUSTOM_DISABLED_CAPABILITIES
)
) is not None:
disabled_capabilities = cast(
list[Capability | str],
disabled_capabilities_capability[Attribute.DISABLED_CAPABILITIES].value,
)
if disabled_capabilities is not None:
for capability in disabled_capabilities:
if capability in status and (
capability not in KEEP_CAPABILITY_QUIRK
or not KEEP_CAPABILITY_QUIRK[capability](status[capability])
):
del status[capability]