Compare commits

..

7 Commits

Author SHA1 Message Date
farmio
c7ad427366 add dpt selector translations 2025-12-11 23:14:48 +01:00
farmio
1b7c9afb2c validate device_class, state_class and unit 2025-12-10 23:08:09 +01:00
farmio
6974a70607 sort in BE for test consistency 2025-12-10 21:11:57 +01:00
farmio
9334d4b108 unit_o_m, device_class translations 2025-12-10 21:02:24 +01:00
Matthias Alphart
57ae4c8656 Update strings.json
Co-authored-by: Norbert Rittel <norbert@rittel.de>
2025-12-10 15:55:38 +01:00
farmio
76b9a99bdd add tests 2025-12-10 11:19:09 +01:00
farmio
47e9c5785f Support KNX sensor entity configuration from UI 2025-12-09 22:57:20 +01:00
284 changed files with 6411 additions and 12350 deletions

View File

@@ -41,8 +41,8 @@ env:
UV_CACHE_VERSION: 1
MYPY_CACHE_VERSION: 1
HA_SHORT_VERSION: "2026.1"
DEFAULT_PYTHON: "3.13.11"
ALL_PYTHON_VERSIONS: "['3.13.11', '3.14.2']"
DEFAULT_PYTHON: "3.13.9"
ALL_PYTHON_VERSIONS: "['3.13.9', '3.14.0']"
# 10.3 is the oldest supported version
# - 10.3.32 is the version currently shipped with Synology (as of 17 Feb 2022)
# 10.6 is the current long-term-support
@@ -1188,7 +1188,7 @@ jobs:
pattern: coverage-*
- name: Upload coverage to Codecov
if: needs.info.outputs.test_full_suite == 'true'
uses: codecov/codecov-action@671740ac38dd9b0130fbe1cec585b89eea48d3de # v5.5.2
uses: codecov/codecov-action@5a1091511ad55cbe89839c7260b706298ca349f7 # v5.5.1
with:
fail_ci_if_error: true
flags: full-suite
@@ -1313,7 +1313,7 @@ jobs:
pattern: coverage-*
- name: Upload coverage to Codecov
if: needs.info.outputs.test_full_suite == 'false'
uses: codecov/codecov-action@671740ac38dd9b0130fbe1cec585b89eea48d3de # v5.5.2
uses: codecov/codecov-action@5a1091511ad55cbe89839c7260b706298ca349f7 # v5.5.1
with:
fail_ci_if_error: true
token: ${{ secrets.CODECOV_TOKEN }}

View File

@@ -24,11 +24,11 @@ jobs:
uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1
- name: Initialize CodeQL
uses: github/codeql-action/init@cf1bb45a277cb3c205638b2cd5c984db1c46a412 # v4.31.7
uses: github/codeql-action/init@fe4161a26a8629af62121b670040955b330f9af2 # v4.31.6
with:
languages: python
- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@cf1bb45a277cb3c205638b2cd5c984db1c46a412 # v4.31.7
uses: github/codeql-action/analyze@fe4161a26a8629af62121b670040955b330f9af2 # v4.31.6
with:
category: "/language:python"

6
CODEOWNERS generated
View File

@@ -73,8 +73,6 @@ build.json @home-assistant/supervisor
/tests/components/airobot/ @mettolen
/homeassistant/components/airos/ @CoMPaTech
/tests/components/airos/ @CoMPaTech
/homeassistant/components/airpatrol/ @antondalgren
/tests/components/airpatrol/ @antondalgren
/homeassistant/components/airq/ @Sibgatulin @dl2080
/tests/components/airq/ @Sibgatulin @dl2080
/homeassistant/components/airthings/ @danielhiversen @LaStrada
@@ -420,8 +418,6 @@ build.json @home-assistant/supervisor
/homeassistant/components/efergy/ @tkdrob
/tests/components/efergy/ @tkdrob
/homeassistant/components/egardia/ @jeroenterheerdt
/homeassistant/components/egauge/ @neggert
/tests/components/egauge/ @neggert
/homeassistant/components/eheimdigital/ @autinerd
/tests/components/eheimdigital/ @autinerd
/homeassistant/components/ekeybionyx/ @richardpolzer
@@ -464,7 +460,7 @@ build.json @home-assistant/supervisor
/tests/components/enigma2/ @autinerd
/homeassistant/components/enphase_envoy/ @bdraco @cgarwood @catsmanac
/tests/components/enphase_envoy/ @bdraco @cgarwood @catsmanac
/homeassistant/components/entur_public_transport/ @hfurubotten @SanderBlom
/homeassistant/components/entur_public_transport/ @hfurubotten
/homeassistant/components/environment_canada/ @gwww @michaeldavie
/tests/components/environment_canada/ @gwww @michaeldavie
/homeassistant/components/ephember/ @ttroy50 @roberty99

View File

@@ -1,38 +0,0 @@
"""Diagnostics support for Airobot."""
from __future__ import annotations
from dataclasses import asdict
from typing import Any
from homeassistant.components.diagnostics import async_redact_data
from homeassistant.const import CONF_HOST, CONF_MAC, CONF_PASSWORD, CONF_USERNAME
from homeassistant.core import HomeAssistant
from .coordinator import AirobotConfigEntry
TO_REDACT_CONFIG = [CONF_HOST, CONF_MAC, CONF_PASSWORD, CONF_USERNAME]
async def async_get_config_entry_diagnostics(
hass: HomeAssistant, entry: AirobotConfigEntry
) -> dict[str, Any]:
"""Return diagnostics for a config entry."""
coordinator = entry.runtime_data
# Build device capabilities info
device_capabilities = None
if coordinator.data:
device_capabilities = {
"has_floor_sensor": coordinator.data.status.has_floor_sensor,
"has_co2_sensor": coordinator.data.status.has_co2_sensor,
"hw_version": coordinator.data.status.hw_version,
"fw_version": coordinator.data.status.fw_version,
}
return {
"entry_data": async_redact_data(entry.data, TO_REDACT_CONFIG),
"device_capabilities": device_capabilities,
"status": asdict(coordinator.data.status) if coordinator.data else None,
"settings": asdict(coordinator.data.settings) if coordinator.data else None,
}

View File

@@ -39,7 +39,7 @@ rules:
# Gold
devices: done
diagnostics: done
diagnostics: todo
discovery-update-info: done
discovery: done
docs-data-update: done
@@ -55,7 +55,7 @@ rules:
entity-category: done
entity-device-class: done
entity-disabled-by-default: done
entity-translations: done
entity-translations: todo
exception-translations: done
icon-translations: todo
reconfiguration-flow: todo

View File

@@ -1,24 +0,0 @@
"""The AirPatrol integration."""
from __future__ import annotations
from homeassistant.core import HomeAssistant
from .const import PLATFORMS
from .coordinator import AirPatrolConfigEntry, AirPatrolDataUpdateCoordinator
async def async_setup_entry(hass: HomeAssistant, entry: AirPatrolConfigEntry) -> bool:
"""Set up AirPatrol from a config entry."""
coordinator = AirPatrolDataUpdateCoordinator(hass, entry)
await coordinator.async_config_entry_first_refresh()
entry.runtime_data = coordinator
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
return True
async def async_unload_entry(hass: HomeAssistant, entry: AirPatrolConfigEntry) -> bool:
"""Unload a config entry."""
return await hass.config_entries.async_unload_platforms(entry, PLATFORMS)

View File

@@ -1,208 +0,0 @@
"""Climate platform for AirPatrol integration."""
from __future__ import annotations
from typing import Any
from homeassistant.components.climate import (
FAN_AUTO,
FAN_HIGH,
FAN_LOW,
SWING_OFF,
SWING_ON,
ClimateEntity,
ClimateEntityFeature,
HVACMode,
)
from homeassistant.const import ATTR_TEMPERATURE, UnitOfTemperature
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
from . import AirPatrolConfigEntry
from .coordinator import AirPatrolDataUpdateCoordinator
from .entity import AirPatrolEntity
PARALLEL_UPDATES = 0
AP_TO_HA_HVAC_MODES = {
"heat": HVACMode.HEAT,
"cool": HVACMode.COOL,
"off": HVACMode.OFF,
}
HA_TO_AP_HVAC_MODES = {value: key for key, value in AP_TO_HA_HVAC_MODES.items()}
AP_TO_HA_FAN_MODES = {
"min": FAN_LOW,
"max": FAN_HIGH,
"auto": FAN_AUTO,
}
HA_TO_AP_FAN_MODES = {value: key for key, value in AP_TO_HA_FAN_MODES.items()}
AP_TO_HA_SWING_MODES = {
"on": SWING_ON,
"off": SWING_OFF,
}
HA_TO_AP_SWING_MODES = {value: key for key, value in AP_TO_HA_SWING_MODES.items()}
async def async_setup_entry(
hass: HomeAssistant,
config_entry: AirPatrolConfigEntry,
async_add_entities: AddConfigEntryEntitiesCallback,
) -> None:
"""Set up AirPatrol climate entities."""
coordinator = config_entry.runtime_data
units = coordinator.data
async_add_entities(
AirPatrolClimate(coordinator, unit_id)
for unit_id, unit in units.items()
if "climate" in unit
)
class AirPatrolClimate(AirPatrolEntity, ClimateEntity):
"""AirPatrol climate entity."""
_attr_name = None
_attr_temperature_unit = UnitOfTemperature.CELSIUS
_attr_supported_features = (
ClimateEntityFeature.TARGET_TEMPERATURE
| ClimateEntityFeature.FAN_MODE
| ClimateEntityFeature.SWING_MODE
| ClimateEntityFeature.TURN_OFF
| ClimateEntityFeature.TURN_ON
)
_attr_hvac_modes = [HVACMode.HEAT, HVACMode.COOL, HVACMode.OFF]
_attr_fan_modes = [FAN_LOW, FAN_HIGH, FAN_AUTO]
_attr_swing_modes = [SWING_ON, SWING_OFF]
_attr_min_temp = 16.0
_attr_max_temp = 30.0
def __init__(
self,
coordinator: AirPatrolDataUpdateCoordinator,
unit_id: str,
) -> None:
"""Initialize the climate entity."""
super().__init__(coordinator, unit_id)
self._attr_unique_id = f"{coordinator.config_entry.unique_id}-{unit_id}"
@property
def climate_data(self) -> dict[str, Any]:
"""Return the climate data."""
return self.device_data.get("climate") or {}
@property
def params(self) -> dict[str, Any]:
"""Return the current parameters for the climate entity."""
return self.climate_data.get("ParametersData") or {}
@property
def available(self) -> bool:
"""Return if entity is available."""
return super().available and bool(self.climate_data)
@property
def current_humidity(self) -> float | None:
"""Return the current humidity."""
if humidity := self.climate_data.get("RoomHumidity"):
return float(humidity)
return None
@property
def current_temperature(self) -> float | None:
"""Return the current temperature."""
if temp := self.climate_data.get("RoomTemp"):
return float(temp)
return None
@property
def target_temperature(self) -> float | None:
"""Return the target temperature."""
if temp := self.params.get("PumpTemp"):
return float(temp)
return None
@property
def hvac_mode(self) -> HVACMode | None:
"""Return the current HVAC mode."""
pump_power = self.params.get("PumpPower")
pump_mode = self.params.get("PumpMode")
if pump_power and pump_power == "on" and pump_mode:
return AP_TO_HA_HVAC_MODES.get(pump_mode)
return HVACMode.OFF
@property
def fan_mode(self) -> str | None:
"""Return the current fan mode."""
fan_speed = self.params.get("FanSpeed")
if fan_speed:
return AP_TO_HA_FAN_MODES.get(fan_speed)
return None
@property
def swing_mode(self) -> str | None:
"""Return the current swing mode."""
swing = self.params.get("Swing")
if swing:
return AP_TO_HA_SWING_MODES.get(swing)
return None
async def async_set_temperature(self, **kwargs: Any) -> None:
"""Set new target temperature."""
params = self.params.copy()
if ATTR_TEMPERATURE in kwargs:
temp = kwargs[ATTR_TEMPERATURE]
params["PumpTemp"] = f"{temp:.3f}"
await self._async_set_params(params)
async def async_set_hvac_mode(self, hvac_mode: HVACMode) -> None:
"""Set new target hvac mode."""
params = self.params.copy()
if hvac_mode == HVACMode.OFF:
params["PumpPower"] = "off"
else:
params["PumpPower"] = "on"
params["PumpMode"] = HA_TO_AP_HVAC_MODES.get(hvac_mode)
await self._async_set_params(params)
async def async_set_fan_mode(self, fan_mode: str) -> None:
"""Set new target fan mode."""
params = self.params.copy()
params["FanSpeed"] = HA_TO_AP_FAN_MODES.get(fan_mode)
await self._async_set_params(params)
async def async_set_swing_mode(self, swing_mode: str) -> None:
"""Set new target swing mode."""
params = self.params.copy()
params["Swing"] = HA_TO_AP_SWING_MODES.get(swing_mode)
await self._async_set_params(params)
async def async_turn_on(self) -> None:
"""Turn the entity on."""
params = self.params.copy()
if mode := AP_TO_HA_HVAC_MODES.get(params["PumpMode"]):
await self.async_set_hvac_mode(mode)
async def async_turn_off(self) -> None:
"""Turn the entity off."""
await self.async_set_hvac_mode(HVACMode.OFF)
async def _async_set_params(self, params: dict[str, Any]) -> None:
"""Set the unit to dry mode."""
new_climate_data = self.climate_data.copy()
new_climate_data["ParametersData"] = params
await self.coordinator.api.set_unit_climate_data(
self._unit_id, new_climate_data
)
await self.coordinator.async_request_refresh()

View File

@@ -1,111 +0,0 @@
"""Config flow for the AirPatrol integration."""
from __future__ import annotations
from collections.abc import Mapping
from typing import Any
from airpatrol.api import AirPatrolAPI, AirPatrolAuthenticationError, AirPatrolError
import voluptuous as vol
from homeassistant.config_entries import ConfigFlow, ConfigFlowResult
from homeassistant.const import CONF_ACCESS_TOKEN, CONF_EMAIL, CONF_PASSWORD
from homeassistant.core import HomeAssistant
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.selector import (
TextSelector,
TextSelectorConfig,
TextSelectorType,
)
from .const import DOMAIN
DATA_SCHEMA = vol.Schema(
{
vol.Required(CONF_EMAIL): TextSelector(
TextSelectorConfig(
type=TextSelectorType.EMAIL,
autocomplete="email",
)
),
vol.Required(CONF_PASSWORD): TextSelector(
TextSelectorConfig(
type=TextSelectorType.PASSWORD,
autocomplete="current-password",
)
),
}
)
async def validate_api(
hass: HomeAssistant, user_input: dict[str, str]
) -> tuple[str | None, str | None, dict[str, str]]:
"""Validate the API connection."""
errors: dict[str, str] = {}
session = async_get_clientsession(hass)
access_token = None
unique_id = None
try:
api = await AirPatrolAPI.authenticate(
session, user_input[CONF_EMAIL], user_input[CONF_PASSWORD]
)
except AirPatrolAuthenticationError:
errors["base"] = "invalid_auth"
except AirPatrolError:
errors["base"] = "cannot_connect"
else:
access_token = api.get_access_token()
unique_id = api.get_unique_id()
return (access_token, unique_id, errors)
class AirPatrolConfigFlow(ConfigFlow, domain=DOMAIN):
"""Handle a config flow for AirPatrol."""
VERSION = 1
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle the initial step."""
errors: dict[str, str] = {}
if user_input is not None:
access_token, unique_id, errors = await validate_api(self.hass, user_input)
if access_token and unique_id:
user_input[CONF_ACCESS_TOKEN] = access_token
await self.async_set_unique_id(unique_id)
self._abort_if_unique_id_configured()
return self.async_create_entry(
title=user_input[CONF_EMAIL], data=user_input
)
return self.async_show_form(
step_id="user", data_schema=DATA_SCHEMA, errors=errors
)
async def async_step_reauth(
self, user_input: Mapping[str, Any]
) -> ConfigFlowResult:
"""Handle reauthentication with new credentials."""
return await self.async_step_reauth_confirm()
async def async_step_reauth_confirm(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle reauthentication confirmation."""
errors: dict[str, str] = {}
if user_input:
access_token, unique_id, errors = await validate_api(self.hass, user_input)
if access_token and unique_id:
await self.async_set_unique_id(unique_id)
self._abort_if_unique_id_mismatch()
user_input[CONF_ACCESS_TOKEN] = access_token
return self.async_update_reload_and_abort(
self._get_reauth_entry(), data_updates=user_input
)
return self.async_show_form(
step_id="reauth_confirm", data_schema=DATA_SCHEMA, errors=errors
)

View File

@@ -1,16 +0,0 @@
"""Constants for the AirPatrol integration."""
from datetime import timedelta
import logging
from airpatrol.api import AirPatrolAuthenticationError, AirPatrolError
from homeassistant.const import Platform
DOMAIN = "airpatrol"
LOGGER = logging.getLogger(__package__)
PLATFORMS = [Platform.CLIMATE]
SCAN_INTERVAL = timedelta(minutes=1)
AIRPATROL_ERRORS = (AirPatrolAuthenticationError, AirPatrolError)

View File

@@ -1,100 +0,0 @@
"""Data update coordinator for AirPatrol."""
from __future__ import annotations
from typing import Any
from airpatrol.api import AirPatrolAPI, AirPatrolAuthenticationError, AirPatrolError
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_ACCESS_TOKEN, CONF_EMAIL, CONF_PASSWORD
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryAuthFailed
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import DOMAIN, LOGGER, SCAN_INTERVAL
type AirPatrolConfigEntry = ConfigEntry[AirPatrolDataUpdateCoordinator]
class AirPatrolDataUpdateCoordinator(DataUpdateCoordinator[dict[str, dict[str, Any]]]):
"""Class to manage fetching AirPatrol data."""
config_entry: AirPatrolConfigEntry
api: AirPatrolAPI
def __init__(self, hass: HomeAssistant, config_entry: AirPatrolConfigEntry) -> None:
"""Initialize."""
super().__init__(
hass,
LOGGER,
name=f"{DOMAIN.capitalize()} {config_entry.title}",
update_interval=SCAN_INTERVAL,
config_entry=config_entry,
)
async def _async_setup(self) -> None:
try:
await self._setup_client()
except AirPatrolError as api_err:
raise UpdateFailed(
f"Error communicating with AirPatrol API: {api_err}"
) from api_err
async def _async_update_data(self) -> dict[str, dict[str, Any]]:
"""Update unit data from AirPatrol API."""
return {unit_data["unit_id"]: unit_data for unit_data in await self._get_data()}
async def _get_data(self, retry: bool = False) -> list[dict[str, Any]]:
"""Fetch data from API."""
try:
return await self.api.get_data()
except AirPatrolAuthenticationError as auth_err:
if retry:
raise ConfigEntryAuthFailed(
"Authentication with AirPatrol failed"
) from auth_err
await self._update_token()
return await self._get_data(retry=True)
except AirPatrolError as err:
raise UpdateFailed(
f"Error communicating with AirPatrol API: {err}"
) from err
async def _update_token(self) -> None:
"""Refresh the AirPatrol API client and update the access token."""
session = async_get_clientsession(self.hass)
try:
self.api = await AirPatrolAPI.authenticate(
session,
self.config_entry.data[CONF_EMAIL],
self.config_entry.data[CONF_PASSWORD],
)
except AirPatrolAuthenticationError as auth_err:
raise ConfigEntryAuthFailed(
"Authentication with AirPatrol failed"
) from auth_err
self.hass.config_entries.async_update_entry(
self.config_entry,
data={
**self.config_entry.data,
CONF_ACCESS_TOKEN: self.api.get_access_token(),
},
)
async def _setup_client(self) -> None:
"""Set up the AirPatrol API client from stored access_token."""
session = async_get_clientsession(self.hass)
api = AirPatrolAPI(
session,
self.config_entry.data[CONF_ACCESS_TOKEN],
self.config_entry.unique_id,
)
try:
await api.get_data()
except AirPatrolAuthenticationError:
await self._update_token()
self.api = api

View File

@@ -1,44 +0,0 @@
"""Base entity for AirPatrol integration."""
from __future__ import annotations
from typing import Any
from homeassistant.helpers.device_registry import DeviceInfo
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from .const import DOMAIN
from .coordinator import AirPatrolDataUpdateCoordinator
class AirPatrolEntity(CoordinatorEntity[AirPatrolDataUpdateCoordinator]):
"""Base entity for AirPatrol devices."""
_attr_has_entity_name = True
def __init__(
self,
coordinator: AirPatrolDataUpdateCoordinator,
unit_id: str,
) -> None:
"""Initialize the AirPatrol entity."""
super().__init__(coordinator)
self._unit_id = unit_id
device = coordinator.data[unit_id]
self._attr_device_info = DeviceInfo(
identifiers={(DOMAIN, unit_id)},
name=device["name"],
manufacturer=device["manufacturer"],
model=device["model"],
serial_number=device["hwid"],
)
@property
def device_data(self) -> dict[str, Any]:
"""Return the device data."""
return self.coordinator.data[self._unit_id]
@property
def available(self) -> bool:
"""Return if entity is available."""
return super().available and self._unit_id in self.coordinator.data

View File

@@ -1,11 +0,0 @@
{
"domain": "airpatrol",
"name": "AirPatrol",
"codeowners": ["@antondalgren"],
"config_flow": true,
"documentation": "https://www.home-assistant.io/integrations/airpatrol",
"integration_type": "device",
"iot_class": "cloud_polling",
"quality_scale": "bronze",
"requirements": ["airpatrol==0.1.0"]
}

View File

@@ -1,65 +0,0 @@
rules:
# Bronze
action-setup: done
appropriate-polling: done
brands: done
common-modules: done
config-flow-test-coverage: done
config-flow: done
dependency-transparency: done
docs-actions:
status: exempt
comment: Integration does not provide custom actions
docs-high-level-description: done
docs-installation-instructions: done
docs-removal-instructions: done
entity-event-setup:
status: exempt
comment: |
Entities doesn't subscribe to events.
entity-unique-id: done
has-entity-name: done
runtime-data: done
test-before-configure: done
test-before-setup: done
unique-config-entry: done
# Silver
action-exceptions: done
config-entry-unloading: done
docs-configuration-parameters: done
docs-installation-parameters: done
entity-unavailable: done
integration-owner: done
log-when-unavailable: todo
parallel-updates: done
reauthentication-flow: done
test-coverage: done
# Gold
devices: done
diagnostics: todo
discovery-update-info: todo
discovery: todo
docs-data-update: todo
docs-examples: todo
docs-known-limitations: todo
docs-supported-devices: todo
docs-supported-functions: todo
docs-troubleshooting: todo
docs-use-cases: todo
dynamic-devices: todo
entity-category: done
entity-device-class: done
entity-disabled-by-default: todo
entity-translations: done
exception-translations: todo
icon-translations: todo
reconfiguration-flow: todo
repair-issues: todo
stale-devices: todo
# Platinum
async-dependency: todo
inject-websession: todo
strict-typing: todo

View File

@@ -1,38 +0,0 @@
{
"config": {
"abort": {
"already_configured": "[%key:common::config_flow::abort::already_configured_device%]",
"reauth_successful": "[%key:common::config_flow::abort::reauth_successful%]",
"unique_id_mismatch": "Login credentials do not match the configured account"
},
"error": {
"cannot_connect": "[%key:common::config_flow::error::cannot_connect%]",
"invalid_auth": "[%key:common::config_flow::error::invalid_auth%]",
"unknown": "[%key:common::config_flow::error::unknown%]"
},
"step": {
"reauth_confirm": {
"data": {
"email": "[%key:common::config_flow::data::email%]",
"password": "[%key:common::config_flow::data::password%]"
},
"data_description": {
"email": "[%key:component::airpatrol::config::step::user::data_description::email%]",
"password": "[%key:component::airpatrol::config::step::user::data_description::password%]"
},
"description": "Reauthenticate with AirPatrol"
},
"user": {
"data": {
"email": "[%key:common::config_flow::data::email%]",
"password": "[%key:common::config_flow::data::password%]"
},
"data_description": {
"email": "Your AirPatrol email address",
"password": "Your AirPatrol password"
},
"description": "Connect to AirPatrol"
}
}
}
}

View File

@@ -11,5 +11,5 @@
"documentation": "https://www.home-assistant.io/integrations/airzone",
"iot_class": "local_polling",
"loggers": ["aioairzone"],
"requirements": ["aioairzone==1.0.4"]
"requirements": ["aioairzone==1.0.2"]
}

View File

@@ -39,6 +39,7 @@ async def async_setup_entry(
cookie_jar=CookieJar(quote_cookie=False),
),
refresh_token=entry.data[CONF_ACCESS_TOKEN],
account_number=entry.data[CONF_ACCOUNT_NUMBER],
)
try:
await auth.send_refresh_request()
@@ -48,7 +49,7 @@ async def async_setup_entry(
_aw = AnglianWater(authenticator=auth)
try:
await _aw.validate_smart_meter(entry.data[CONF_ACCOUNT_NUMBER])
await _aw.validate_smart_meter()
except SmartMeterUnavailableError as err:
raise ConfigEntryError(
translation_domain=DOMAIN, translation_key="smart_meter_unavailable"

View File

@@ -7,7 +7,7 @@ from typing import Any
from aiohttp import CookieJar
from pyanglianwater import AnglianWater
from pyanglianwater.auth import MSOB2CAuth
from pyanglianwater.auth import BaseAuth, MSOB2CAuth
from pyanglianwater.exceptions import (
InvalidAccountIdError,
SelfAssertedError,
@@ -35,9 +35,7 @@ STEP_USER_DATA_SCHEMA = vol.Schema(
)
async def validate_credentials(
auth: MSOB2CAuth, account_number: str
) -> str | MSOB2CAuth:
async def validate_credentials(auth: MSOB2CAuth) -> str | MSOB2CAuth:
"""Validate the provided credentials."""
try:
await auth.send_login_request()
@@ -48,7 +46,7 @@ async def validate_credentials(
return "unknown"
_aw = AnglianWater(authenticator=auth)
try:
await _aw.validate_smart_meter(account_number)
await _aw.validate_smart_meter()
except (InvalidAccountIdError, SmartMeterUnavailableError):
return "smart_meter_unavailable"
return auth
@@ -71,12 +69,10 @@ class AnglianWaterConfigFlow(ConfigFlow, domain=DOMAIN):
self.hass,
cookie_jar=CookieJar(quote_cookie=False),
),
),
user_input[CONF_ACCOUNT_NUMBER],
account_number=user_input[CONF_ACCOUNT_NUMBER],
)
)
if isinstance(validation_response, str):
errors["base"] = validation_response
else:
if isinstance(validation_response, BaseAuth):
await self.async_set_unique_id(user_input[CONF_ACCOUNT_NUMBER])
self._abort_if_unique_id_configured()
return self.async_create_entry(
@@ -86,6 +82,7 @@ class AnglianWaterConfigFlow(ConfigFlow, domain=DOMAIN):
CONF_ACCESS_TOKEN: validation_response.refresh_token,
},
)
errors["base"] = validation_response
return self.async_show_form(
step_id="user", data_schema=STEP_USER_DATA_SCHEMA, errors=errors

View File

@@ -12,7 +12,7 @@ from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import CONF_ACCOUNT_NUMBER, DOMAIN
from .const import DOMAIN
type AnglianWaterConfigEntry = ConfigEntry[AnglianWaterUpdateCoordinator]
@@ -44,6 +44,6 @@ class AnglianWaterUpdateCoordinator(DataUpdateCoordinator[None]):
async def _async_update_data(self) -> None:
"""Update data from Anglian Water's API."""
try:
return await self.api.update(self.config_entry.data[CONF_ACCOUNT_NUMBER])
return await self.api.update()
except (ExpiredAccessTokenError, UnknownEndpointError) as err:
raise UpdateFailed from err

View File

@@ -7,5 +7,5 @@
"iot_class": "cloud_polling",
"loggers": ["pyanglianwater"],
"quality_scale": "bronze",
"requirements": ["pyanglianwater==3.0.0"]
"requirements": ["pyanglianwater==2.1.0"]
}

View File

@@ -3,9 +3,8 @@
from abc import ABC, abstractmethod
from dataclasses import dataclass
import logging
import math
from pysilero_vad import SileroVoiceActivityDetector
from pymicro_vad import MicroVad
from pyspeex_noise import AudioProcessor
from .const import BYTES_PER_CHUNK
@@ -43,8 +42,8 @@ class AudioEnhancer(ABC):
"""Enhance chunk of PCM audio @ 16Khz with 16-bit mono samples."""
class SileroVadSpeexEnhancer(AudioEnhancer):
"""Audio enhancer that runs Silero VAD and speex."""
class MicroVadSpeexEnhancer(AudioEnhancer):
"""Audio enhancer that runs microVAD and speex."""
def __init__(
self, auto_gain: int, noise_suppression: int, is_vad_enabled: bool
@@ -70,49 +69,21 @@ class SileroVadSpeexEnhancer(AudioEnhancer):
self.noise_suppression,
)
self.vad: SileroVoiceActivityDetector | None = None
# We get 10ms chunks but Silero works on 32ms chunks, so we have to
# buffer audio. The previous speech probability is used until enough
# audio has been buffered.
self._vad_buffer: bytearray | None = None
self._vad_buffer_chunks = 0
self._vad_buffer_chunk_idx = 0
self._last_speech_probability: float | None = None
self.vad: MicroVad | None = None
if self.is_vad_enabled:
self.vad = SileroVoiceActivityDetector()
# VAD buffer is a multiple of 10ms, but Silero VAD needs 32ms.
self._vad_buffer_chunks = int(
math.ceil(self.vad.chunk_bytes() / BYTES_PER_CHUNK)
)
self._vad_leftover_bytes = self.vad.chunk_bytes() - BYTES_PER_CHUNK
self._vad_buffer = bytearray(self.vad.chunk_bytes())
_LOGGER.debug("Initialized Silero VAD")
self.vad = MicroVad()
_LOGGER.debug("Initialized microVAD")
def enhance_chunk(self, audio: bytes, timestamp_ms: int) -> EnhancedAudioChunk:
"""Enhance 10ms chunk of PCM audio @ 16Khz with 16-bit mono samples."""
speech_probability: float | None = None
assert len(audio) == BYTES_PER_CHUNK
if self.vad is not None:
# Run VAD
assert self._vad_buffer is not None
start_idx = self._vad_buffer_chunk_idx * BYTES_PER_CHUNK
self._vad_buffer[start_idx : start_idx + BYTES_PER_CHUNK] = audio
self._vad_buffer_chunk_idx += 1
if self._vad_buffer_chunk_idx >= self._vad_buffer_chunks:
# We have enough data to run Silero VAD (32 ms)
self._last_speech_probability = self.vad.process_chunk(
self._vad_buffer[: self.vad.chunk_bytes()]
)
# Copy leftover audio that wasn't processed to start
self._vad_buffer[: self._vad_leftover_bytes] = self._vad_buffer[
-self._vad_leftover_bytes :
]
self._vad_buffer_chunk_idx = 0
speech_probability = self.vad.Process10ms(audio)
if self.audio_processor is not None:
# Run noise suppression and auto gain
@@ -121,5 +92,5 @@ class SileroVadSpeexEnhancer(AudioEnhancer):
return EnhancedAudioChunk(
audio=audio,
timestamp_ms=timestamp_ms,
speech_probability=self._last_speech_probability,
speech_probability=speech_probability,
)

View File

@@ -8,5 +8,5 @@
"integration_type": "system",
"iot_class": "local_push",
"quality_scale": "internal",
"requirements": ["pysilero-vad==3.0.1", "pyspeex-noise==1.0.2"]
"requirements": ["pymicro-vad==1.0.1", "pyspeex-noise==1.0.2"]
}

View File

@@ -55,7 +55,7 @@ from homeassistant.util import (
from homeassistant.util.hass_dict import HassKey
from homeassistant.util.limited_size_dict import LimitedSizeDict
from .audio_enhancer import AudioEnhancer, EnhancedAudioChunk, SileroVadSpeexEnhancer
from .audio_enhancer import AudioEnhancer, EnhancedAudioChunk, MicroVadSpeexEnhancer
from .const import (
ACKNOWLEDGE_PATH,
BYTES_PER_CHUNK,
@@ -633,7 +633,7 @@ class PipelineRun:
# Initialize with audio settings
if self.audio_settings.needs_processor and (self.audio_enhancer is None):
# Default audio enhancer
self.audio_enhancer = SileroVadSpeexEnhancer(
self.audio_enhancer = MicroVadSpeexEnhancer(
self.audio_settings.auto_gain_dbfs,
self.audio_settings.noise_suppression_level,
self.audio_settings.is_vad_enabled,

View File

@@ -7,5 +7,5 @@
"integration_type": "hub",
"iot_class": "local_polling",
"loggers": ["aioasuswrt", "asusrouter", "asyncssh"],
"requirements": ["aioasuswrt==1.5.2", "asusrouter==1.21.3"]
"requirements": ["aioasuswrt==1.5.1", "asusrouter==1.21.0"]
}

View File

@@ -29,5 +29,5 @@
"documentation": "https://www.home-assistant.io/integrations/august",
"iot_class": "cloud_push",
"loggers": ["pubnub", "yalexs"],
"requirements": ["yalexs==9.2.0", "yalexs-ble==3.2.2"]
"requirements": ["yalexs==9.2.0", "yalexs-ble==3.2.1"]
}

View File

@@ -125,7 +125,6 @@ _EXPERIMENTAL_TRIGGER_PLATFORMS = {
"alarm_control_panel",
"assist_satellite",
"binary_sensor",
"button",
"climate",
"cover",
"fan",

View File

@@ -32,7 +32,7 @@ BEO_STATES: dict[str, MediaPlayerState] = {
"buffering": MediaPlayerState.PLAYING,
"idle": MediaPlayerState.IDLE,
"paused": MediaPlayerState.PAUSED,
"stopped": MediaPlayerState.IDLE,
"stopped": MediaPlayerState.PAUSED,
"ended": MediaPlayerState.PAUSED,
"error": MediaPlayerState.IDLE,
# A device's initial state is "unknown" and should be treated as "idle"

View File

@@ -20,5 +20,5 @@
"documentation": "https://www.home-assistant.io/integrations/blink",
"iot_class": "cloud_polling",
"loggers": ["blinkpy"],
"requirements": ["blinkpy==0.25.1"]
"requirements": ["blinkpy==0.24.1"]
}

View File

@@ -2,7 +2,7 @@
from __future__ import annotations
from typing import Any, Final
from typing import Any
from bsblan import BSBLANError
@@ -17,7 +17,7 @@ from homeassistant.components.climate import (
)
from homeassistant.const import ATTR_TEMPERATURE
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import HomeAssistantError
from homeassistant.exceptions import HomeAssistantError, ServiceValidationError
from homeassistant.helpers.device_registry import format_mac
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
from homeassistant.util.enum import try_parse_enum
@@ -39,22 +39,6 @@ PRESET_MODES = [
PRESET_NONE,
]
# Mapping from Home Assistant HVACMode to BSB-Lan integer values
# BSB-Lan uses: 0=off, 1=auto, 2=eco/reduced, 3=heat/comfort
HA_TO_BSBLAN_HVAC_MODE: Final[dict[HVACMode, int]] = {
HVACMode.OFF: 0,
HVACMode.AUTO: 1,
HVACMode.HEAT: 3,
}
# Mapping from BSB-Lan integer values to Home Assistant HVACMode
BSBLAN_TO_HA_HVAC_MODE: Final[dict[int, HVACMode]] = {
0: HVACMode.OFF,
1: HVACMode.AUTO,
2: HVACMode.AUTO, # eco/reduced maps to AUTO with preset
3: HVACMode.HEAT,
}
async def async_setup_entry(
hass: HomeAssistant,
@@ -114,20 +98,17 @@ class BSBLANClimate(BSBLanEntity, ClimateEntity):
@property
def hvac_mode(self) -> HVACMode | None:
"""Return hvac operation ie. heat, cool mode."""
hvac_mode_value = self.coordinator.data.state.hvac_mode.value
if hvac_mode_value is None:
return None
# BSB-Lan returns integer values: 0=off, 1=auto, 2=eco, 3=heat
if isinstance(hvac_mode_value, int):
return BSBLAN_TO_HA_HVAC_MODE.get(hvac_mode_value)
return try_parse_enum(HVACMode, hvac_mode_value)
if self.coordinator.data.state.hvac_mode.value == PRESET_ECO:
return HVACMode.AUTO
return try_parse_enum(HVACMode, self.coordinator.data.state.hvac_mode.value)
@property
def preset_mode(self) -> str | None:
"""Return the current preset mode."""
hvac_mode_value = self.coordinator.data.state.hvac_mode.value
# BSB-Lan mode 2 is eco/reduced mode
if hvac_mode_value == 2:
if (
self.hvac_mode == HVACMode.AUTO
and self.coordinator.data.state.hvac_mode.value == PRESET_ECO
):
return PRESET_ECO
return PRESET_NONE
@@ -137,6 +118,13 @@ class BSBLANClimate(BSBLanEntity, ClimateEntity):
async def async_set_preset_mode(self, preset_mode: str) -> None:
"""Set preset mode."""
if self.hvac_mode != HVACMode.AUTO and preset_mode != PRESET_NONE:
raise ServiceValidationError(
"Preset mode can only be set when HVAC mode is set to 'auto'",
translation_domain=DOMAIN,
translation_key="set_preset_mode_error",
translation_placeholders={"preset_mode": preset_mode},
)
await self.async_set_data(preset_mode=preset_mode)
async def async_set_temperature(self, **kwargs: Any) -> None:
@@ -145,17 +133,16 @@ class BSBLANClimate(BSBLanEntity, ClimateEntity):
async def async_set_data(self, **kwargs: Any) -> None:
"""Set device settings using BSBLAN."""
data: dict[str, Any] = {}
data = {}
if ATTR_TEMPERATURE in kwargs:
data[ATTR_TARGET_TEMPERATURE] = kwargs[ATTR_TEMPERATURE]
if ATTR_HVAC_MODE in kwargs:
data[ATTR_HVAC_MODE] = HA_TO_BSBLAN_HVAC_MODE[kwargs[ATTR_HVAC_MODE]]
data[ATTR_HVAC_MODE] = kwargs[ATTR_HVAC_MODE]
if ATTR_PRESET_MODE in kwargs:
# eco preset uses BSB-Lan mode 2, none preset uses mode 1 (auto)
if kwargs[ATTR_PRESET_MODE] == PRESET_ECO:
data[ATTR_HVAC_MODE] = 2
data[ATTR_HVAC_MODE] = PRESET_ECO
elif kwargs[ATTR_PRESET_MODE] == PRESET_NONE:
data[ATTR_HVAC_MODE] = 1
data[ATTR_HVAC_MODE] = PRESET_NONE
try:
await self.coordinator.client.thermostat(**data)

View File

@@ -7,7 +7,7 @@
"integration_type": "device",
"iot_class": "local_polling",
"loggers": ["bsblan"],
"requirements": ["python-bsblan==3.1.4"],
"requirements": ["python-bsblan==3.1.1"],
"zeroconf": [
{
"name": "bsb-lan*",

View File

@@ -4,7 +4,7 @@ from __future__ import annotations
from typing import Any
from bsblan import BSBLANError, SetHotWaterParam
from bsblan import BSBLANError
from homeassistant.components.water_heater import (
STATE_ECO,
@@ -131,9 +131,7 @@ class BSBLANWaterHeater(BSBLanDualCoordinatorEntity, WaterHeaterEntity):
"""Set new target temperature."""
temperature = kwargs.get(ATTR_TEMPERATURE)
try:
await self.coordinator.client.set_hot_water(
SetHotWaterParam(nominal_setpoint=temperature)
)
await self.coordinator.client.set_hot_water(nominal_setpoint=temperature)
except BSBLANError as err:
raise HomeAssistantError(
translation_domain=DOMAIN,
@@ -146,9 +144,7 @@ class BSBLANWaterHeater(BSBLanDualCoordinatorEntity, WaterHeaterEntity):
"""Set new operation mode."""
bsblan_mode = OPERATION_MODES_REVERSE.get(operation_mode)
try:
await self.coordinator.client.set_hot_water(
SetHotWaterParam(operating_mode=bsblan_mode)
)
await self.coordinator.client.set_hot_water(operating_mode=bsblan_mode)
except BSBLANError as err:
raise HomeAssistantError(
translation_domain=DOMAIN,

View File

@@ -17,10 +17,5 @@
"press": {
"service": "mdi:gesture-tap-button"
}
},
"triggers": {
"pressed": {
"trigger": "mdi:gesture-tap-button"
}
}
}

View File

@@ -27,11 +27,5 @@
"name": "Press"
}
},
"title": "Button",
"triggers": {
"pressed": {
"description": "Triggers after one or several buttons were pressed.",
"name": "Button pressed"
}
}
"title": "Button"
}

View File

@@ -1,42 +0,0 @@
"""Provides triggers for buttons."""
from homeassistant.const import STATE_UNAVAILABLE, STATE_UNKNOWN
from homeassistant.core import HomeAssistant, State
from homeassistant.helpers.trigger import (
ENTITY_STATE_TRIGGER_SCHEMA,
EntityTriggerBase,
Trigger,
)
from . import DOMAIN
class ButtonPressedTrigger(EntityTriggerBase):
"""Trigger for button entity presses."""
_domain = DOMAIN
_schema = ENTITY_STATE_TRIGGER_SCHEMA
def is_valid_transition(self, from_state: State, to_state: State) -> bool:
"""Check if the origin state is not an expected target states."""
# UNKNOWN is a valid from_state, otherwise the first time the button is pressed
# would not trigger
if from_state.state == STATE_UNAVAILABLE:
return False
return from_state.state != to_state.state
def is_valid_state(self, state: State) -> bool:
"""Check if the new state is not invalid."""
return state.state not in (STATE_UNAVAILABLE, STATE_UNKNOWN)
TRIGGERS: dict[str, type[Trigger]] = {
"pressed": ButtonPressedTrigger,
}
async def async_get_triggers(hass: HomeAssistant) -> dict[str, type[Trigger]]:
"""Return the triggers for buttons."""
return TRIGGERS

View File

@@ -1,4 +0,0 @@
pressed:
target:
entity:
domain: button

View File

@@ -1,19 +1,157 @@
"""Module for color_extractor (RGB extraction from images) component."""
import asyncio
import io
import logging
import aiohttp
from colorthief import ColorThief
from PIL import UnidentifiedImageError
import voluptuous as vol
from homeassistant.components.light import (
ATTR_RGB_COLOR,
DOMAIN as LIGHT_DOMAIN,
LIGHT_TURN_ON_SCHEMA,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.helpers import config_validation as cv
from homeassistant.const import SERVICE_TURN_ON as LIGHT_SERVICE_TURN_ON
from homeassistant.core import HomeAssistant, ServiceCall
from homeassistant.helpers import aiohttp_client, config_validation as cv
from homeassistant.helpers.typing import ConfigType
from .const import DOMAIN
from .services import async_setup_services
from .const import ATTR_PATH, ATTR_URL, DOMAIN, SERVICE_TURN_ON
_LOGGER = logging.getLogger(__name__)
CONFIG_SCHEMA = cv.removed(DOMAIN, raise_if_present=False)
# Extend the existing light.turn_on service schema
SERVICE_SCHEMA = vol.All(
cv.has_at_least_one_key(ATTR_URL, ATTR_PATH),
cv.make_entity_service_schema(
{
**LIGHT_TURN_ON_SCHEMA,
vol.Exclusive(ATTR_PATH, "color_extractor"): cv.isfile,
vol.Exclusive(ATTR_URL, "color_extractor"): cv.url,
}
),
)
def _get_file(file_path):
"""Get a PIL acceptable input file reference.
Allows us to mock patch during testing to make BytesIO stream.
"""
return file_path
def _get_color(file_handler) -> tuple:
"""Given an image file, extract the predominant color from it."""
color_thief = ColorThief(file_handler)
# get_color returns a SINGLE RGB value for the given image
color = color_thief.get_color(quality=1)
_LOGGER.debug("Extracted RGB color %s from image", color)
return color
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Set up the Color extractor component."""
async_setup_services(hass)
async def async_handle_service(service_call: ServiceCall) -> None:
"""Decide which color_extractor method to call based on service."""
service_data = dict(service_call.data)
try:
if ATTR_URL in service_data:
image_type = "URL"
image_reference = service_data.pop(ATTR_URL)
color = await async_extract_color_from_url(image_reference)
elif ATTR_PATH in service_data:
image_type = "file path"
image_reference = service_data.pop(ATTR_PATH)
color = await hass.async_add_executor_job(
extract_color_from_path, image_reference
)
except UnidentifiedImageError as ex:
_LOGGER.error(
"Bad image from %s '%s' provided, are you sure it's an image? %s",
image_type,
image_reference,
ex,
)
return
if color:
service_data[ATTR_RGB_COLOR] = color
await hass.services.async_call(
LIGHT_DOMAIN, LIGHT_SERVICE_TURN_ON, service_data, blocking=True
)
hass.services.async_register(
DOMAIN,
SERVICE_TURN_ON,
async_handle_service,
schema=SERVICE_SCHEMA,
)
async def async_extract_color_from_url(url):
"""Handle call for URL based image."""
if not hass.config.is_allowed_external_url(url):
_LOGGER.error(
(
"External URL '%s' is not allowed, please add to"
" 'allowlist_external_urls'"
),
url,
)
return None
_LOGGER.debug("Getting predominant RGB from image URL '%s'", url)
# Download the image into a buffer for ColorThief to check against
try:
session = aiohttp_client.async_get_clientsession(hass)
async with asyncio.timeout(10):
response = await session.get(url)
except (TimeoutError, aiohttp.ClientError) as err:
_LOGGER.error("Failed to get ColorThief image due to HTTPError: %s", err)
return None
content = await response.content.read()
with io.BytesIO(content) as _file:
_file.name = "color_extractor.jpg"
_file.seek(0)
return _get_color(_file)
def extract_color_from_path(file_path):
"""Handle call for local file based image."""
if not hass.config.is_allowed_path(file_path):
_LOGGER.error(
(
"File path '%s' is not allowed, please add to"
" 'allowlist_external_dirs'"
),
file_path,
)
return None
_LOGGER.debug("Getting predominant RGB from file path '%s'", file_path)
_file = _get_file(file_path)
return _get_color(_file)
return True

View File

@@ -1,156 +0,0 @@
"""Module for color_extractor (RGB extraction from images) component."""
import asyncio
import io
import logging
import aiohttp
from colorthief import ColorThief
from PIL import UnidentifiedImageError
import voluptuous as vol
from homeassistant.components.light import (
ATTR_RGB_COLOR,
DOMAIN as LIGHT_DOMAIN,
LIGHT_TURN_ON_SCHEMA,
)
from homeassistant.const import SERVICE_TURN_ON as LIGHT_SERVICE_TURN_ON
from homeassistant.core import HomeAssistant, ServiceCall, callback
from homeassistant.helpers import aiohttp_client, config_validation as cv
from .const import ATTR_PATH, ATTR_URL, DOMAIN, SERVICE_TURN_ON
_LOGGER = logging.getLogger(__name__)
# Extend the existing light.turn_on service schema
SERVICE_SCHEMA = vol.All(
cv.has_at_least_one_key(ATTR_URL, ATTR_PATH),
cv.make_entity_service_schema(
{
**LIGHT_TURN_ON_SCHEMA,
vol.Exclusive(ATTR_PATH, "color_extractor"): cv.isfile,
vol.Exclusive(ATTR_URL, "color_extractor"): cv.url,
}
),
)
def _get_file(file_path: str) -> str:
"""Get a PIL acceptable input file reference.
Allows us to mock patch during testing to make BytesIO stream.
"""
return file_path
def _get_color(file_handler: io.BytesIO | str) -> tuple[int, int, int]:
"""Given an image file, extract the predominant color from it."""
color_thief = ColorThief(file_handler)
# get_color returns a SINGLE RGB value for the given image
color = color_thief.get_color(quality=1)
_LOGGER.debug("Extracted RGB color %s from image", color)
return color
async def _async_extract_color_from_url(
hass: HomeAssistant, url: str
) -> tuple[int, int, int] | None:
"""Handle call for URL based image."""
if not hass.config.is_allowed_external_url(url):
_LOGGER.error(
(
"External URL '%s' is not allowed, please add to"
" 'allowlist_external_urls'"
),
url,
)
return None
_LOGGER.debug("Getting predominant RGB from image URL '%s'", url)
# Download the image into a buffer for ColorThief to check against
try:
session = aiohttp_client.async_get_clientsession(hass)
async with asyncio.timeout(10):
response = await session.get(url)
except (TimeoutError, aiohttp.ClientError) as err:
_LOGGER.error("Failed to get ColorThief image due to HTTPError: %s", err)
return None
content = await response.content.read()
with io.BytesIO(content) as _file:
_file.name = "color_extractor.jpg"
_file.seek(0)
return _get_color(_file)
def _extract_color_from_path(
hass: HomeAssistant, file_path: str
) -> tuple[int, int, int] | None:
"""Handle call for local file based image."""
if not hass.config.is_allowed_path(file_path):
_LOGGER.error(
"File path '%s' is not allowed, please add to 'allowlist_external_dirs'",
file_path,
)
return None
_LOGGER.debug("Getting predominant RGB from file path '%s'", file_path)
_file = _get_file(file_path)
return _get_color(_file)
async def async_handle_service(service_call: ServiceCall) -> None:
"""Decide which color_extractor method to call based on service."""
service_data = dict(service_call.data)
try:
if ATTR_URL in service_data:
image_type = "URL"
image_reference = service_data.pop(ATTR_URL)
color = await _async_extract_color_from_url(
service_call.hass, image_reference
)
elif ATTR_PATH in service_data:
image_type = "file path"
image_reference = service_data.pop(ATTR_PATH)
color = await service_call.hass.async_add_executor_job(
_extract_color_from_path, service_call.hass, image_reference
)
except UnidentifiedImageError as ex:
_LOGGER.error(
"Bad image from %s '%s' provided, are you sure it's an image? %s",
image_type,
image_reference,
ex,
)
return
if color:
service_data[ATTR_RGB_COLOR] = color
await service_call.hass.services.async_call(
LIGHT_DOMAIN, LIGHT_SERVICE_TURN_ON, service_data, blocking=True
)
@callback
def async_setup_services(hass: HomeAssistant) -> None:
"""Register the services."""
hass.services.async_register(
DOMAIN,
SERVICE_TURN_ON,
async_handle_service,
schema=SERVICE_SCHEMA,
)

View File

@@ -7,5 +7,5 @@
"integration_type": "hub",
"iot_class": "cloud_push",
"quality_scale": "bronze",
"requirements": ["pycync==0.5.0"]
"requirements": ["pycync==0.4.3"]
}

View File

@@ -7,7 +7,7 @@
"documentation": "https://www.home-assistant.io/integrations/doorbird",
"iot_class": "local_push",
"loggers": ["doorbirdpy"],
"requirements": ["DoorBirdPy==3.0.11"],
"requirements": ["DoorBirdPy==3.0.8"],
"zeroconf": [
{
"properties": {

View File

@@ -1,42 +0,0 @@
"""Integration for eGauge energy monitors."""
from __future__ import annotations
from homeassistant.const import Platform
from homeassistant.core import HomeAssistant
from homeassistant.helpers import device_registry as dr
from .const import DOMAIN, MANUFACTURER, MODEL
from .coordinator import EgaugeConfigEntry, EgaugeDataCoordinator
PLATFORMS = [Platform.SENSOR]
async def async_setup_entry(hass: HomeAssistant, entry: EgaugeConfigEntry) -> bool:
"""Set up eGauge from a config entry."""
coordinator = EgaugeDataCoordinator(hass, entry)
await coordinator.async_config_entry_first_refresh()
# Store coordinator in runtime_data
entry.runtime_data = coordinator
# Set up main device
device_registry = dr.async_get(hass)
device_registry.async_get_or_create(
config_entry_id=entry.entry_id,
identifiers={(DOMAIN, coordinator.serial_number)},
name=coordinator.hostname,
manufacturer=MANUFACTURER,
model=MODEL,
serial_number=coordinator.serial_number,
)
# Setup sensor platform
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
return True
async def async_unload_entry(hass: HomeAssistant, entry: EgaugeConfigEntry) -> bool:
"""Unload eGauge config entry."""
return await hass.config_entries.async_unload_platforms(entry, PLATFORMS)

View File

@@ -1,77 +0,0 @@
"""Config flow to configure the eGauge integration."""
from __future__ import annotations
from typing import Any
from egauge_async.exceptions import EgaugeAuthenticationError, EgaugePermissionError
from egauge_async.json.client import EgaugeJsonClient
from httpx import ConnectError
import voluptuous as vol
from homeassistant.config_entries import ConfigFlow, ConfigFlowResult
from homeassistant.const import (
CONF_HOST,
CONF_PASSWORD,
CONF_SSL,
CONF_USERNAME,
CONF_VERIFY_SSL,
)
from homeassistant.helpers.httpx_client import get_async_client
from .const import DOMAIN, LOGGER
STEP_USER_DATA_SCHEMA = vol.Schema(
{
vol.Required(CONF_HOST): str,
vol.Required(CONF_USERNAME): str,
vol.Required(CONF_PASSWORD): str,
vol.Required(CONF_SSL, default=True): bool,
vol.Required(CONF_VERIFY_SSL, default=False): bool,
}
)
class EgaugeFlowHandler(ConfigFlow, domain=DOMAIN):
"""Handle an eGauge config flow."""
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle a flow initiated by the user."""
errors: dict[str, str] = {}
if user_input is not None:
client = EgaugeJsonClient(
host=user_input[CONF_HOST],
username=user_input[CONF_USERNAME],
password=user_input[CONF_PASSWORD],
client=get_async_client(
self.hass, verify_ssl=user_input[CONF_VERIFY_SSL]
),
use_ssl=user_input[CONF_SSL],
)
try:
serial_number = await client.get_device_serial_number()
hostname = await client.get_hostname()
except EgaugeAuthenticationError:
errors["base"] = "invalid_auth"
except EgaugePermissionError:
errors["base"] = "missing_permission"
except ConnectError:
errors["base"] = "cannot_connect"
except Exception: # noqa: BLE001
LOGGER.exception("Unexpected exception")
errors["base"] = "unknown"
else:
await self.async_set_unique_id(serial_number)
self._abort_if_unique_id_configured()
return self.async_create_entry(title=hostname, data=user_input)
return self.async_show_form(
step_id="user",
data_schema=self.add_suggested_values_to_schema(
STEP_USER_DATA_SCHEMA, user_input
),
errors=errors,
)

View File

@@ -1,10 +0,0 @@
"""Constants for the eGauge integration."""
import logging
DOMAIN = "egauge"
LOGGER = logging.getLogger(__package__)
MANUFACTURER = "eGauge Systems"
MODEL = "eGauge Energy Monitor"
COORDINATOR_UPDATE_INTERVAL_SECONDS = 30

View File

@@ -1,105 +0,0 @@
"""Data update coordinator for eGauge energy monitors."""
from __future__ import annotations
from dataclasses import dataclass
from datetime import timedelta
from egauge_async.exceptions import (
EgaugeAuthenticationError,
EgaugeException,
EgaugePermissionError,
)
from egauge_async.json.client import EgaugeJsonClient
from egauge_async.json.models import RegisterInfo
from httpx import ConnectError
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import (
CONF_HOST,
CONF_PASSWORD,
CONF_SSL,
CONF_USERNAME,
CONF_VERIFY_SSL,
)
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryError
from homeassistant.helpers.httpx_client import get_async_client
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import COORDINATOR_UPDATE_INTERVAL_SECONDS, DOMAIN, LOGGER
type EgaugeConfigEntry = ConfigEntry[EgaugeDataCoordinator]
@dataclass
class EgaugeData:
"""Data from eGauge device."""
measurements: dict[str, float] # Instantaneous values (W, V, A, etc.)
counters: dict[str, float] # Cumulative values (Ws)
register_info: dict[str, RegisterInfo] # Metadata for all registers
class EgaugeDataCoordinator(DataUpdateCoordinator[EgaugeData]):
"""Class to manage fetching eGauge data."""
serial_number: str
hostname: str
def __init__(self, hass: HomeAssistant, config_entry: ConfigEntry) -> None:
"""Initialize the coordinator."""
super().__init__(
hass,
logger=LOGGER,
name=DOMAIN,
update_interval=timedelta(seconds=COORDINATOR_UPDATE_INTERVAL_SECONDS),
config_entry=config_entry,
)
self.client = EgaugeJsonClient(
host=config_entry.data[CONF_HOST],
username=config_entry.data[CONF_USERNAME],
password=config_entry.data[CONF_PASSWORD],
client=get_async_client(
hass, verify_ssl=config_entry.data[CONF_VERIFY_SSL]
),
use_ssl=config_entry.data[CONF_SSL],
)
# Populated in _async_setup
self._register_info: dict[str, RegisterInfo] = {}
async def _async_setup(self) -> None:
try:
self.serial_number = await self.client.get_device_serial_number()
self.hostname = await self.client.get_hostname()
self._register_info = await self.client.get_register_info()
except (
EgaugeAuthenticationError,
EgaugePermissionError,
EgaugeException,
) as err:
# EgaugeAuthenticationError and EgaugePermissionError will raise ConfigEntryAuthFailed once reauth is implemented
raise ConfigEntryError from err
except ConnectError as err:
raise UpdateFailed(f"Error fetching device info: {err}") from err
async def _async_update_data(self) -> EgaugeData:
"""Fetch data from eGauge device."""
try:
measurements = await self.client.get_current_measurements()
counters = await self.client.get_current_counters()
except (
EgaugeAuthenticationError,
EgaugePermissionError,
EgaugeException,
) as err:
# will raise ConfigEntryAuthFailed once reauth is implemented
raise ConfigEntryError("Error fetching device info: {err}") from err
except ConnectError as err:
raise UpdateFailed(f"Error fetching device info: {err}") from err
return EgaugeData(
measurements=measurements,
counters=counters,
register_info=self._register_info,
)

View File

@@ -1,35 +0,0 @@
"""Base entity for the eGauge integration."""
from __future__ import annotations
from homeassistant.helpers.device_registry import DeviceInfo
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from .const import DOMAIN, MANUFACTURER, MODEL
from .coordinator import EgaugeDataCoordinator
class EgaugeEntity(CoordinatorEntity[EgaugeDataCoordinator]):
"""Base entity for eGauge sensors."""
_attr_has_entity_name = True
def __init__(
self,
coordinator: EgaugeDataCoordinator,
register_name: str,
) -> None:
"""Initialize the eGauge entity."""
super().__init__(coordinator)
register_identifier = f"{coordinator.serial_number}_{register_name}"
register_name = f"{coordinator.hostname} {register_name}"
# Device info using coordinator's cached data
self._attr_device_info = DeviceInfo(
identifiers={(DOMAIN, register_identifier)},
name=register_name,
manufacturer=MANUFACTURER,
model=MODEL,
via_device=(DOMAIN, coordinator.serial_number),
)

View File

@@ -1,11 +0,0 @@
{
"domain": "egauge",
"name": "eGauge",
"codeowners": ["@neggert"],
"config_flow": true,
"documentation": "https://www.home-assistant.io/integrations/egauge",
"integration_type": "device",
"iot_class": "local_polling",
"quality_scale": "bronze",
"requirements": ["egauge-async==0.4.0"]
}

View File

@@ -1,74 +0,0 @@
rules:
# Bronze
action-setup:
status: exempt
comment: Integration does not register custom actions
appropriate-polling: done
brands: done
common-modules: done
config-flow: done
config-flow-test-coverage: done
dependency-transparency: done
docs-actions:
status: exempt
comment: Integration does not register custom actions
docs-high-level-description: done
docs-installation-instructions: done
docs-removal-instructions: done
entity-event-setup:
status: exempt
comment: Integration does not subscribe to events
entity-unique-id: done
has-entity-name: done
runtime-data: done
test-before-configure: done
test-before-setup: done
unique-config-entry: done
# Silver
action-exceptions:
status: exempt
comment: Integration does not register actions
config-entry-unloading: done
docs-configuration-parameters:
status: exempt
comment: Integration does not expose configuration options
docs-installation-parameters: done
entity-unavailable: done
integration-owner: done
log-when-unavailable: done
parallel-updates: todo
reauthentication-flow: todo
test-coverage: done
# Gold
devices: done
diagnostics: todo
discovery: todo
discovery-update-info: todo
docs-data-update: done
docs-examples: todo
docs-known-limitations: done
docs-supported-devices: done
docs-supported-functions: done
docs-troubleshooting: done
docs-use-cases: todo
dynamic-devices: todo
entity-category: done
entity-device-class: done
entity-disabled-by-default:
status: exempt
comment: Integration only has essential entities
entity-translations: done
exception-translations: todo
icon-translations:
status: exempt
comment: Integration uses standard device class icons
reconfiguration-flow: todo
repair-issues: todo
stale-devices: todo
# Platinum
async-dependency: done
inject-websession: done
strict-typing: todo

View File

@@ -1,99 +0,0 @@
"""Sensor platform for eGauge energy monitors."""
from __future__ import annotations
from collections.abc import Callable
from dataclasses import dataclass
from egauge_async.json.models import RegisterType
from homeassistant.components.sensor import (
SensorDeviceClass,
SensorEntity,
SensorEntityDescription,
SensorStateClass,
)
from homeassistant.const import UnitOfEnergy, UnitOfPower
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
from .coordinator import EgaugeConfigEntry, EgaugeData, EgaugeDataCoordinator
from .entity import EgaugeEntity
@dataclass(frozen=True, kw_only=True)
class EgaugeSensorEntityDescription(SensorEntityDescription):
"""Extended sensor description for eGauge sensors."""
native_value_fn: Callable[[EgaugeData, str], float]
available_fn: Callable[[EgaugeData, str], bool]
SENSORS: tuple[EgaugeSensorEntityDescription, ...] = (
EgaugeSensorEntityDescription(
key="power",
device_class=SensorDeviceClass.POWER,
state_class=SensorStateClass.MEASUREMENT,
native_unit_of_measurement=UnitOfPower.WATT,
native_value_fn=lambda data, register: data.measurements[register],
available_fn=lambda data, register: register in data.measurements,
),
EgaugeSensorEntityDescription(
key="energy",
device_class=SensorDeviceClass.ENERGY,
state_class=SensorStateClass.TOTAL_INCREASING,
native_unit_of_measurement=UnitOfEnergy.JOULE,
suggested_unit_of_measurement=UnitOfEnergy.KILO_WATT_HOUR,
native_value_fn=lambda data, register: data.counters[register],
available_fn=lambda data, register: register in data.counters,
),
)
async def async_setup_entry(
hass: HomeAssistant,
entry: EgaugeConfigEntry,
async_add_entities: AddConfigEntryEntitiesCallback,
) -> None:
"""Set up eGauge sensor platform."""
coordinator = entry.runtime_data
async_add_entities(
EgaugeSensor(coordinator, register_name, sensor)
for sensor in SENSORS
for register_name, register_info in coordinator.data.register_info.items()
if register_info.type == RegisterType.POWER
)
class EgaugeSensor(EgaugeEntity, SensorEntity):
"""Generic sensor entity using entity description pattern."""
entity_description: EgaugeSensorEntityDescription
def __init__(
self,
coordinator: EgaugeDataCoordinator,
register_name: str,
description: EgaugeSensorEntityDescription,
) -> None:
"""Initialize the sensor."""
super().__init__(coordinator, register_name)
self._register_name = register_name
self.entity_description = description
self._attr_unique_id = (
f"{coordinator.serial_number}_{register_name}_{description.key}"
)
@property
def native_value(self) -> float:
"""Return the sensor value using the description's value function."""
return self.entity_description.native_value_fn(
self.coordinator.data, self._register_name
)
@property
def available(self) -> bool:
"""Return true if the corresponding register is available."""
return super().available and self.entity_description.available_fn(
self.coordinator.data, self._register_name
)

View File

@@ -1,32 +0,0 @@
{
"config": {
"abort": {
"already_configured": "[%key:common::config_flow::abort::already_configured_device%]",
"reauth_successful": "[%key:common::config_flow::abort::reauth_successful%]"
},
"error": {
"cannot_connect": "[%key:common::config_flow::error::cannot_connect%]",
"invalid_auth": "[%key:common::config_flow::error::invalid_auth%]",
"missing_permission": "The provided user does not have the necessary permissions",
"unknown": "[%key:common::config_flow::error::unknown%]"
},
"step": {
"user": {
"data": {
"host": "[%key:common::config_flow::data::host%]",
"password": "[%key:common::config_flow::data::password%]",
"ssl": "[%key:common::config_flow::data::ssl%]",
"username": "[%key:common::config_flow::data::username%]",
"verify_ssl": "[%key:common::config_flow::data::verify_ssl%]"
},
"data_description": {
"host": "The hostname or IP address of the eGauge device",
"password": "The password for the provided user.",
"ssl": "Use SSL for a secure connection.",
"username": "The username for the eGauge device. The user must have permission to read registers and settings.",
"verify_ssl": "Verify SSL certificate. eGauge devices use a self-signed certificate by default, so leave this off unless a custom certificate has been installed on the device."
}
}
}
}
}

View File

@@ -1,8 +1,8 @@
{
"config": {
"abort": {
"already_configured": "[%key:common::config_flow::abort::already_configured_device%]",
"reauth_successful": "[%key:common::config_flow::abort::reauth_successful%]"
"already_configured": "This device is already configured.",
"reauth_successful": "Reauthentication successful."
},
"create_entry": {
"add_sensor_mapping_hint": "You can now add mappings from any sensor in Home Assistant to {integration_name} using the '+ add sensor mapping' button."

View File

@@ -56,11 +56,11 @@ class EnergyZeroDataUpdateCoordinator(DataUpdateCoordinator[EnergyZeroData]):
energy_tomorrow = None
try:
energy_today = await self.energyzero.get_electricity_prices_legacy(
energy_today = await self.energyzero.energy_prices(
start_date=today, end_date=today
)
try:
gas_today = await self.energyzero.get_gas_prices_legacy(
gas_today = await self.energyzero.gas_prices(
start_date=today, end_date=today
)
except EnergyZeroNoDataError:
@@ -69,10 +69,8 @@ class EnergyZeroDataUpdateCoordinator(DataUpdateCoordinator[EnergyZeroData]):
if dt_util.utcnow().hour >= THRESHOLD_HOUR:
tomorrow = today + timedelta(days=1)
try:
energy_tomorrow = (
await self.energyzero.get_electricity_prices_legacy(
start_date=tomorrow, end_date=tomorrow
)
energy_tomorrow = await self.energyzero.energy_prices(
start_date=tomorrow, end_date=tomorrow
)
except EnergyZeroNoDataError:
LOGGER.debug("No data for tomorrow for EnergyZero integration")

View File

@@ -6,6 +6,6 @@
"documentation": "https://www.home-assistant.io/integrations/energyzero",
"integration_type": "service",
"iot_class": "cloud_polling",
"requirements": ["energyzero==4.0.1"],
"requirements": ["energyzero==2.1.1"],
"single_config_entry": true
}

View File

@@ -128,13 +128,13 @@ async def __get_prices(
data: Electricity | Gas
if price_type == PriceType.GAS:
data = await coordinator.energyzero.get_gas_prices_legacy(
data = await coordinator.energyzero.gas_prices(
start_date=start,
end_date=end,
vat=vat,
)
else:
data = await coordinator.energyzero.get_electricity_prices_legacy(
data = await coordinator.energyzero.energy_prices(
start_date=start,
end_date=end,
vat=vat,

View File

@@ -1,44 +0,0 @@
"""Constants for the Entur public transport integration."""
from datetime import timedelta
DOMAIN = "entur_public_transport"
API_CLIENT_NAME = "homeassistant-{}"
CONF_STOP_IDS = "stop_ids"
CONF_EXPAND_PLATFORMS = "expand_platforms"
CONF_WHITELIST_LINES = "line_whitelist"
CONF_OMIT_NON_BOARDING = "omit_non_boarding"
CONF_NUMBER_OF_DEPARTURES = "number_of_departures"
DEFAULT_NAME = "Entur"
DEFAULT_ICON_KEY = "bus"
ICONS = {
"air": "mdi:airplane",
"bus": "mdi:bus",
"metro": "mdi:subway",
"rail": "mdi:train",
"tram": "mdi:tram",
"water": "mdi:ferry",
}
SCAN_INTERVAL = timedelta(seconds=45)
ATTR_STOP_ID = "stop_id"
ATTR_ROUTE = "route"
ATTR_ROUTE_ID = "route_id"
ATTR_EXPECTED_AT = "due_at"
ATTR_DELAY = "delay"
ATTR_REALTIME = "real_time"
ATTR_NEXT_UP_IN = "next_due_in"
ATTR_NEXT_UP_ROUTE = "next_route"
ATTR_NEXT_UP_ROUTE_ID = "next_route_id"
ATTR_NEXT_UP_AT = "next_due_at"
ATTR_NEXT_UP_DELAY = "next_delay"
ATTR_NEXT_UP_REALTIME = "next_real_time"
ATTR_TRANSPORT_MODE = "transport_mode"

View File

@@ -1,9 +1,8 @@
{
"domain": "entur_public_transport",
"name": "Entur",
"codeowners": ["@hfurubotten", "@SanderBlom"],
"codeowners": ["@hfurubotten"],
"documentation": "https://www.home-assistant.io/integrations/entur_public_transport",
"integration_type": "service",
"iot_class": "cloud_polling",
"loggers": ["enturclient"],
"quality_scale": "legacy",

View File

@@ -26,29 +26,27 @@ from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
from homeassistant.util import Throttle, dt as dt_util
from .const import (
API_CLIENT_NAME,
ATTR_DELAY,
ATTR_EXPECTED_AT,
ATTR_NEXT_UP_AT,
ATTR_NEXT_UP_DELAY,
ATTR_NEXT_UP_IN,
ATTR_NEXT_UP_REALTIME,
ATTR_NEXT_UP_ROUTE,
ATTR_NEXT_UP_ROUTE_ID,
ATTR_REALTIME,
ATTR_ROUTE,
ATTR_ROUTE_ID,
ATTR_STOP_ID,
CONF_EXPAND_PLATFORMS,
CONF_NUMBER_OF_DEPARTURES,
CONF_OMIT_NON_BOARDING,
CONF_STOP_IDS,
CONF_WHITELIST_LINES,
DEFAULT_ICON_KEY,
DEFAULT_NAME,
ICONS,
)
API_CLIENT_NAME = "homeassistant-{}"
CONF_STOP_IDS = "stop_ids"
CONF_EXPAND_PLATFORMS = "expand_platforms"
CONF_WHITELIST_LINES = "line_whitelist"
CONF_OMIT_NON_BOARDING = "omit_non_boarding"
CONF_NUMBER_OF_DEPARTURES = "number_of_departures"
DEFAULT_NAME = "Entur"
DEFAULT_ICON_KEY = "bus"
ICONS = {
"air": "mdi:airplane",
"bus": "mdi:bus",
"metro": "mdi:subway",
"rail": "mdi:train",
"tram": "mdi:tram",
"water": "mdi:ferry",
}
SCAN_INTERVAL = timedelta(seconds=45)
PLATFORM_SCHEMA = SENSOR_PLATFORM_SCHEMA.extend(
{
@@ -65,6 +63,24 @@ PLATFORM_SCHEMA = SENSOR_PLATFORM_SCHEMA.extend(
)
ATTR_STOP_ID = "stop_id"
ATTR_ROUTE = "route"
ATTR_ROUTE_ID = "route_id"
ATTR_EXPECTED_AT = "due_at"
ATTR_DELAY = "delay"
ATTR_REALTIME = "real_time"
ATTR_NEXT_UP_IN = "next_due_in"
ATTR_NEXT_UP_ROUTE = "next_route"
ATTR_NEXT_UP_ROUTE_ID = "next_route_id"
ATTR_NEXT_UP_AT = "next_due_at"
ATTR_NEXT_UP_DELAY = "next_delay"
ATTR_NEXT_UP_REALTIME = "next_real_time"
ATTR_TRANSPORT_MODE = "transport_mode"
def due_in_minutes(timestamp: datetime) -> int:
"""Get the time in minutes from a timestamp."""
if timestamp is None:

View File

@@ -15,14 +15,12 @@ from aioesphomeapi import (
APIVersion,
DeviceInfo as EsphomeDeviceInfo,
EncryptionPlaintextAPIError,
ExecuteServiceResponse,
HomeassistantServiceCall,
InvalidAuthAPIError,
InvalidEncryptionKeyAPIError,
LogLevel,
ReconnectLogic,
RequiresEncryptionAPIError,
SupportsResponseType,
UserService,
UserServiceArgType,
ZWaveProxyRequest,
@@ -46,9 +44,7 @@ from homeassistant.core import (
EventStateChangedData,
HomeAssistant,
ServiceCall,
ServiceResponse,
State,
SupportsResponse,
callback,
)
from homeassistant.exceptions import (
@@ -62,7 +58,7 @@ from homeassistant.helpers import (
device_registry as dr,
entity_registry as er,
issue_registry as ir,
json as json_helper,
json,
template,
)
from homeassistant.helpers.device_registry import format_mac
@@ -74,7 +70,6 @@ from homeassistant.helpers.issue_registry import (
)
from homeassistant.helpers.service import async_set_service_schema
from homeassistant.helpers.template import Template
from homeassistant.util.json import json_loads_object
from .bluetooth import async_connect_scanner
from .const import (
@@ -96,7 +91,6 @@ from .encryption_key_storage import async_get_encryption_key_storage
# Import config flow so that it's added to the registry
from .entry_data import ESPHomeConfigEntry, RuntimeEntryData
from .enum_mapper import EsphomeEnumMapper
DEVICE_CONFLICT_ISSUE_FORMAT = "device_conflict-{}"
UNPACK_UINT32_BE = struct.Struct(">I").unpack_from
@@ -373,7 +367,7 @@ class ESPHomeManager:
response_dict = {"response": action_response}
# JSON encode response data for ESPHome
response_data = json_helper.json_bytes(response_dict)
response_data = json.json_bytes(response_dict)
except (
ServiceNotFound,
@@ -1156,52 +1150,13 @@ ARG_TYPE_METADATA = {
}
async def execute_service(
entry_data: RuntimeEntryData,
service: UserService,
call: ServiceCall,
*,
supports_response: SupportsResponseType,
) -> ServiceResponse:
"""Execute a service on a node and optionally wait for response."""
# Determine if we should wait for a response
# NONE: fire and forget
# OPTIONAL/ONLY/STATUS: always wait for success/error confirmation
wait_for_response = supports_response != SupportsResponseType.NONE
if not wait_for_response:
# Fire and forget - no response expected
try:
await entry_data.client.execute_service(service, call.data)
except APIConnectionError as err:
raise HomeAssistantError(
translation_domain=DOMAIN,
translation_key="action_call_failed",
translation_placeholders={
"call_name": service.name,
"device_name": entry_data.name,
"error": str(err),
},
) from err
else:
return None
# Determine if we need response_data from ESPHome
# ONLY: always need response_data
# OPTIONAL: only if caller requested it
# STATUS: never need response_data (just success/error)
need_response_data = supports_response == SupportsResponseType.ONLY or (
supports_response == SupportsResponseType.OPTIONAL and call.return_response
)
@callback
def execute_service(
entry_data: RuntimeEntryData, service: UserService, call: ServiceCall
) -> None:
"""Execute a service on a node."""
try:
response: (
ExecuteServiceResponse | None
) = await entry_data.client.execute_service(
service,
call.data,
return_response=need_response_data,
)
entry_data.client.execute_service(service, call.data)
except APIConnectionError as err:
raise HomeAssistantError(
translation_domain=DOMAIN,
@@ -1212,44 +1167,6 @@ async def execute_service(
"error": str(err),
},
) from err
except TimeoutError as err:
raise HomeAssistantError(
translation_domain=DOMAIN,
translation_key="action_call_timeout",
translation_placeholders={
"call_name": service.name,
"device_name": entry_data.name,
},
) from err
assert response is not None
if not response.success:
raise HomeAssistantError(
translation_domain=DOMAIN,
translation_key="action_call_failed",
translation_placeholders={
"call_name": service.name,
"device_name": entry_data.name,
"error": response.error_message,
},
)
# Parse and return response data as JSON if we requested it
if need_response_data and response.response_data:
try:
return json_loads_object(response.response_data)
except ValueError as err:
raise HomeAssistantError(
translation_domain=DOMAIN,
translation_key="action_call_failed",
translation_placeholders={
"call_name": service.name,
"device_name": entry_data.name,
"error": f"Invalid JSON response: {err}",
},
) from err
return None
def build_service_name(device_info: EsphomeDeviceInfo, service: UserService) -> str:
@@ -1257,19 +1174,6 @@ def build_service_name(device_info: EsphomeDeviceInfo, service: UserService) ->
return f"{device_info.name.replace('-', '_')}_{service.name}"
# Map ESPHome SupportsResponseType to Home Assistant SupportsResponse
# STATUS (100) is ESPHome-specific: waits for success/error internally but
# doesn't return data to HA, so it maps to NONE from HA's perspective
_RESPONSE_TYPE_MAPPER = EsphomeEnumMapper[SupportsResponseType, SupportsResponse](
{
SupportsResponseType.NONE: SupportsResponse.NONE,
SupportsResponseType.OPTIONAL: SupportsResponse.OPTIONAL,
SupportsResponseType.ONLY: SupportsResponse.ONLY,
SupportsResponseType.STATUS: SupportsResponse.NONE,
}
)
@callback
def _async_register_service(
hass: HomeAssistant,
@@ -1301,21 +1205,11 @@ def _async_register_service(
"selector": metadata.selector,
}
# Get the supports_response from the service, defaulting to NONE
esphome_supports_response = service.supports_response or SupportsResponseType.NONE
ha_supports_response = _RESPONSE_TYPE_MAPPER.from_esphome(esphome_supports_response)
hass.services.async_register(
DOMAIN,
service_name,
partial(
execute_service,
entry_data,
service,
supports_response=esphome_supports_response,
),
partial(execute_service, entry_data, service),
vol.Schema(schema),
supports_response=ha_supports_response,
)
async_set_service_schema(
hass,

View File

@@ -17,7 +17,7 @@
"mqtt": ["esphome/discover/#"],
"quality_scale": "platinum",
"requirements": [
"aioesphomeapi==43.0.0",
"aioesphomeapi==42.10.0",
"esphome-dashboard-api==1.3.0",
"bleak-esphome==3.4.0"
],

View File

@@ -128,9 +128,6 @@
"action_call_failed": {
"message": "Failed to execute the action call {call_name} on {device_name}: {error}"
},
"action_call_timeout": {
"message": "Timeout waiting for response from action call {call_name} on {device_name}"
},
"error_communicating_with_device": {
"message": "Error communicating with the device {device_name}: {error}"
},

View File

@@ -23,5 +23,5 @@
"winter_mode": {}
},
"quality_scale": "internal",
"requirements": ["home-assistant-frontend==20251203.2"]
"requirements": ["home-assistant-frontend==20251203.1"]
}

View File

@@ -1,9 +1,9 @@
{
"preview_features": {
"winter_mode": {
"description": "Adds falling snowflakes on your screen. Get your home ready for winter! ❄️\n\nIf you have animations disabled in your device accessibility settings, this feature will not work.",
"disable_confirmation": "Snowflakes will no longer fall on your screen. You can re-enable this at any time in Labs settings.",
"enable_confirmation": "Snowflakes will start falling on your screen. You can turn this off at any time in Labs settings.",
"description": "Adds falling snowflakes on your screen. Get your home ready for winter! ❄️",
"disable_confirmation": "Snowflakes will no longer fall on your screen. You can re-enable this at any time in labs settings.",
"enable_confirmation": "Snowflakes will start falling on your screen. You can turn this off at any time in labs settings.",
"name": "Winter mode"
}
},

View File

@@ -10,7 +10,7 @@ from homeassistant.helpers import aiohttp_client, config_entry_oauth2_flow
from . import oauth2
from .const import DOMAIN
from .coordinator import HomeLinkConfigEntry, HomeLinkCoordinator
from .coordinator import HomeLinkConfigEntry, HomeLinkCoordinator, HomeLinkData
PLATFORMS: list[Platform] = [Platform.EVENT]
@@ -44,7 +44,9 @@ async def async_setup_entry(hass: HomeAssistant, entry: HomeLinkConfigEntry) ->
)
await coordinator.async_config_entry_first_refresh()
entry.runtime_data = coordinator
entry.runtime_data = HomeLinkData(
provider=provider, coordinator=coordinator, last_update_id=None
)
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
return True
@@ -52,5 +54,5 @@ async def async_setup_entry(hass: HomeAssistant, entry: HomeLinkConfigEntry) ->
async def async_unload_entry(hass: HomeAssistant, entry: HomeLinkConfigEntry) -> bool:
"""Unload a config entry."""
await entry.runtime_data.async_on_unload(None)
await entry.runtime_data.coordinator.async_on_unload(None)
return await hass.config_entries.async_unload_platforms(entry, PLATFORMS)

View File

@@ -7,7 +7,7 @@ import botocore.exceptions
from homelink.auth.srp_auth import SRPAuth
import voluptuous as vol
from homeassistant.config_entries import ConfigFlowResult
from homeassistant import config_entries
from homeassistant.const import CONF_EMAIL, CONF_PASSWORD
from homeassistant.helpers.config_entry_oauth2_flow import AbstractOAuth2FlowHandler
@@ -34,7 +34,7 @@ class SRPFlowHandler(AbstractOAuth2FlowHandler, domain=DOMAIN):
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
) -> config_entries.ConfigFlowResult:
"""Ask for username and password."""
errors: dict[str, str] = {}
if user_input is not None:

View File

@@ -3,9 +3,10 @@
from __future__ import annotations
from collections.abc import Callable
from dataclasses import dataclass
from functools import partial
import logging
from typing import TypedDict
from typing import TYPE_CHECKING, TypedDict
from homelink.model.device import Device
from homelink.mqtt_provider import MQTTProvider
@@ -14,12 +15,24 @@ from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant, callback
from homeassistant.util.ssl import get_default_context
if TYPE_CHECKING:
from .event import HomeLinkEventEntity
_LOGGER = logging.getLogger(__name__)
type HomeLinkConfigEntry = ConfigEntry[HomeLinkCoordinator]
type HomeLinkConfigEntry = ConfigEntry[HomeLinkData]
type EventCallback = Callable[[HomeLinkEventData], None]
@dataclass
class HomeLinkData:
"""Class for HomeLink integration runtime data."""
provider: MQTTProvider
coordinator: HomeLinkCoordinator
last_update_id: str | None
class HomeLinkEventData(TypedDict):
"""Data for a single event."""
@@ -48,6 +61,7 @@ class HomeLinkCoordinator:
self.config_entry = config_entry
self.provider = provider
self.device_data: list[Device] = []
self.buttons: list[HomeLinkEventEntity] = []
self._listeners: dict[str, EventCallback] = {}
@callback
@@ -58,11 +72,11 @@ class HomeLinkCoordinator:
self._listeners[target_event_id] = update_callback
return partial(self.__async_remove_listener_internal, target_event_id)
def __async_remove_listener_internal(self, listener_id: str) -> None:
def __async_remove_listener_internal(self, listener_id: str):
del self._listeners[listener_id]
@callback
def async_handle_state_data(self, data: dict[str, HomeLinkEventData]) -> None:
def async_handle_state_data(self, data: dict[str, HomeLinkEventData]):
"""Notify listeners."""
for button_id, event in data.items():
if listener := self._listeners.get(button_id):
@@ -72,7 +86,7 @@ class HomeLinkCoordinator:
"""Refresh data for the first time when a config entry is setup."""
await self._async_setup()
async def async_on_unload(self, _event) -> None:
async def async_on_unload(self, _event):
"""Disconnect and unregister when unloaded."""
await self.provider.disable()
@@ -82,12 +96,14 @@ class HomeLinkCoordinator:
await self.discover_devices()
self.provider.listen(self.on_message)
async def discover_devices(self) -> None:
async def discover_devices(self):
"""Discover devices and build the Entities."""
self.device_data = await self.provider.discover()
def on_message(self, _topic: str, message: HomeLinkMQTTMessage) -> None:
"""MQTT Callback function."""
def on_message(
self: HomeLinkCoordinator, _topic: str, message: HomeLinkMQTTMessage
):
"MQTT Callback function."
if message["type"] == "state":
self.hass.add_job(self.async_handle_state_data, message["data"])
if message["type"] == "requestSync":

View File

@@ -3,27 +3,30 @@
from __future__ import annotations
from homeassistant.components.event import EventDeviceClass, EventEntity
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.device_registry import DeviceInfo
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
from .const import DOMAIN, EVENT_PRESSED
from .coordinator import HomeLinkConfigEntry, HomeLinkCoordinator, HomeLinkEventData
from .coordinator import HomeLinkCoordinator, HomeLinkEventData
async def async_setup_entry(
hass: HomeAssistant,
config_entry: HomeLinkConfigEntry,
config_entry: ConfigEntry,
async_add_entities: AddConfigEntryEntitiesCallback,
) -> None:
"""Add the entities for the event platform."""
coordinator = config_entry.runtime_data
"""Add the entities for the binary sensor."""
coordinator = config_entry.runtime_data.coordinator
for device in coordinator.device_data:
buttons = [
HomeLinkEventEntity(b.id, b.name, device.id, device.name, coordinator)
for b in device.buttons
]
coordinator.buttons.extend(buttons)
async_add_entities(
HomeLinkEventEntity(coordinator, button.id, button.name, device.id, device.name)
for device in coordinator.device_data
for button in device.buttons
)
async_add_entities(coordinator.buttons)
# Updates are centralized by the coordinator.
@@ -39,17 +42,17 @@ class HomeLinkEventEntity(EventEntity):
def __init__(
self,
coordinator: HomeLinkCoordinator,
button_id: str,
id: str,
param_name: str,
device_id: str,
device_name: str,
coordinator: HomeLinkCoordinator,
) -> None:
"""Initialize the event entity."""
self.button_id = button_id
self._attr_name = param_name
self._attr_unique_id = button_id
self.id: str = id
self._attr_name: str = param_name
self._attr_unique_id: str = id
self._attr_device_info = DeviceInfo(
identifiers={(DOMAIN, device_id)},
name=device_name,
@@ -62,7 +65,7 @@ class HomeLinkEventEntity(EventEntity):
await super().async_added_to_hass()
self.async_on_remove(
self.coordinator.async_add_event_listener(
self._handle_event_data_update, self.button_id
self._handle_event_data_update, self.id
)
)
@@ -73,4 +76,8 @@ class HomeLinkEventEntity(EventEntity):
if update_data["requestId"] != self.last_request_id:
self._trigger_event(EVENT_PRESSED)
self.last_request_id = update_data["requestId"]
self.async_write_ha_state()
self.async_write_ha_state()
async def async_update(self):
"""Request early polling. Left intentionally blank because it's not possible in this implementation."""

View File

@@ -21,7 +21,7 @@ _LOGGER = logging.getLogger(__name__)
class SRPAuthImplementation(config_entry_oauth2_flow.AbstractOAuth2Implementation):
"""Base class to abstract OAuth2 authentication."""
def __init__(self, hass: HomeAssistant, domain: str) -> None:
def __init__(self, hass: HomeAssistant, domain) -> None:
"""Initialize the SRP Auth implementation."""
self.hass = hass
@@ -45,13 +45,16 @@ class SRPAuthImplementation(config_entry_oauth2_flow.AbstractOAuth2Implementatio
async def async_resolve_external_data(self, external_data) -> dict:
"""Format the token from the source appropriately for HomeAssistant."""
tokens = external_data["tokens"]
return {
"access_token": tokens["AuthenticationResult"]["AccessToken"],
"refresh_token": tokens["AuthenticationResult"]["RefreshToken"],
"token_type": tokens["AuthenticationResult"]["TokenType"],
"expires_in": tokens["AuthenticationResult"]["ExpiresIn"],
"expires_at": (time.time() + tokens["AuthenticationResult"]["ExpiresIn"]),
}
new_token = {}
new_token["access_token"] = tokens["AuthenticationResult"]["AccessToken"]
new_token["refresh_token"] = tokens["AuthenticationResult"]["RefreshToken"]
new_token["token_type"] = tokens["AuthenticationResult"]["TokenType"]
new_token["expires_in"] = tokens["AuthenticationResult"]["ExpiresIn"]
new_token["expires_at"] = (
time.time() + tokens["AuthenticationResult"]["ExpiresIn"]
)
return new_token
async def _token_request(self, data: dict) -> dict:
"""Make a token request."""

View File

@@ -26,7 +26,7 @@ from homeassistant.const import (
CONF_NAME,
)
from homeassistant.core import HomeAssistant, callback
from homeassistant.data_entry_flow import SectionConfig, section
from homeassistant.data_entry_flow import section
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.selector import LocationSelector, LocationSelectorConfig
@@ -38,8 +38,7 @@ STEP_USER_DATA_SCHEMA = vol.Schema(
{
vol.Required(CONF_API_KEY): str,
vol.Optional(SECTION_API_KEY_OPTIONS): section(
vol.Schema({vol.Optional(CONF_REFERRER): str}),
SectionConfig(collapsed=True),
vol.Schema({vol.Optional(CONF_REFERRER): str}), {"collapsed": True}
),
}
)
@@ -52,9 +51,9 @@ async def _validate_input(
description_placeholders: dict[str, str],
) -> bool:
try:
await api.async_get_current_conditions(
await api.async_air_quality(
lat=user_input[CONF_LOCATION][CONF_LATITUDE],
lon=user_input[CONF_LOCATION][CONF_LONGITUDE],
long=user_input[CONF_LOCATION][CONF_LONGITUDE],
)
except GoogleAirQualityApiError as err:
errors["base"] = "cannot_connect"

View File

@@ -7,7 +7,7 @@ from typing import Final
from google_air_quality_api.api import GoogleAirQualityApi
from google_air_quality_api.exceptions import GoogleAirQualityApiError
from google_air_quality_api.model import AirQualityCurrentConditionsData
from google_air_quality_api.model import AirQualityData
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_LATITUDE, CONF_LONGITUDE
@@ -23,9 +23,7 @@ UPDATE_INTERVAL: Final = timedelta(hours=1)
type GoogleAirQualityConfigEntry = ConfigEntry[GoogleAirQualityRuntimeData]
class GoogleAirQualityUpdateCoordinator(
DataUpdateCoordinator[AirQualityCurrentConditionsData]
):
class GoogleAirQualityUpdateCoordinator(DataUpdateCoordinator[AirQualityData]):
"""Coordinator for fetching Google AirQuality data."""
config_entry: GoogleAirQualityConfigEntry
@@ -50,10 +48,10 @@ class GoogleAirQualityUpdateCoordinator(
self.lat = subentry.data[CONF_LATITUDE]
self.long = subentry.data[CONF_LONGITUDE]
async def _async_update_data(self) -> AirQualityCurrentConditionsData:
async def _async_update_data(self) -> AirQualityData:
"""Fetch air quality data for this coordinate."""
try:
return await self.client.async_get_current_conditions(self.lat, self.long)
return await self.client.async_air_quality(self.lat, self.long)
except GoogleAirQualityApiError as ex:
_LOGGER.debug("Cannot fetch air quality data: %s", str(ex))
raise UpdateFailed(

View File

@@ -8,5 +8,5 @@
"iot_class": "cloud_polling",
"loggers": ["google_air_quality_api"],
"quality_scale": "bronze",
"requirements": ["google_air_quality_api==2.0.0"]
"requirements": ["google_air_quality_api==1.1.3"]
}

View File

@@ -4,7 +4,7 @@ from collections.abc import Callable
from dataclasses import dataclass
import logging
from google_air_quality_api.model import AirQualityCurrentConditionsData
from google_air_quality_api.model import AirQualityData
from homeassistant.components.sensor import (
SensorDeviceClass,
@@ -33,17 +33,15 @@ PARALLEL_UPDATES = 0
class AirQualitySensorEntityDescription(SensorEntityDescription):
"""Describes Air Quality sensor entity."""
exists_fn: Callable[[AirQualityCurrentConditionsData], bool] = lambda _: True
options_fn: Callable[[AirQualityCurrentConditionsData], list[str] | None] = (
exists_fn: Callable[[AirQualityData], bool] = lambda _: True
options_fn: Callable[[AirQualityData], list[str] | None] = lambda _: None
value_fn: Callable[[AirQualityData], StateType]
native_unit_of_measurement_fn: Callable[[AirQualityData], str | None] = (
lambda _: None
)
value_fn: Callable[[AirQualityCurrentConditionsData], StateType]
native_unit_of_measurement_fn: Callable[
[AirQualityCurrentConditionsData], str | None
] = lambda _: None
translation_placeholders_fn: (
Callable[[AirQualityCurrentConditionsData], dict[str, str]] | None
) = None
translation_placeholders_fn: Callable[[AirQualityData], dict[str, str]] | None = (
None
)
AIR_QUALITY_SENSOR_TYPES: tuple[AirQualitySensorEntityDescription, ...] = (

View File

@@ -59,4 +59,8 @@ async def async_setup_entry(hass: HomeAssistant, entry: GoogleMailConfigEntry) -
async def async_unload_entry(hass: HomeAssistant, entry: GoogleMailConfigEntry) -> bool:
"""Unload a config entry."""
if not hass.config_entries.async_loaded_entries(DOMAIN):
for service_name in hass.services.async_services_for_domain(DOMAIN):
hass.services.async_remove(DOMAIN, service_name)
return await hass.config_entries.async_unload_platforms(entry, PLATFORMS)

View File

@@ -6,5 +6,5 @@
"documentation": "https://www.home-assistant.io/integrations/hanna",
"iot_class": "cloud_polling",
"quality_scale": "bronze",
"requirements": ["hanna-cloud==0.0.7"]
"requirements": ["hanna-cloud==0.0.6"]
}

View File

@@ -34,7 +34,6 @@ ATTR_ISSUES = "issues"
ATTR_MESSAGE = "message"
ATTR_METHOD = "method"
ATTR_PANELS = "panels"
ATTR_PARAMS = "params"
ATTR_PASSWORD = "password"
ATTR_RESULT = "result"
ATTR_STARTUP = "startup"

View File

@@ -198,7 +198,6 @@ class HassIO:
timeout: int | None = 10,
return_text: bool = False,
*,
params: dict[str, Any] | None = None,
source: str = "core.handler",
) -> Any:
"""Send API command to Hass.io.
@@ -219,7 +218,6 @@ class HassIO:
response = await self.websession.request(
method,
joined_url,
params=params,
json=payload,
headers={
aiohttp.hdrs.AUTHORIZATION: (

View File

@@ -24,7 +24,6 @@ from .const import (
ATTR_DATA,
ATTR_ENDPOINT,
ATTR_METHOD,
ATTR_PARAMS,
ATTR_SESSION_DATA_USER_ID,
ATTR_SLUG,
ATTR_TIMEOUT,
@@ -112,7 +111,6 @@ def websocket_supervisor_event(
vol.Required(ATTR_ENDPOINT): cv.string,
vol.Required(ATTR_METHOD): cv.string,
vol.Optional(ATTR_DATA): dict,
vol.Optional(ATTR_PARAMS): dict,
vol.Optional(ATTR_TIMEOUT): vol.Any(Number, None),
}
)
@@ -142,7 +140,6 @@ async def websocket_supervisor_api(
timeout=msg.get(ATTR_TIMEOUT, 10),
payload=payload,
source="core.websocket_api",
params=msg.get(ATTR_PARAMS),
)
except HassioAPIError as err:
_LOGGER.error("Failed to to call %s - %s", msg[ATTR_ENDPOINT], err)

View File

@@ -6,5 +6,5 @@
"iot_class": "local_polling",
"loggers": ["heatmiserV3"],
"quality_scale": "legacy",
"requirements": ["heatmiserV3==2.0.4"]
"requirements": ["heatmiserV3==2.0.3"]
}

View File

@@ -2,7 +2,7 @@
import logging
from HueBLE import ConnectionError, HueBleError, HueBleLight
from HueBLE import HueBleLight
from homeassistant.components.bluetooth import (
async_ble_device_from_address,
@@ -38,15 +38,8 @@ async def async_setup_entry(hass: HomeAssistant, entry: HueBLEConfigEntry) -> bo
light = HueBleLight(ble_device)
try:
await light.connect()
await light.poll_state()
except ConnectionError as e:
raise ConfigEntryNotReady("Device found but unable to connect.") from e
except HueBleError as e:
raise ConfigEntryNotReady(
"Device found and connected but unable to poll values from it."
) from e
if not await light.connect() or not await light.poll_state():
raise ConfigEntryNotReady("Device found but unable to connect.")
entry.runtime_data = light

View File

@@ -6,7 +6,7 @@ from enum import Enum
import logging
from typing import Any
from HueBLE import ConnectionError, HueBleError, HueBleLight, PairingError
from HueBLE import HueBleLight
import voluptuous as vol
from homeassistant.components import bluetooth
@@ -20,7 +20,7 @@ from homeassistant.core import HomeAssistant
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import device_registry as dr
from .const import DOMAIN, URL_FACTORY_RESET, URL_PAIRING_MODE
from .const import DOMAIN, URL_PAIRING_MODE
from .light import get_available_color_modes
_LOGGER = logging.getLogger(__name__)
@@ -41,22 +41,32 @@ async def validate_input(hass: HomeAssistant, address: str) -> Error | None:
try:
light = HueBleLight(ble_device)
await light.connect()
get_available_color_modes(light)
await light.poll_state()
except ConnectionError as e:
_LOGGER.exception("Error connecting to light")
return (
Error.INVALID_AUTH
if type(e.__cause__) is PairingError
else Error.CANNOT_CONNECT
)
except HueBleError:
await light.connect()
if light.authenticated is None:
_LOGGER.warning(
"Unable to determine if light authenticated, proceeding anyway"
)
elif not light.authenticated:
return Error.INVALID_AUTH
if not light.connected:
return Error.CANNOT_CONNECT
try:
get_available_color_modes(light)
except HomeAssistantError:
return Error.NOT_SUPPORTED
_, errors = await light.poll_state()
if len(errors) != 0:
_LOGGER.warning("Errors raised when connecting to light: %s", errors)
return Error.CANNOT_CONNECT
except Exception:
_LOGGER.exception("Unexpected error validating light connection")
return Error.UNKNOWN
except HomeAssistantError:
return Error.NOT_SUPPORTED
else:
return None
finally:
@@ -119,7 +129,6 @@ class HueBleConfigFlow(ConfigFlow, domain=DOMAIN):
CONF_NAME: self._discovery_info.name,
CONF_MAC: self._discovery_info.address,
"url_pairing_mode": URL_PAIRING_MODE,
"url_factory_reset": URL_FACTORY_RESET,
},
)

View File

@@ -2,4 +2,3 @@
DOMAIN = "hue_ble"
URL_PAIRING_MODE = "https://www.home-assistant.io/integrations/hue_ble#initial-setup"
URL_FACTORY_RESET = "https://www.philips-hue.com/en-gb/support/article/how-to-factory-reset-philips-hue-lights/000004"

View File

@@ -113,7 +113,7 @@ class HueBLELight(LightEntity):
async def async_update(self) -> None:
"""Fetch latest state from light and make available via properties."""
await self._api.poll_state()
await self._api.poll_state(run_callbacks=True)
async def async_turn_on(self, **kwargs: Any) -> None:
"""Set properties then turn the light on."""

View File

@@ -15,5 +15,5 @@
"iot_class": "local_push",
"loggers": ["bleak", "HueBLE"],
"quality_scale": "bronze",
"requirements": ["HueBLE==2.1.0"]
"requirements": ["HueBLE==1.0.8"]
}

View File

@@ -14,7 +14,7 @@
},
"step": {
"confirm": {
"description": "Do you want to set up {name} ({mac})?. Make sure the light is [made discoverable to voice assistants]({url_pairing_mode}) or has been [factory reset]({url_factory_reset})."
"description": "Do you want to set up {name} ({mac})?. Make sure the light is [made discoverable to voice assistants]({url_pairing_mode})."
}
}
}

View File

@@ -29,8 +29,7 @@ from .const import (
DEFAULT_LANGUAGE,
DOMAIN,
)
from .coordinator import JewishCalendarData, JewishCalendarUpdateCoordinator
from .entity import JewishCalendarConfigEntry
from .entity import JewishCalendarConfigEntry, JewishCalendarData
from .services import async_setup_services
_LOGGER = logging.getLogger(__name__)
@@ -70,7 +69,7 @@ async def async_setup_entry(
)
)
data = JewishCalendarData(
config_entry.runtime_data = JewishCalendarData(
language,
diaspora,
location,
@@ -78,11 +77,8 @@ async def async_setup_entry(
havdalah_offset,
)
coordinator = JewishCalendarUpdateCoordinator(hass, config_entry, data)
await coordinator.async_config_entry_first_refresh()
config_entry.runtime_data = coordinator
await hass.config_entries.async_forward_entry_setups(config_entry, PLATFORMS)
return True
@@ -90,12 +86,7 @@ async def async_unload_entry(
hass: HomeAssistant, config_entry: JewishCalendarConfigEntry
) -> bool:
"""Unload a config entry."""
if unload_ok := await hass.config_entries.async_unload_platforms(
config_entry, PLATFORMS
):
coordinator = config_entry.runtime_data
await coordinator.async_shutdown()
return unload_ok
return await hass.config_entries.async_unload_platforms(config_entry, PLATFORMS)
async def async_migrate_entry(

View File

@@ -72,7 +72,8 @@ class JewishCalendarBinarySensor(JewishCalendarEntity, BinarySensorEntity):
@property
def is_on(self) -> bool:
"""Return true if sensor is on."""
return self.entity_description.is_on(self.coordinator.zmanim)(dt_util.now())
zmanim = self.make_zmanim(dt.date.today())
return self.entity_description.is_on(zmanim)(dt_util.now())
def _update_times(self, zmanim: Zmanim) -> list[dt.datetime | None]:
"""Return a list of times to update the sensor."""

View File

@@ -1,105 +0,0 @@
"""Data update coordinator for Jewish calendar."""
from dataclasses import dataclass
import datetime as dt
import logging
from hdate import HDateInfo, Location, Zmanim
from hdate.translator import Language, set_language
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers import event
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
import homeassistant.util.dt as dt_util
from .const import DOMAIN
_LOGGER = logging.getLogger(__name__)
type JewishCalendarConfigEntry = ConfigEntry[JewishCalendarUpdateCoordinator]
@dataclass(frozen=True)
class JewishCalendarData:
"""Jewish Calendar runtime dataclass."""
language: Language
diaspora: bool
location: Location
candle_lighting_offset: int
havdalah_offset: int
dateinfo: HDateInfo | None = None
zmanim: Zmanim | None = None
class JewishCalendarUpdateCoordinator(DataUpdateCoordinator[JewishCalendarData]):
"""Data update coordinator class for Jewish calendar."""
config_entry: JewishCalendarConfigEntry
_attr_should_poll = False
def __init__(
self,
hass: HomeAssistant,
config_entry: JewishCalendarConfigEntry,
data: JewishCalendarData,
) -> None:
"""Initialize the coordinator."""
super().__init__(hass, _LOGGER, name=DOMAIN, config_entry=config_entry)
self.data = data
set_language(data.language)
async def _async_update_data(self) -> JewishCalendarData:
"""Return HDate and Zmanim for today."""
now = dt_util.now()
_LOGGER.debug("Now: %s Location: %r", now, self.data.location)
today = now.date()
# Create new data object with today's information
new_data = JewishCalendarData(
language=self.data.language,
diaspora=self.data.diaspora,
location=self.data.location,
candle_lighting_offset=self.data.candle_lighting_offset,
havdalah_offset=self.data.havdalah_offset,
dateinfo=HDateInfo(today, self.data.diaspora),
zmanim=self.make_zmanim(today),
)
# Schedule next update at midnight
next_midnight = dt_util.start_of_local_day() + dt.timedelta(days=1)
_LOGGER.debug("Scheduling next update at %s", next_midnight)
# Schedule update at next midnight
self._unsub_refresh = event.async_track_point_in_time(
self.hass, self._handle_midnight_update, next_midnight
)
return new_data
@callback
def _handle_midnight_update(self, _now: dt.datetime) -> None:
"""Handle midnight update callback."""
self.hass.async_create_task(self.async_request_refresh())
def make_zmanim(self, date: dt.date) -> Zmanim:
"""Create a Zmanim object."""
return Zmanim(
date=date,
location=self.data.location,
candle_lighting_offset=self.data.candle_lighting_offset,
havdalah_offset=self.data.havdalah_offset,
)
@property
def zmanim(self) -> Zmanim:
"""Return the current Zmanim."""
assert self.data.zmanim is not None, "Zmanim data not available"
return self.data.zmanim
@property
def dateinfo(self) -> HDateInfo:
"""Return the current HDateInfo."""
assert self.data.dateinfo is not None, "HDateInfo data not available"
return self.data.dateinfo

View File

@@ -24,5 +24,5 @@ async def async_get_config_entry_diagnostics(
return {
"entry_data": async_redact_data(entry.data, TO_REDACT),
"data": async_redact_data(asdict(entry.runtime_data.data), TO_REDACT),
"data": async_redact_data(asdict(entry.runtime_data), TO_REDACT),
}

View File

@@ -1,22 +1,48 @@
"""Entity representing a Jewish Calendar sensor."""
from abc import abstractmethod
from dataclasses import dataclass
import datetime as dt
import logging
from hdate import Zmanim
from hdate import HDateInfo, Location, Zmanim
from hdate.translator import Language, set_language
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import CALLBACK_TYPE, callback
from homeassistant.helpers import event
from homeassistant.helpers.device_registry import DeviceEntryType, DeviceInfo
from homeassistant.helpers.entity import EntityDescription
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from homeassistant.helpers.entity import Entity, EntityDescription
from homeassistant.util import dt as dt_util
from .const import DOMAIN
from .coordinator import JewishCalendarConfigEntry, JewishCalendarUpdateCoordinator
_LOGGER = logging.getLogger(__name__)
type JewishCalendarConfigEntry = ConfigEntry[JewishCalendarData]
class JewishCalendarEntity(CoordinatorEntity[JewishCalendarUpdateCoordinator]):
@dataclass
class JewishCalendarDataResults:
"""Jewish Calendar results dataclass."""
dateinfo: HDateInfo
zmanim: Zmanim
@dataclass
class JewishCalendarData:
"""Jewish Calendar runtime dataclass."""
language: Language
diaspora: bool
location: Location
candle_lighting_offset: int
havdalah_offset: int
results: JewishCalendarDataResults | None = None
class JewishCalendarEntity(Entity):
"""An HA implementation for Jewish Calendar entity."""
_attr_has_entity_name = True
@@ -29,13 +55,23 @@ class JewishCalendarEntity(CoordinatorEntity[JewishCalendarUpdateCoordinator]):
description: EntityDescription,
) -> None:
"""Initialize a Jewish Calendar entity."""
super().__init__(config_entry.runtime_data)
self.entity_description = description
self._attr_unique_id = f"{config_entry.entry_id}-{description.key}"
self._attr_device_info = DeviceInfo(
entry_type=DeviceEntryType.SERVICE,
identifiers={(DOMAIN, config_entry.entry_id)},
)
self.data = config_entry.runtime_data
set_language(self.data.language)
def make_zmanim(self, date: dt.date) -> Zmanim:
"""Create a Zmanim object."""
return Zmanim(
date=date,
location=self.data.location,
candle_lighting_offset=self.data.candle_lighting_offset,
havdalah_offset=self.data.havdalah_offset,
)
async def async_added_to_hass(self) -> None:
"""Call when entity is added to hass."""
@@ -49,14 +85,6 @@ class JewishCalendarEntity(CoordinatorEntity[JewishCalendarUpdateCoordinator]):
self._update_unsub = None
return await super().async_will_remove_from_hass()
@callback
def _handle_coordinator_update(self) -> None:
"""Handle updated data from the coordinator."""
# When coordinator updates (e.g., from tests forcing refresh or midnight update),
# reschedule our entity-specific updates
self._schedule_update()
super()._handle_coordinator_update()
@abstractmethod
def _update_times(self, zmanim: Zmanim) -> list[dt.datetime | None]:
"""Return a list of times to update the sensor."""
@@ -64,9 +92,10 @@ class JewishCalendarEntity(CoordinatorEntity[JewishCalendarUpdateCoordinator]):
def _schedule_update(self) -> None:
"""Schedule the next update of the sensor."""
now = dt_util.now()
zmanim = self.make_zmanim(now.date())
update = dt_util.start_of_local_day() + dt.timedelta(days=1)
for update_time in self._update_times(self.coordinator.zmanim):
for update_time in self._update_times(zmanim):
if update_time is not None and now < update_time < update:
update = update_time
@@ -81,4 +110,17 @@ class JewishCalendarEntity(CoordinatorEntity[JewishCalendarUpdateCoordinator]):
"""Update the sensor data."""
self._update_unsub = None
self._schedule_update()
self.create_results(now)
self.async_write_ha_state()
def create_results(self, now: dt.datetime | None = None) -> None:
"""Create the results for the sensor."""
if now is None:
now = dt_util.now()
_LOGGER.debug("Now: %s Location: %r", now, self.data.location)
today = now.date()
zmanim = self.make_zmanim(today)
dateinfo = HDateInfo(today, diaspora=self.data.diaspora)
self.data.results = JewishCalendarDataResults(dateinfo, zmanim)

View File

@@ -19,7 +19,7 @@ from homeassistant.components.sensor import (
from homeassistant.const import EntityCategory
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
import homeassistant.util.dt as dt_util
from homeassistant.util import dt as dt_util
from .entity import JewishCalendarConfigEntry, JewishCalendarEntity
@@ -238,18 +238,25 @@ class JewishCalendarBaseSensor(JewishCalendarEntity, SensorEntity):
return []
return [self.entity_description.next_update_fn(zmanim)]
def get_dateinfo(self) -> HDateInfo:
def get_dateinfo(self, now: dt.datetime | None = None) -> HDateInfo:
"""Get the next date info."""
now = dt_util.now()
if self.data.results is None:
self.create_results()
assert self.data.results is not None, "Results should be available"
if now is None:
now = dt_util.now()
today = now.date()
zmanim = self.make_zmanim(today)
update = None
if self.entity_description.next_update_fn:
update = self.entity_description.next_update_fn(self.coordinator.zmanim)
update = self.entity_description.next_update_fn(zmanim)
_LOGGER.debug("Today: %s, update: %s", now.date(), update)
_LOGGER.debug("Today: %s, update: %s", today, update)
if update is not None and now >= update:
return self.coordinator.dateinfo.next_day
return self.coordinator.dateinfo
return self.data.results.dateinfo.next_day
return self.data.results.dateinfo
class JewishCalendarSensor(JewishCalendarBaseSensor):
@@ -266,9 +273,7 @@ class JewishCalendarSensor(JewishCalendarBaseSensor):
super().__init__(config_entry, description)
# Set the options for enumeration sensors
if self.entity_description.options_fn is not None:
self._attr_options = self.entity_description.options_fn(
self.coordinator.data.diaspora
)
self._attr_options = self.entity_description.options_fn(self.data.diaspora)
@property
def native_value(self) -> str | int | dt.datetime | None:
@@ -292,8 +297,9 @@ class JewishCalendarTimeSensor(JewishCalendarBaseSensor):
@property
def native_value(self) -> dt.datetime | None:
"""Return the state of the sensor."""
if self.data.results is None:
self.create_results()
assert self.data.results is not None, "Results should be available"
if self.entity_description.value_fn is None:
return self.coordinator.zmanim.zmanim[self.entity_description.key].local
return self.entity_description.value_fn(
self.get_dateinfo(), self.coordinator.make_zmanim
)
return self.data.results.zmanim.zmanim[self.entity_description.key].local
return self.entity_description.value_fn(self.get_dateinfo(), self.make_zmanim)

View File

@@ -5,7 +5,7 @@
"config_flow": true,
"documentation": "https://www.home-assistant.io/integrations/kaleidescape",
"iot_class": "local_push",
"requirements": ["pykaleidescape==1.0.2"],
"requirements": ["pykaleidescape==1.0.1"],
"ssdp": [
{
"deviceType": "schemas-upnp-org:device:Basic:1",

View File

@@ -165,6 +165,7 @@ SUPPORTED_PLATFORMS_UI: Final = {
Platform.DATE,
Platform.DATETIME,
Platform.LIGHT,
Platform.SENSOR,
Platform.SWITCH,
Platform.TIME,
}

View File

@@ -0,0 +1,142 @@
"""KNX DPT serializer."""
from collections.abc import Mapping
from functools import cache
from typing import Literal, TypedDict
from xknx.dpt import DPTBase, DPTComplex, DPTEnum, DPTNumeric
from xknx.dpt.dpt_16 import DPTString
from homeassistant.components.sensor import SensorDeviceClass, SensorStateClass
HaDptClass = Literal["numeric", "enum", "complex", "string"]
class DPTInfo(TypedDict):
"""DPT information."""
dpt_class: HaDptClass
main: int
sub: int | None
name: str | None
unit: str | None
sensor_device_class: SensorDeviceClass | None
sensor_state_class: SensorStateClass | None
@cache
def get_supported_dpts() -> Mapping[str, DPTInfo]:
"""Return a mapping of supported DPTs with HA specific attributes."""
dpts = {}
for dpt_class in DPTBase.dpt_class_tree():
dpt_number_str = dpt_class.dpt_number_str()
ha_dpt_class = _ha_dpt_class(dpt_class)
dpts[dpt_number_str] = DPTInfo(
dpt_class=ha_dpt_class,
main=dpt_class.dpt_main_number, # type: ignore[typeddict-item] # checked in xknx unit tests
sub=dpt_class.dpt_sub_number,
name=dpt_class.value_type,
unit=dpt_class.unit,
sensor_device_class=_sensor_device_classes.get(dpt_number_str),
sensor_state_class=_get_sensor_state_class(ha_dpt_class, dpt_number_str),
)
return dpts
def _ha_dpt_class(dpt_cls: type[DPTBase]) -> HaDptClass:
"""Return the DPT class identifier string."""
if issubclass(dpt_cls, DPTNumeric):
return "numeric"
if issubclass(dpt_cls, DPTEnum):
return "enum"
if issubclass(dpt_cls, DPTComplex):
return "complex"
if issubclass(dpt_cls, DPTString):
return "string"
raise ValueError("Unsupported DPT class")
_sensor_device_classes: Mapping[str, SensorDeviceClass] = {
"7.011": SensorDeviceClass.DISTANCE,
"7.012": SensorDeviceClass.CURRENT,
"7.013": SensorDeviceClass.ILLUMINANCE,
"8.012": SensorDeviceClass.DISTANCE,
"9.001": SensorDeviceClass.TEMPERATURE,
"9.002": SensorDeviceClass.TEMPERATURE_DELTA,
"9.004": SensorDeviceClass.ILLUMINANCE,
"9.005": SensorDeviceClass.WIND_SPEED,
"9.006": SensorDeviceClass.PRESSURE,
"9.007": SensorDeviceClass.HUMIDITY,
"9.020": SensorDeviceClass.VOLTAGE,
"9.021": SensorDeviceClass.CURRENT,
"9.024": SensorDeviceClass.POWER,
"9.025": SensorDeviceClass.VOLUME_FLOW_RATE,
"9.027": SensorDeviceClass.TEMPERATURE,
"9.028": SensorDeviceClass.WIND_SPEED,
"9.029": SensorDeviceClass.ABSOLUTE_HUMIDITY,
"12.1200": SensorDeviceClass.VOLUME,
"12.1201": SensorDeviceClass.VOLUME,
"13.002": SensorDeviceClass.VOLUME_FLOW_RATE,
"13.010": SensorDeviceClass.ENERGY,
"13.012": SensorDeviceClass.REACTIVE_ENERGY,
"13.013": SensorDeviceClass.ENERGY,
"13.015": SensorDeviceClass.REACTIVE_ENERGY,
"13.016": SensorDeviceClass.ENERGY,
"14.010": SensorDeviceClass.AREA,
"14.019": SensorDeviceClass.CURRENT,
"14.027": SensorDeviceClass.VOLTAGE,
"14.028": SensorDeviceClass.VOLTAGE,
"14.030": SensorDeviceClass.VOLTAGE,
"14.031": SensorDeviceClass.ENERGY,
"14.033": SensorDeviceClass.FREQUENCY,
"14.037": SensorDeviceClass.ENERGY_STORAGE,
"14.039": SensorDeviceClass.DISTANCE,
"14.051": SensorDeviceClass.WEIGHT,
"14.056": SensorDeviceClass.POWER,
"14.057": SensorDeviceClass.POWER_FACTOR,
"14.058": SensorDeviceClass.PRESSURE,
"14.065": SensorDeviceClass.SPEED,
"14.068": SensorDeviceClass.TEMPERATURE,
"14.069": SensorDeviceClass.TEMPERATURE,
"14.070": SensorDeviceClass.TEMPERATURE_DELTA,
"14.076": SensorDeviceClass.VOLUME,
"14.077": SensorDeviceClass.VOLUME_FLOW_RATE,
"14.080": SensorDeviceClass.APPARENT_POWER,
"29.010": SensorDeviceClass.ENERGY,
"29.012": SensorDeviceClass.REACTIVE_ENERGY,
}
_sensor_state_class_overrides: Mapping[str, SensorStateClass | None] = {
"5.003": SensorStateClass.MEASUREMENT_ANGLE, # DPTAngle
"5.006": None, # DPTTariff
"7.010": None, # DPTPropDataType
"8.011": SensorStateClass.MEASUREMENT_ANGLE, # DPTRotationAngle
"9.026": SensorStateClass.TOTAL_INCREASING, # DPTRainAmount
"12.1200": SensorStateClass.TOTAL, # DPTVolumeLiquidLitre
"12.1201": SensorStateClass.TOTAL, # DPTVolumeM3
"13.010": SensorStateClass.TOTAL, # DPTActiveEnergy
"13.011": SensorStateClass.TOTAL, # DPTApparantEnergy
"13.012": SensorStateClass.TOTAL, # DPTReactiveEnergy
"14.007": SensorStateClass.MEASUREMENT_ANGLE, # DPTAngleDeg
"14.037": SensorStateClass.TOTAL, # DPTHeatQuantity
"14.051": SensorStateClass.TOTAL, # DPTMass
"14.055": SensorStateClass.MEASUREMENT_ANGLE, # DPTPhaseAngleDeg
"14.031": SensorStateClass.TOTAL_INCREASING, # DPTEnergy
"17.001": None, # DPTSceneNumber
"29.010": SensorStateClass.TOTAL, # DPTActiveEnergy8Byte
"29.011": SensorStateClass.TOTAL, # DPTApparantEnergy8Byte
"29.012": SensorStateClass.TOTAL, # DPTReactiveEnergy8Byte
}
def _get_sensor_state_class(
ha_dpt_class: HaDptClass, dpt_number_str: str
) -> SensorStateClass | None:
"""Return the SensorStateClass for a given DPT."""
if ha_dpt_class != "numeric":
return None
return _sensor_state_class_overrides.get(
dpt_number_str,
SensorStateClass.MEASUREMENT,
)

View File

@@ -21,9 +21,6 @@
"telegram_count": {
"default": "mdi:plus-network"
},
"telegrams_data_secure_undecodable": {
"default": "mdi:lock-alert"
},
"telegrams_incoming": {
"default": "mdi:upload-network"
},

View File

@@ -6,8 +6,8 @@ from collections.abc import Callable
from dataclasses import dataclass
from datetime import datetime, timedelta
from functools import partial
from typing import Any
from xknx import XKNX
from xknx.core.connection_state import XknxConnectionState, XknxConnectionType
from xknx.devices import Device as XknxDevice, Sensor as XknxSensor
@@ -25,20 +25,32 @@ from homeassistant.const import (
CONF_ENTITY_CATEGORY,
CONF_NAME,
CONF_TYPE,
CONF_UNIT_OF_MEASUREMENT,
STATE_UNAVAILABLE,
STATE_UNKNOWN,
EntityCategory,
Platform,
)
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
from homeassistant.helpers.entity_platform import (
AddConfigEntryEntitiesCallback,
async_get_current_platform,
)
from homeassistant.helpers.typing import ConfigType, StateType
from homeassistant.util.enum import try_parse_enum
from .const import ATTR_SOURCE, KNX_MODULE_KEY
from .entity import KnxYamlEntity
from .const import ATTR_SOURCE, CONF_SYNC_STATE, DOMAIN, KNX_MODULE_KEY
from .dpt import get_supported_dpts
from .entity import (
KnxUiEntity,
KnxUiEntityPlatformController,
KnxYamlEntity,
_KnxEntityBase,
)
from .knx_module import KNXModule
from .schema import SensorSchema
from .storage.const import CONF_ALWAYS_CALLBACK, CONF_ENTITY, CONF_GA_SENSOR
from .storage.util import ConfigExtractor
SCAN_INTERVAL = timedelta(seconds=10)
@@ -108,12 +120,6 @@ SYSTEM_ENTITY_DESCRIPTIONS = (
+ knx.xknx.connection_manager.cemi_count_incoming
+ knx.xknx.connection_manager.cemi_count_incoming_error,
),
KNXSystemEntityDescription(
key="telegrams_data_secure_undecodable",
entity_registry_enabled_default=False,
state_class=SensorStateClass.TOTAL_INCREASING,
value_fn=lambda knx: knx.xknx.connection_manager.undecoded_data_secure,
),
)
@@ -122,58 +128,41 @@ async def async_setup_entry(
config_entry: config_entries.ConfigEntry,
async_add_entities: AddConfigEntryEntitiesCallback,
) -> None:
"""Set up sensor(s) for KNX platform."""
"""Set up entities for KNX platform."""
knx_module = hass.data[KNX_MODULE_KEY]
platform = async_get_current_platform()
knx_module.config_store.add_platform(
platform=Platform.SENSOR,
controller=KnxUiEntityPlatformController(
knx_module=knx_module,
entity_platform=platform,
entity_class=KnxUiSensor,
),
)
entities: list[SensorEntity] = []
entities.extend(
KNXSystemSensor(knx_module, description)
for description in SYSTEM_ENTITY_DESCRIPTIONS
)
config: list[ConfigType] | None = knx_module.config_yaml.get(Platform.SENSOR)
if config:
if yaml_platform_config := knx_module.config_yaml.get(Platform.SENSOR):
entities.extend(
KNXSensor(knx_module, entity_config) for entity_config in config
KnxYamlSensor(knx_module, entity_config)
for entity_config in yaml_platform_config
)
if ui_config := knx_module.config_store.data["entities"].get(Platform.SENSOR):
entities.extend(
KnxUiSensor(knx_module, unique_id, config)
for unique_id, config in ui_config.items()
)
async_add_entities(entities)
def _create_sensor(xknx: XKNX, config: ConfigType) -> XknxSensor:
"""Return a KNX sensor to be used within XKNX."""
return XknxSensor(
xknx,
name=config[CONF_NAME],
group_address_state=config[SensorSchema.CONF_STATE_ADDRESS],
sync_state=config[SensorSchema.CONF_SYNC_STATE],
always_callback=True,
value_type=config[CONF_TYPE],
)
class KNXSensor(KnxYamlEntity, RestoreSensor):
class _KnxSensor(RestoreSensor, _KnxEntityBase):
"""Representation of a KNX sensor."""
_device: XknxSensor
def __init__(self, knx_module: KNXModule, config: ConfigType) -> None:
"""Initialize of a KNX sensor."""
super().__init__(
knx_module=knx_module,
device=_create_sensor(knx_module.xknx, config),
)
if device_class := config.get(CONF_DEVICE_CLASS):
self._attr_device_class = device_class
else:
self._attr_device_class = try_parse_enum(
SensorDeviceClass, self._device.ha_device_class()
)
self._attr_force_update = config[SensorSchema.CONF_ALWAYS_CALLBACK]
self._attr_entity_category = config.get(CONF_ENTITY_CATEGORY)
self._attr_unique_id = str(self._device.sensor_value.group_address_state)
self._attr_native_unit_of_measurement = self._device.unit_of_measurement()
self._attr_state_class = config.get(CONF_STATE_CLASS)
self._attr_extra_state_attributes = {}
async def async_added_to_hass(self) -> None:
"""Restore last state."""
if (
@@ -198,6 +187,89 @@ class KNXSensor(KnxYamlEntity, RestoreSensor):
super().after_update_callback(device)
class KnxYamlSensor(_KnxSensor, KnxYamlEntity):
"""Representation of a KNX sensor configured from YAML."""
_device: XknxSensor
def __init__(self, knx_module: KNXModule, config: ConfigType) -> None:
"""Initialize of a KNX sensor."""
super().__init__(
knx_module=knx_module,
device=XknxSensor(
knx_module.xknx,
name=config[CONF_NAME],
group_address_state=config[SensorSchema.CONF_STATE_ADDRESS],
sync_state=config[CONF_SYNC_STATE],
always_callback=True,
value_type=config[CONF_TYPE],
),
)
if device_class := config.get(CONF_DEVICE_CLASS):
self._attr_device_class = device_class
else:
self._attr_device_class = try_parse_enum(
SensorDeviceClass, self._device.ha_device_class()
)
self._attr_force_update = config[SensorSchema.CONF_ALWAYS_CALLBACK]
self._attr_entity_category = config.get(CONF_ENTITY_CATEGORY)
self._attr_unique_id = str(self._device.sensor_value.group_address_state)
self._attr_native_unit_of_measurement = self._device.unit_of_measurement()
self._attr_state_class = config.get(CONF_STATE_CLASS)
self._attr_extra_state_attributes = {}
class KnxUiSensor(_KnxSensor, KnxUiEntity):
"""Representation of a KNX sensor configured from the UI."""
_device: XknxSensor
def __init__(
self, knx_module: KNXModule, unique_id: str, config: dict[str, Any]
) -> None:
"""Initialize KNX sensor."""
super().__init__(
knx_module=knx_module,
unique_id=unique_id,
entity_config=config[CONF_ENTITY],
)
knx_conf = ConfigExtractor(config[DOMAIN])
dpt_string = knx_conf.get_dpt(CONF_GA_SENSOR)
assert dpt_string is not None # required for sensor
dpt_info = get_supported_dpts()[dpt_string]
self._device = XknxSensor(
knx_module.xknx,
name=config[CONF_ENTITY][CONF_NAME],
group_address_state=knx_conf.get_state_and_passive(CONF_GA_SENSOR),
sync_state=knx_conf.get(CONF_SYNC_STATE),
always_callback=True,
value_type=dpt_string,
)
if device_class_override := knx_conf.get(CONF_DEVICE_CLASS):
self._attr_device_class = try_parse_enum(
SensorDeviceClass, device_class_override
)
else:
self._attr_device_class = dpt_info["sensor_device_class"]
if state_class_override := knx_conf.get(CONF_STATE_CLASS):
self._attr_state_class = try_parse_enum(
SensorStateClass, state_class_override
)
else:
self._attr_state_class = dpt_info["sensor_state_class"]
self._attr_native_unit_of_measurement = (
knx_conf.get(CONF_UNIT_OF_MEASUREMENT) or dpt_info["unit"]
)
self._attr_force_update = knx_conf.get(CONF_ALWAYS_CALLBACK, default=False)
self._attr_extra_state_attributes = {}
class KNXSystemSensor(SensorEntity):
"""Representation of a KNX system sensor."""

View File

@@ -65,3 +65,6 @@ CONF_GA_WHITE_BRIGHTNESS: Final = "ga_white_brightness"
CONF_GA_WHITE_SWITCH: Final = "ga_white_switch"
CONF_GA_HUE: Final = "ga_hue"
CONF_GA_SATURATION: Final = "ga_saturation"
# Sensor
CONF_ALWAYS_CALLBACK: Final = "always_callback"

View File

@@ -5,11 +5,21 @@ from enum import StrEnum, unique
import voluptuous as vol
from homeassistant.components.climate import HVACMode
from homeassistant.components.sensor import (
CONF_STATE_CLASS as CONF_SENSOR_STATE_CLASS,
DEVICE_CLASS_STATE_CLASSES,
DEVICE_CLASS_UNITS,
STATE_CLASS_UNITS,
SensorDeviceClass,
SensorStateClass,
)
from homeassistant.const import (
CONF_DEVICE_CLASS,
CONF_ENTITY_CATEGORY,
CONF_ENTITY_ID,
CONF_NAME,
CONF_PLATFORM,
CONF_UNIT_OF_MEASUREMENT,
Platform,
)
from homeassistant.helpers import config_validation as cv, selector
@@ -30,12 +40,15 @@ from ..const import (
CoverConf,
FanZeroMode,
)
from ..dpt import get_supported_dpts
from .const import (
CONF_ALWAYS_CALLBACK,
CONF_COLOR,
CONF_COLOR_TEMP_MAX,
CONF_COLOR_TEMP_MIN,
CONF_DATA,
CONF_DEVICE_INFO,
CONF_DPT,
CONF_ENTITY,
CONF_GA_ACTIVE,
CONF_GA_ANGLE,
@@ -507,6 +520,114 @@ CLIMATE_KNX_SCHEMA = vol.Schema(
},
)
def _validate_sensor_attributes(config: dict) -> dict:
"""Validate that state_class is compatible with device_class and unit_of_measurement."""
dpt = config[CONF_GA_SENSOR][CONF_DPT]
dpt_metadata = get_supported_dpts()[dpt]
state_class = config.get(
CONF_SENSOR_STATE_CLASS,
dpt_metadata["sensor_state_class"],
)
device_class = config.get(
CONF_DEVICE_CLASS,
dpt_metadata["sensor_device_class"],
)
unit_of_measurement = config.get(
CONF_UNIT_OF_MEASUREMENT,
dpt_metadata["unit"],
)
if (
state_class
and device_class
and (state_classes := DEVICE_CLASS_STATE_CLASSES.get(device_class)) is not None
and state_class not in state_classes
):
raise vol.Invalid(
f"State class '{state_class}' is not valid for device class '{device_class}'. "
f"Valid options are: {', '.join(sorted(map(str, state_classes), key=str.casefold))}",
path=[CONF_SENSOR_STATE_CLASS],
)
if (
device_class
and (d_c_units := DEVICE_CLASS_UNITS.get(device_class)) is not None
and unit_of_measurement not in d_c_units
):
raise vol.Invalid(
f"Unit of measurement '{unit_of_measurement}' is not valid for device class '{device_class}'. "
f"Valid options are: {', '.join(sorted(map(str, d_c_units), key=str.casefold))}",
path=(
[CONF_DEVICE_CLASS]
if CONF_DEVICE_CLASS in config
else [CONF_UNIT_OF_MEASUREMENT]
),
)
if (
state_class
and (s_c_units := STATE_CLASS_UNITS.get(state_class)) is not None
and unit_of_measurement not in s_c_units
):
raise vol.Invalid(
f"Unit of measurement '{unit_of_measurement}' is not valid for state class '{state_class}'. "
f"Valid options are: {', '.join(sorted(map(str, s_c_units), key=str.casefold))}",
path=(
[CONF_SENSOR_STATE_CLASS]
if CONF_SENSOR_STATE_CLASS in config
else [CONF_UNIT_OF_MEASUREMENT]
),
)
return config
SENSOR_KNX_SCHEMA = AllSerializeFirst(
vol.Schema(
{
vol.Required(CONF_GA_SENSOR): GASelector(
write=False, state_required=True, dpt=["numeric", "string"]
),
"section_advanced_options": KNXSectionFlat(collapsible=True),
vol.Optional(CONF_UNIT_OF_MEASUREMENT): selector.SelectSelector(
selector.SelectSelectorConfig(
options=sorted(
{
str(unit)
for units in DEVICE_CLASS_UNITS.values()
for unit in units
if unit is not None
}
),
mode=selector.SelectSelectorMode.DROPDOWN,
translation_key="component.knx.selector.sensor_unit_of_measurement",
custom_value=True,
),
),
vol.Optional(CONF_DEVICE_CLASS): selector.SelectSelector(
selector.SelectSelectorConfig(
options=[
cls.value
for cls in SensorDeviceClass
if cls != SensorDeviceClass.ENUM
],
translation_key="component.knx.selector.sensor_device_class",
sort=True,
)
),
vol.Optional(CONF_SENSOR_STATE_CLASS): selector.SelectSelector(
selector.SelectSelectorConfig(
options=list(SensorStateClass),
translation_key="component.knx.selector.sensor_state_class",
mode=selector.SelectSelectorMode.DROPDOWN,
)
),
vol.Optional(CONF_ALWAYS_CALLBACK): selector.BooleanSelector(),
vol.Required(CONF_SYNC_STATE, default=True): SyncStateSelector(
allow_false=True
),
},
),
_validate_sensor_attributes,
)
KNX_SCHEMA_FOR_PLATFORM = {
Platform.BINARY_SENSOR: BINARY_SENSOR_KNX_SCHEMA,
Platform.CLIMATE: CLIMATE_KNX_SCHEMA,
@@ -514,6 +635,7 @@ KNX_SCHEMA_FOR_PLATFORM = {
Platform.DATE: DATE_KNX_SCHEMA,
Platform.DATETIME: DATETIME_KNX_SCHEMA,
Platform.LIGHT: LIGHT_KNX_SCHEMA,
Platform.SENSOR: SENSOR_KNX_SCHEMA,
Platform.SWITCH: SWITCH_KNX_SCHEMA,
Platform.TIME: TIME_KNX_SCHEMA,
}

View File

@@ -6,6 +6,7 @@ from typing import Any
import voluptuous as vol
from ..dpt import HaDptClass, get_supported_dpts
from ..validation import ga_validator, maybe_ga_validator, sync_state_validator
from .const import CONF_DPT, CONF_GA_PASSIVE, CONF_GA_STATE, CONF_GA_WRITE
from .util import dpt_string_to_dict
@@ -162,7 +163,7 @@ class GASelector(KNXSelectorBase):
passive: bool = True,
write_required: bool = False,
state_required: bool = False,
dpt: type[Enum] | None = None,
dpt: type[Enum] | list[HaDptClass] | None = None,
valid_dpt: str | Iterable[str] | None = None,
) -> None:
"""Initialize the group address selector."""
@@ -186,14 +187,17 @@ class GASelector(KNXSelectorBase):
"passive": self.passive,
}
if self.dpt is not None:
options["dptSelect"] = [
{
"value": item.value,
"translation_key": item.value.replace(".", "_"),
"dpt": dpt_string_to_dict(item.value), # used for filtering GAs
}
for item in self.dpt
]
if isinstance(self.dpt, list):
options["dptClasses"] = self.dpt
else:
options["dptSelect"] = [
{
"value": item.value,
"translation_key": item.value.replace(".", "_"),
"dpt": dpt_string_to_dict(item.value), # used for filtering GAs
}
for item in self.dpt
]
if self.valid_dpt is not None:
options["validDPTs"] = [dpt_string_to_dict(dpt) for dpt in self.valid_dpt]
@@ -254,7 +258,12 @@ class GASelector(KNXSelectorBase):
def _add_dpt(self, schema: dict[vol.Marker, Any]) -> None:
"""Add DPT validator to the schema."""
if self.dpt is not None:
schema[vol.Required(CONF_DPT)] = vol.In({item.value for item in self.dpt})
if isinstance(self.dpt, list):
schema[vol.Required(CONF_DPT)] = vol.In(get_supported_dpts())
else:
schema[vol.Required(CONF_DPT)] = vol.In(
{item.value for item in self.dpt}
)
else:
schema[vol.Remove(CONF_DPT)] = object

View File

@@ -558,6 +558,35 @@
}
}
},
"sensor": {
"description": "Read-only entity for numeric or string datapoints. Temperature, percent etc.",
"knx": {
"always_callback": {
"description": "Write each update to the state machine, even if the data is the same.",
"label": "Force update"
},
"device_class": {
"description": "Override the DPTs default device class.",
"label": "Device class"
},
"ga_sensor": {
"description": "Group address representing state.",
"label": "State"
},
"section_advanced_options": {
"description": "Override default DPT-based sensor attributes.",
"title": "Overrides"
},
"state_class": {
"description": "Override the DPTs default state class.",
"label": "[%key:component::sensor::entity_component::_::state_attributes::state_class::name%]"
},
"unit_of_measurement": {
"description": "Override the DPTs default unit of measurement.",
"label": "Unit of measurement"
}
}
},
"switch": {
"description": "The KNX switch platform is used as an interface to switching actuators.",
"knx": {
@@ -639,10 +668,6 @@
"name": "Telegrams",
"unit_of_measurement": "telegrams"
},
"telegrams_data_secure_undecodable": {
"name": "Undecodable Data Secure telegrams",
"unit_of_measurement": "[%key:component::knx::entity::sensor::telegrams_incoming_error::unit_of_measurement%]"
},
"telegrams_incoming": {
"name": "Incoming telegrams",
"unit_of_measurement": "[%key:component::knx::entity::sensor::telegram_count::unit_of_measurement%]"
@@ -692,6 +717,85 @@
}
}
},
"selector": {
"dpt": {
"fields": {
"label": "Select a datapoint type",
"no_selection": "No DPT selected"
}
},
"sensor_device_class": {
"options": {
"absolute_humidity": "[%key:component::sensor::entity_component::absolute_humidity::name%]",
"apparent_power": "[%key:component::sensor::entity_component::apparent_power::name%]",
"aqi": "[%key:component::sensor::entity_component::aqi::name%]",
"area": "[%key:component::sensor::entity_component::area::name%]",
"atmospheric_pressure": "[%key:component::sensor::entity_component::atmospheric_pressure::name%]",
"battery": "[%key:component::sensor::entity_component::battery::name%]",
"blood_glucose_concentration": "[%key:component::sensor::entity_component::blood_glucose_concentration::name%]",
"carbon_dioxide": "[%key:component::sensor::entity_component::carbon_dioxide::name%]",
"carbon_monoxide": "[%key:component::sensor::entity_component::carbon_monoxide::name%]",
"conductivity": "[%key:component::sensor::entity_component::conductivity::name%]",
"current": "[%key:component::sensor::entity_component::current::name%]",
"data_rate": "[%key:component::sensor::entity_component::data_rate::name%]",
"data_size": "[%key:component::sensor::entity_component::data_size::name%]",
"date": "[%key:component::sensor::entity_component::date::name%]",
"distance": "[%key:component::sensor::entity_component::distance::name%]",
"duration": "[%key:component::sensor::entity_component::duration::name%]",
"energy": "[%key:component::sensor::entity_component::energy::name%]",
"energy_distance": "[%key:component::sensor::entity_component::energy_distance::name%]",
"energy_storage": "[%key:component::sensor::entity_component::energy_storage::name%]",
"frequency": "[%key:component::sensor::entity_component::frequency::name%]",
"gas": "[%key:component::sensor::entity_component::gas::name%]",
"humidity": "[%key:component::sensor::entity_component::humidity::name%]",
"illuminance": "[%key:component::sensor::entity_component::illuminance::name%]",
"irradiance": "[%key:component::sensor::entity_component::irradiance::name%]",
"moisture": "[%key:component::sensor::entity_component::moisture::name%]",
"monetary": "[%key:component::sensor::entity_component::monetary::name%]",
"nitrogen_dioxide": "[%key:component::sensor::entity_component::nitrogen_dioxide::name%]",
"nitrogen_monoxide": "[%key:component::sensor::entity_component::nitrogen_monoxide::name%]",
"nitrous_oxide": "[%key:component::sensor::entity_component::nitrous_oxide::name%]",
"ozone": "[%key:component::sensor::entity_component::ozone::name%]",
"ph": "[%key:component::sensor::entity_component::ph::name%]",
"pm1": "[%key:component::sensor::entity_component::pm1::name%]",
"pm10": "[%key:component::sensor::entity_component::pm10::name%]",
"pm25": "[%key:component::sensor::entity_component::pm25::name%]",
"pm4": "[%key:component::sensor::entity_component::pm4::name%]",
"power": "[%key:component::sensor::entity_component::power::name%]",
"power_factor": "[%key:component::sensor::entity_component::power_factor::name%]",
"precipitation": "[%key:component::sensor::entity_component::precipitation::name%]",
"precipitation_intensity": "[%key:component::sensor::entity_component::precipitation_intensity::name%]",
"pressure": "[%key:component::sensor::entity_component::pressure::name%]",
"reactive_energy": "[%key:component::sensor::entity_component::reactive_energy::name%]",
"reactive_power": "[%key:component::sensor::entity_component::reactive_power::name%]",
"signal_strength": "[%key:component::sensor::entity_component::signal_strength::name%]",
"sound_pressure": "[%key:component::sensor::entity_component::sound_pressure::name%]",
"speed": "[%key:component::sensor::entity_component::speed::name%]",
"sulphur_dioxide": "[%key:component::sensor::entity_component::sulphur_dioxide::name%]",
"temperature": "[%key:component::sensor::entity_component::temperature::name%]",
"temperature_delta": "[%key:component::sensor::entity_component::temperature_delta::name%]",
"timestamp": "[%key:component::sensor::entity_component::timestamp::name%]",
"volatile_organic_compounds": "[%key:component::sensor::entity_component::volatile_organic_compounds::name%]",
"volatile_organic_compounds_parts": "[%key:component::sensor::entity_component::volatile_organic_compounds_parts::name%]",
"voltage": "[%key:component::sensor::entity_component::voltage::name%]",
"volume": "[%key:component::sensor::entity_component::volume::name%]",
"volume_flow_rate": "[%key:component::sensor::entity_component::volume_flow_rate::name%]",
"volume_storage": "[%key:component::sensor::entity_component::volume_storage::name%]",
"water": "[%key:component::sensor::entity_component::water::name%]",
"weight": "[%key:component::sensor::entity_component::weight::name%]",
"wind_direction": "[%key:component::sensor::entity_component::wind_direction::name%]",
"wind_speed": "[%key:component::sensor::entity_component::wind_speed::name%]"
}
},
"sensor_state_class": {
"options": {
"measurement": "[%key:component::sensor::entity_component::_::state_attributes::state_class::state::measurement%]",
"measurement_angle": "[%key:component::sensor::entity_component::_::state_attributes::state_class::state::measurement_angle%]",
"total": "[%key:component::sensor::entity_component::_::state_attributes::state_class::state::total%]",
"total_increasing": "[%key:component::sensor::entity_component::_::state_attributes::state_class::state::total_increasing%]"
}
}
},
"services": {
"event_register": {
"description": "Adds or removes group addresses to knx_event filter for triggering `knx_event`s. Only addresses added with this action can be removed.",

View File

@@ -22,6 +22,7 @@ from homeassistant.helpers.typing import UNDEFINED
from homeassistant.util.ulid import ulid_now
from .const import DOMAIN, KNX_MODULE_KEY, SUPPORTED_PLATFORMS_UI
from .dpt import get_supported_dpts
from .storage.config_store import ConfigStoreException
from .storage.const import CONF_DATA
from .storage.entity_store_schema import (
@@ -186,6 +187,7 @@ def ws_get_base_data(
msg["id"],
{
"connection_info": connection_info,
"dpt_metadata": get_supported_dpts(),
"project_info": _project_info,
"supported_platforms": sorted(SUPPORTED_PLATFORMS_UI),
},

View File

@@ -37,5 +37,5 @@
"iot_class": "cloud_push",
"loggers": ["pylamarzocco"],
"quality_scale": "platinum",
"requirements": ["pylamarzocco==2.2.3"]
"requirements": ["pylamarzocco==2.2.2"]
}

View File

@@ -9,5 +9,5 @@
"iot_class": "local_polling",
"loggers": ["pypck"],
"quality_scale": "silver",
"requirements": ["pypck==0.9.7", "lcn-frontend==0.2.7"]
"requirements": ["pypck==0.9.5", "lcn-frontend==0.2.7"]
}

View File

@@ -98,11 +98,7 @@ class LutronCasetaSmartAwaySwitch(LutronCasetaEntity, SwitchEntity):
async def async_added_to_hass(self) -> None:
"""Register callbacks."""
await super().async_added_to_hass()
self._smartbridge.add_smart_away_subscriber(self._handle_smart_away_update)
def _handle_smart_away_update(self, smart_away_state: str | None = None) -> None:
"""Handle updated smart away state from the bridge."""
self.async_write_ha_state()
self._smartbridge.add_smart_away_subscriber(self._handle_bridge_update)
async def async_turn_on(self, **kwargs: Any) -> None:
"""Turn Smart Away on."""

View File

@@ -183,35 +183,10 @@ PUMP_CONTROL_MODE_MAP = {
clusters.PumpConfigurationAndControl.Enums.ControlModeEnum.kUnknownEnumValue: None,
}
MATTER_2000_TO_UNIX_EPOCH_OFFSET = (
946684800 # Seconds from Matter 2000 epoch to Unix epoch
)
HUMIDITY_SCALING_FACTOR = 100
TEMPERATURE_SCALING_FACTOR = 100
def matter_epoch_seconds_to_utc(x: int | None) -> datetime | None:
"""Convert Matter epoch seconds (since 2000-01-01) to UTC datetime.
Returns None for non-positive or None values (represents unknown/absent).
"""
if x is None or x <= 0:
return None
return dt_util.utc_from_timestamp(x + MATTER_2000_TO_UNIX_EPOCH_OFFSET)
def matter_epoch_microseconds_to_utc(x: int | None) -> datetime | None:
"""Convert Matter epoch microseconds (since 2000-01-01) to UTC datetime.
The value is in microseconds; convert to seconds before applying offset.
Returns None for non-positive or None values.
"""
if x is None or x <= 0:
return None
seconds = x // 1_000_000
return dt_util.utc_from_timestamp(seconds + MATTER_2000_TO_UNIX_EPOCH_OFFSET)
async def async_setup_entry(
hass: HomeAssistant,
config_entry: ConfigEntry,
@@ -1493,8 +1468,7 @@ DISCOVERY_SCHEMAS = [
translation_key="auto_close_time",
device_class=SensorDeviceClass.TIMESTAMP,
state_class=None,
# AutoCloseTime is defined as epoch-us in the spec
device_to_ha=matter_epoch_microseconds_to_utc,
device_to_ha=(lambda x: dt_util.utc_from_timestamp(x) if x > 0 else None),
),
entity_class=MatterSensor,
featuremap_contains=clusters.ValveConfigurationAndControl.Bitmaps.Feature.kTimeSync,
@@ -1509,8 +1483,7 @@ DISCOVERY_SCHEMAS = [
translation_key="estimated_end_time",
device_class=SensorDeviceClass.TIMESTAMP,
state_class=None,
# EstimatedEndTime is defined as epoch-s (Matter 2000 epoch) in the spec
device_to_ha=matter_epoch_seconds_to_utc,
device_to_ha=(lambda x: dt_util.utc_from_timestamp(x) if x > 0 else None),
),
entity_class=MatterSensor,
required_attributes=(clusters.ServiceArea.Attributes.EstimatedEndTime,),

Some files were not shown because too many files have changed in this diff Show More