Compare commits

..

38 Commits

Author SHA1 Message Date
Bram Kragten
f877614e7f Bump version to 2025.11.0b3 2025-11-03 20:46:58 +01:00
Mike Degatano
170e1e87c7 Disable deprecated addon repair (#155739) 2025-11-03 20:46:03 +01:00
Michael Hansen
e1feba5c86 Use character code in language matching (voice) (#155738) 2025-11-03 20:46:02 +01:00
Bram Kragten
9bf52b7966 Update frontend to 20251103.0 (#155734) 2025-11-03 20:46:02 +01:00
Simone Chemelli
3bc61a3564 Bump aioamazondevices to 6.5.6 (#155723) 2025-11-03 20:46:01 +01:00
Bram Kragten
d2ba94e1bf Bump version to 2025.11.0b2 2025-11-03 08:04:32 +01:00
Joost Lekkerkerker
9a4ed82399 Bump python-open-router to 0.3.2 (#155700) 2025-11-03 08:04:12 +01:00
cdnninja
b5136d01aa fix vesync mist level value (#155697) 2025-11-03 08:04:11 +01:00
starkillerOG
d3e05090ea Bump reolink_aio to 0.16.3 (#155692) 2025-11-03 08:04:10 +01:00
Michael
7e75ca7af9 Revert "Remove neato integration (#154902)" (#155685) 2025-11-03 08:04:10 +01:00
Matthias Alphart
6616b5775f Fix KNX climate loading min/max temp from UI config (#155682) 2025-11-03 08:04:09 +01:00
Robert Resch
69b82d4c59 Bump deebot-client to 16.3.0 (#155681) 2025-11-03 08:04:08 +01:00
Bram Kragten
6b9709677a Fix device tracker name & icon for Volvo integration (#155667) 2025-11-03 08:03:00 +01:00
Robert Resch
a4e9c82c84 Bump deebot-client to 16.2.0 (#155642) 2025-11-03 07:57:45 +01:00
cdnninja
de86bedb80 vesync don't assume fan speed target (#155617) 2025-11-03 07:57:44 +01:00
Matthias Alphart
9111c6df90 Update knx-frontend to 2025.10.31.195356 (#155569) 2025-11-03 07:57:43 +01:00
Jordan Harvey
751f6bddb1 Update pynintendoparental to version 1.1.3 (#155568) 2025-11-03 07:57:42 +01:00
Josef Zweck
c9a61de0a1 Bump onedrive-personal-sdk to 0.0.15 (#155540) 2025-11-03 07:57:41 +01:00
Sid
01fb46d903 Bump eheimdigital to 1.4.0 (#155539) 2025-11-03 07:57:41 +01:00
cdnninja
d26f61c9fe Bump pyvesync to 3.1.4 (#155533) 2025-11-03 07:57:39 +01:00
Robert Resch
a47a144312 Bump uv to 0.9.6 (#155521) 2025-11-03 07:57:38 +01:00
Erwin Douna
69cf4f99d1 Portainer refactor CONF_VERIFY_SSL (#155520) 2025-11-03 07:57:37 +01:00
Shay Levy
e6c757c187 Fix Shelly irrigation zone ID retrieval with Sleepy devices (#155514) 2025-11-03 07:57:36 +01:00
hanwg
a36b0e2f3f Fix event entity state update for Telegram bot (#155510) 2025-11-03 07:57:35 +01:00
Jakob Schlyter
1a7c6cd96c Update regions and voices used by Amazon Polly (#155501) 2025-11-03 07:57:34 +01:00
tronikos
ba3e538402 Bump opower to 0.15.9 (#155473) 2025-11-03 07:57:33 +01:00
Mike Degatano
b2cd08aa65 Addon progress reporting follow-up from feedback (#155464) 2025-11-03 07:57:33 +01:00
karwosts
06dcd25a16 Hassfest check for invalid localization placeholders (#155216) 2025-11-03 07:57:32 +01:00
Bram Kragten
fd36782bae Bump version to 2025.11.0b1 2025-10-30 20:12:15 +01:00
Bram Kragten
ed4573db57 Update frontend to 20251029.1 (#155513) 2025-10-30 20:11:55 +01:00
Erwin Douna
78373a6483 Firefly fix config flow (#155503) 2025-10-30 20:11:54 +01:00
Sab44
8455c35bec Bump librehardwaremonitor-api to 1.5.0 (#155492) 2025-10-30 20:11:53 +01:00
Kinachi249
00887a2f3f Bump PyCync to 0.4.3 (#155477) 2025-10-30 20:11:52 +01:00
Erwin Douna
f1ca7543fa Bump pyportainer 1.0.12 (#155468) 2025-10-30 20:11:51 +01:00
Abílio Costa
bb72b24ba9 Mock async_setup_entry in BMW Connected Drive config flow test (#155446) 2025-10-30 20:11:50 +01:00
Andrea Turri
322a27d992 Miele RestoreSensor: restore native value rather than stringified state (#152750)
Co-authored-by: Erik Montnemery <erik@montnemery.com>
Co-authored-by: Marc Mueller <30130371+cdce8p@users.noreply.github.com>
2025-10-30 20:11:49 +01:00
hanwg
a3b516110b Deprecate legacy Telegram notify service (#150720)
Co-authored-by: G Johansson <goran.johansson@shiftit.se>
Co-authored-by: Abílio Costa <abmantis@users.noreply.github.com>
Co-authored-by: abmantis <amfcalt@gmail.com>
2025-10-30 20:11:48 +01:00
Bram Kragten
95ac5c0183 Bump version to 2025.11.0b0 2025-10-29 18:53:20 +01:00
235 changed files with 8646 additions and 5023 deletions

View File

@@ -162,6 +162,18 @@ jobs:
sed -i "s|home-assistant-intents==.*||" requirements_all.txt
fi
- name: Adjustments for armhf
if: matrix.arch == 'armhf'
run: |
# Pandas has issues building on armhf, it is expected they
# will drop the platform in the near future (they consider it
# "flimsy" on 386). The following packages depend on pandas,
# so we comment them out.
sed -i "s|env-canada|# env-canada|g" requirements_all.txt
sed -i "s|noaa-coops|# noaa-coops|g" requirements_all.txt
sed -i "s|pyezviz|# pyezviz|g" requirements_all.txt
sed -i "s|pykrakenapi|# pykrakenapi|g" requirements_all.txt
- name: Download translations
uses: actions/download-artifact@018cc2cf5baa6db3ef3c5f8a56943fffe632ef53 # v6.0.0
with:
@@ -214,11 +226,19 @@ jobs:
- odroid-c4
- odroid-m1
- odroid-n2
- odroid-xu
- qemuarm
- qemuarm-64
- qemux86
- qemux86-64
- raspberrypi
- raspberrypi2
- raspberrypi3
- raspberrypi3-64
- raspberrypi4
- raspberrypi4-64
- raspberrypi5-64
- tinker
- yellow
- green
steps:
@@ -277,7 +297,6 @@ jobs:
key-description: "Home Assistant Core"
version: ${{ needs.init.outputs.version }}
channel: ${{ needs.init.outputs.channel }}
exclude-list: '["odroid-xu","qemuarm","qemux86","raspberrypi","raspberrypi2","raspberrypi3","raspberrypi4","tinker"]'
- name: Update version file (stable -> beta)
if: needs.init.outputs.channel == 'stable'
@@ -287,7 +306,6 @@ jobs:
key-description: "Home Assistant Core"
version: ${{ needs.init.outputs.version }}
channel: beta
exclude-list: '["odroid-xu","qemuarm","qemux86","raspberrypi","raspberrypi2","raspberrypi3","raspberrypi4","tinker"]'
publish_container:
name: Publish meta container for ${{ matrix.registry }}
@@ -339,12 +357,27 @@ jobs:
docker manifest create "${registry}/home-assistant:${tag_l}" \
"${registry}/amd64-homeassistant:${tag_r}" \
"${registry}/i386-homeassistant:${tag_r}" \
"${registry}/armhf-homeassistant:${tag_r}" \
"${registry}/armv7-homeassistant:${tag_r}" \
"${registry}/aarch64-homeassistant:${tag_r}"
docker manifest annotate "${registry}/home-assistant:${tag_l}" \
"${registry}/amd64-homeassistant:${tag_r}" \
--os linux --arch amd64
docker manifest annotate "${registry}/home-assistant:${tag_l}" \
"${registry}/i386-homeassistant:${tag_r}" \
--os linux --arch 386
docker manifest annotate "${registry}/home-assistant:${tag_l}" \
"${registry}/armhf-homeassistant:${tag_r}" \
--os linux --arch arm --variant=v6
docker manifest annotate "${registry}/home-assistant:${tag_l}" \
"${registry}/armv7-homeassistant:${tag_r}" \
--os linux --arch arm --variant=v7
docker manifest annotate "${registry}/home-assistant:${tag_l}" \
"${registry}/aarch64-homeassistant:${tag_r}" \
--os linux --arch arm64 --variant=v8
@@ -372,14 +405,23 @@ jobs:
# Pull images from github container registry and verify signature
docker pull "ghcr.io/home-assistant/amd64-homeassistant:${{ needs.init.outputs.version }}"
docker pull "ghcr.io/home-assistant/i386-homeassistant:${{ needs.init.outputs.version }}"
docker pull "ghcr.io/home-assistant/armhf-homeassistant:${{ needs.init.outputs.version }}"
docker pull "ghcr.io/home-assistant/armv7-homeassistant:${{ needs.init.outputs.version }}"
docker pull "ghcr.io/home-assistant/aarch64-homeassistant:${{ needs.init.outputs.version }}"
validate_image "ghcr.io/home-assistant/amd64-homeassistant:${{ needs.init.outputs.version }}"
validate_image "ghcr.io/home-assistant/i386-homeassistant:${{ needs.init.outputs.version }}"
validate_image "ghcr.io/home-assistant/armhf-homeassistant:${{ needs.init.outputs.version }}"
validate_image "ghcr.io/home-assistant/armv7-homeassistant:${{ needs.init.outputs.version }}"
validate_image "ghcr.io/home-assistant/aarch64-homeassistant:${{ needs.init.outputs.version }}"
if [[ "${{ matrix.registry }}" == "docker.io/homeassistant" ]]; then
# Upload images to dockerhub
push_dockerhub "amd64-homeassistant" "${{ needs.init.outputs.version }}"
push_dockerhub "i386-homeassistant" "${{ needs.init.outputs.version }}"
push_dockerhub "armhf-homeassistant" "${{ needs.init.outputs.version }}"
push_dockerhub "armv7-homeassistant" "${{ needs.init.outputs.version }}"
push_dockerhub "aarch64-homeassistant" "${{ needs.init.outputs.version }}"
fi

View File

@@ -40,7 +40,7 @@ env:
CACHE_VERSION: 1
UV_CACHE_VERSION: 1
MYPY_CACHE_VERSION: 1
HA_SHORT_VERSION: "2025.12"
HA_SHORT_VERSION: "2025.11"
DEFAULT_PYTHON: "3.13"
ALL_PYTHON_VERSIONS: "['3.13', '3.14']"
# 10.3 is the oldest supported version
@@ -502,6 +502,7 @@ jobs:
libavfilter-dev \
libavformat-dev \
libavutil-dev \
libgammu-dev \
libswresample-dev \
libswscale-dev \
libudev-dev
@@ -800,7 +801,8 @@ jobs:
-o Dir::State::Lists=${{ env.APT_LIST_CACHE_DIR }} \
bluez \
ffmpeg \
libturbojpeg
libturbojpeg \
libgammu-dev
- *checkout
- *setup-python-default
- *cache-restore-python-default
@@ -851,6 +853,7 @@ jobs:
bluez \
ffmpeg \
libturbojpeg \
libgammu-dev \
libxml2-utils
- *checkout
- *setup-python-matrix
@@ -1230,6 +1233,7 @@ jobs:
bluez \
ffmpeg \
libturbojpeg \
libgammu-dev \
libxml2-utils
- *checkout
- *setup-python-matrix

View File

@@ -228,7 +228,7 @@ jobs:
arch: ${{ matrix.arch }}
wheels-key: ${{ secrets.WHEELS_KEY }}
env-file: true
apk: "bluez-dev;libffi-dev;openssl-dev;glib-dev;eudev-dev;libxml2-dev;libxslt-dev;libpng-dev;libjpeg-turbo-dev;tiff-dev;gmp-dev;mpfr-dev;mpc1-dev;ffmpeg-dev;yaml-dev;openblas-dev;fftw-dev;lapack-dev;gfortran;blas-dev;eigen-dev;freetype-dev;glew-dev;harfbuzz-dev;hdf5-dev;libdc1394-dev;libtbb-dev;mesa-dev;openexr-dev;openjpeg-dev;uchardet-dev;nasm;zlib-ng-dev"
apk: "bluez-dev;libffi-dev;openssl-dev;glib-dev;eudev-dev;libxml2-dev;libxslt-dev;libpng-dev;libjpeg-turbo-dev;tiff-dev;cups-dev;gmp-dev;mpfr-dev;mpc1-dev;ffmpeg-dev;gammu-dev;yaml-dev;openblas-dev;fftw-dev;lapack-dev;gfortran;blas-dev;eigen-dev;freetype-dev;glew-dev;harfbuzz-dev;hdf5-dev;libdc1394-dev;libtbb-dev;mesa-dev;openexr-dev;openjpeg-dev;uchardet-dev;nasm;zlib-ng-dev"
skip-binary: aiohttp;charset-normalizer;grpcio;multidict;SQLAlchemy;propcache;protobuf;pymicro-vad;yarl
constraints: "homeassistant/package_constraints.txt"
requirements-diff: "requirements_diff.txt"

View File

@@ -107,7 +107,6 @@ homeassistant.components.automation.*
homeassistant.components.awair.*
homeassistant.components.axis.*
homeassistant.components.azure_storage.*
homeassistant.components.backblaze_b2.*
homeassistant.components.backup.*
homeassistant.components.baf.*
homeassistant.components.bang_olufsen.*
@@ -362,6 +361,7 @@ homeassistant.components.myuplink.*
homeassistant.components.nam.*
homeassistant.components.nanoleaf.*
homeassistant.components.nasweb.*
homeassistant.components.neato.*
homeassistant.components.nest.*
homeassistant.components.netatmo.*
homeassistant.components.network.*
@@ -395,6 +395,7 @@ homeassistant.components.otbr.*
homeassistant.components.overkiz.*
homeassistant.components.overseerr.*
homeassistant.components.p1_monitor.*
homeassistant.components.pandora.*
homeassistant.components.panel_custom.*
homeassistant.components.paperless_ngx.*
homeassistant.components.peblar.*

12
CODEOWNERS generated
View File

@@ -196,8 +196,6 @@ build.json @home-assistant/supervisor
/homeassistant/components/azure_service_bus/ @hfurubotten
/homeassistant/components/azure_storage/ @zweckj
/tests/components/azure_storage/ @zweckj
/homeassistant/components/backblaze_b2/ @hugo-vrijswijk @ElCruncharino
/tests/components/backblaze_b2/ @hugo-vrijswijk @ElCruncharino
/homeassistant/components/backup/ @home-assistant/core
/tests/components/backup/ @home-assistant/core
/homeassistant/components/baf/ @bdraco @jfroy
@@ -318,6 +316,8 @@ build.json @home-assistant/supervisor
/tests/components/cpuspeed/ @fabaff
/homeassistant/components/crownstone/ @Crownstone @RicArch97
/tests/components/crownstone/ @Crownstone @RicArch97
/homeassistant/components/cups/ @fabaff
/tests/components/cups/ @fabaff
/homeassistant/components/cync/ @Kinachi249
/tests/components/cync/ @Kinachi249
/homeassistant/components/daikin/ @fredrike
@@ -510,6 +510,8 @@ build.json @home-assistant/supervisor
/tests/components/fjaraskupan/ @elupus
/homeassistant/components/flexit_bacnet/ @lellky @piotrbulinski
/tests/components/flexit_bacnet/ @lellky @piotrbulinski
/homeassistant/components/flick_electric/ @ZephireNZ
/tests/components/flick_electric/ @ZephireNZ
/homeassistant/components/flipr/ @cnico
/tests/components/flipr/ @cnico
/homeassistant/components/flo/ @dmulcahey
@@ -1477,6 +1479,8 @@ build.json @home-assistant/supervisor
/tests/components/smhi/ @gjohansson-ST
/homeassistant/components/smlight/ @tl-sl
/tests/components/smlight/ @tl-sl
/homeassistant/components/sms/ @ocalvo
/tests/components/sms/ @ocalvo
/homeassistant/components/snapcast/ @luar123
/tests/components/snapcast/ @luar123
/homeassistant/components/snmp/ @nmaggioni
@@ -1717,8 +1721,8 @@ build.json @home-assistant/supervisor
/tests/components/vallox/ @andre-richter @slovdahl @viiru- @yozik04
/homeassistant/components/valve/ @home-assistant/core
/tests/components/valve/ @home-assistant/core
/homeassistant/components/vegehub/ @thulrus
/tests/components/vegehub/ @thulrus
/homeassistant/components/vegehub/ @ghowevege
/tests/components/vegehub/ @ghowevege
/homeassistant/components/velbus/ @Cereal2nd @brefra
/tests/components/velbus/ @Cereal2nd @brefra
/homeassistant/components/velux/ @Julius2342 @DeerMaximum @pawlizio @wollew

View File

@@ -13,6 +13,7 @@ RUN \
libavcodec-dev \
libavdevice-dev \
libavutil-dev \
libgammu-dev \
libswscale-dev \
libswresample-dev \
libavfilter-dev \

View File

@@ -1,7 +1,10 @@
image: ghcr.io/home-assistant/{arch}-homeassistant
build_from:
aarch64: ghcr.io/home-assistant/aarch64-homeassistant-base:2025.10.1
armhf: ghcr.io/home-assistant/armhf-homeassistant-base:2025.10.1
armv7: ghcr.io/home-assistant/armv7-homeassistant-base:2025.10.1
amd64: ghcr.io/home-assistant/amd64-homeassistant-base:2025.10.1
i386: ghcr.io/home-assistant/i386-homeassistant-base:2025.10.1
cosign:
base_identity: https://github.com/home-assistant/docker/.*
identity: https://github.com/home-assistant/core/.*

View File

@@ -8,5 +8,5 @@
"iot_class": "cloud_polling",
"loggers": ["aioamazondevices"],
"quality_scale": "platinum",
"requirements": ["aioamazondevices==6.5.5"]
"requirements": ["aioamazondevices==6.5.6"]
}

View File

@@ -1,116 +0,0 @@
"""The Backblaze B2 integration."""
from __future__ import annotations
from datetime import timedelta
import logging
from typing import Any
from b2sdk.v2 import B2Api, Bucket, InMemoryAccountInfo, exception
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryAuthFailed, ConfigEntryNotReady
from homeassistant.helpers.event import async_track_time_interval
from .const import (
BACKBLAZE_REALM,
CONF_APPLICATION_KEY,
CONF_BUCKET,
CONF_KEY_ID,
DATA_BACKUP_AGENT_LISTENERS,
DOMAIN,
)
from .repairs import (
async_check_for_repair_issues,
create_bucket_access_restricted_issue,
create_bucket_not_found_issue,
)
_LOGGER = logging.getLogger(__name__)
type BackblazeConfigEntry = ConfigEntry[Bucket]
async def async_setup_entry(hass: HomeAssistant, entry: BackblazeConfigEntry) -> bool:
"""Set up Backblaze B2 from a config entry."""
info = InMemoryAccountInfo()
b2_api = B2Api(info)
def _authorize_and_get_bucket_sync() -> Bucket:
"""Synchronously authorize the Backblaze B2 account and retrieve the bucket.
This function runs in the event loop's executor as b2sdk operations are blocking.
"""
b2_api.authorize_account(
BACKBLAZE_REALM,
entry.data[CONF_KEY_ID],
entry.data[CONF_APPLICATION_KEY],
)
return b2_api.get_bucket_by_name(entry.data[CONF_BUCKET])
try:
bucket = await hass.async_add_executor_job(_authorize_and_get_bucket_sync)
except exception.Unauthorized as err:
raise ConfigEntryAuthFailed(
translation_domain=DOMAIN,
translation_key="invalid_credentials",
) from err
except exception.RestrictedBucket as err:
create_bucket_access_restricted_issue(hass, entry, err.bucket_name)
raise ConfigEntryNotReady(
translation_domain=DOMAIN,
translation_key="restricted_bucket",
translation_placeholders={
"restricted_bucket_name": err.bucket_name,
},
) from err
except exception.NonExistentBucket as err:
create_bucket_not_found_issue(hass, entry, entry.data[CONF_BUCKET])
raise ConfigEntryNotReady(
translation_domain=DOMAIN,
translation_key="invalid_bucket_name",
) from err
except exception.ConnectionReset as err:
raise ConfigEntryNotReady(
translation_domain=DOMAIN,
translation_key="cannot_connect",
) from err
except exception.MissingAccountData as err:
raise ConfigEntryAuthFailed(
translation_domain=DOMAIN,
translation_key="invalid_auth",
) from err
entry.runtime_data = bucket
def _async_notify_backup_listeners() -> None:
"""Notify any registered backup agent listeners."""
_LOGGER.debug("Notifying backup listeners for entry %s", entry.entry_id)
for listener in hass.data.get(DATA_BACKUP_AGENT_LISTENERS, []):
listener()
entry.async_on_unload(entry.async_on_state_change(_async_notify_backup_listeners))
async def _periodic_issue_check(_now: Any) -> None:
"""Periodically check for repair issues."""
await async_check_for_repair_issues(hass, entry)
entry.async_on_unload(
async_track_time_interval(hass, _periodic_issue_check, timedelta(minutes=30))
)
hass.async_create_task(async_check_for_repair_issues(hass, entry))
return True
async def async_unload_entry(hass: HomeAssistant, entry: BackblazeConfigEntry) -> bool:
"""Unload a Backblaze B2 config entry.
Any resources directly managed by this entry that need explicit shutdown
would be handled here. In this case, the `async_on_state_change` listener
handles the notification logic on unload.
"""
return True

View File

@@ -1,615 +0,0 @@
"""Backup platform for the Backblaze B2 integration."""
import asyncio
from collections.abc import AsyncIterator, Callable, Coroutine
import functools
import json
import logging
import mimetypes
from time import time
from typing import Any
from b2sdk.v2 import FileVersion
from b2sdk.v2.exception import B2Error
from homeassistant.components.backup import (
AgentBackup,
BackupAgent,
BackupAgentError,
BackupNotFound,
suggested_filename,
)
from homeassistant.core import HomeAssistant, callback
from homeassistant.util.async_iterator import AsyncIteratorReader
from . import BackblazeConfigEntry
from .const import (
CONF_PREFIX,
DATA_BACKUP_AGENT_LISTENERS,
DOMAIN,
METADATA_FILE_SUFFIX,
METADATA_VERSION,
)
_LOGGER = logging.getLogger(__name__)
# Cache TTL for backup list (in seconds)
CACHE_TTL = 300
def suggested_filenames(backup: AgentBackup) -> tuple[str, str]:
"""Return the suggested filenames for the backup and metadata files."""
base_name = suggested_filename(backup).rsplit(".", 1)[0]
return f"{base_name}.tar", f"{base_name}.metadata.json"
def _parse_metadata(raw_content: str) -> dict[str, Any]:
"""Parse metadata content from JSON."""
try:
data = json.loads(raw_content)
except json.JSONDecodeError as err:
raise ValueError(f"Invalid JSON format: {err}") from err
else:
if not isinstance(data, dict):
raise TypeError("JSON content is not a dictionary")
return data
def _find_backup_file_for_metadata(
metadata_filename: str, all_files: dict[str, FileVersion], prefix: str
) -> FileVersion | None:
"""Find corresponding backup file for metadata file."""
base_name = metadata_filename[len(prefix) :].removesuffix(METADATA_FILE_SUFFIX)
return next(
(
file
for name, file in all_files.items()
if name.startswith(prefix + base_name)
and name.endswith(".tar")
and name != metadata_filename
),
None,
)
def _create_backup_from_metadata(
metadata_content: dict[str, Any], backup_file: FileVersion
) -> AgentBackup:
"""Construct an AgentBackup from parsed metadata content and the associated backup file."""
metadata = metadata_content["backup_metadata"]
metadata["size"] = backup_file.size
return AgentBackup.from_dict(metadata)
def handle_b2_errors[T](
func: Callable[..., Coroutine[Any, Any, T]],
) -> Callable[..., Coroutine[Any, Any, T]]:
"""Handle B2Errors by converting them to BackupAgentError."""
@functools.wraps(func)
async def wrapper(*args: Any, **kwargs: Any) -> T:
"""Catch B2Error and raise BackupAgentError."""
try:
return await func(*args, **kwargs)
except B2Error as err:
error_msg = f"Failed during {func.__name__}"
raise BackupAgentError(error_msg) from err
return wrapper
async def async_get_backup_agents(
hass: HomeAssistant,
) -> list[BackupAgent]:
"""Return a list of backup agents for all configured Backblaze B2 entries."""
entries: list[BackblazeConfigEntry] = hass.config_entries.async_loaded_entries(
DOMAIN
)
return [BackblazeBackupAgent(hass, entry) for entry in entries]
@callback
def async_register_backup_agents_listener(
hass: HomeAssistant,
*,
listener: Callable[[], None],
**kwargs: Any,
) -> Callable[[], None]:
"""Register a listener to be called when backup agents are added or removed.
:return: A function to unregister the listener.
"""
hass.data.setdefault(DATA_BACKUP_AGENT_LISTENERS, []).append(listener)
@callback
def remove_listener() -> None:
"""Remove the listener."""
hass.data[DATA_BACKUP_AGENT_LISTENERS].remove(listener)
if not hass.data[DATA_BACKUP_AGENT_LISTENERS]:
hass.data.pop(DATA_BACKUP_AGENT_LISTENERS, None)
return remove_listener
class BackblazeBackupAgent(BackupAgent):
"""Backup agent for Backblaze B2 cloud storage."""
domain = DOMAIN
def __init__(self, hass: HomeAssistant, entry: BackblazeConfigEntry) -> None:
"""Initialize the Backblaze B2 agent."""
super().__init__()
self._hass = hass
self._bucket = entry.runtime_data
self._prefix = entry.data[CONF_PREFIX]
self.name = entry.title
self.unique_id = entry.entry_id
self._all_files_cache: dict[str, FileVersion] = {}
self._all_files_cache_expiration: float = 0.0
self._backup_list_cache: dict[str, AgentBackup] = {}
self._backup_list_cache_expiration: float = 0.0
self._all_files_cache_lock = asyncio.Lock()
self._backup_list_cache_lock = asyncio.Lock()
def _is_cache_valid(self, expiration_time: float) -> bool:
"""Check if cache is still valid based on expiration time."""
return time() <= expiration_time
async def _cleanup_failed_upload(self, filename: str) -> None:
"""Clean up a partially uploaded file after upload failure."""
_LOGGER.warning(
"Attempting to delete partially uploaded main backup file %s "
"due to metadata upload failure",
filename,
)
try:
uploaded_main_file_info = await self._hass.async_add_executor_job(
self._bucket.get_file_info_by_name, filename
)
await self._hass.async_add_executor_job(uploaded_main_file_info.delete)
except B2Error:
_LOGGER.debug(
"Failed to clean up partially uploaded main backup file %s. "
"Manual intervention may be required to delete it from Backblaze B2",
filename,
exc_info=True,
)
else:
_LOGGER.debug(
"Successfully deleted partially uploaded main backup file %s", filename
)
async def _get_file_for_download(self, backup_id: str) -> FileVersion:
"""Get backup file for download, raising if not found."""
file, _ = await self._find_file_and_metadata_version_by_id(backup_id)
if not file:
raise BackupNotFound(f"Backup {backup_id} not found")
return file
@handle_b2_errors
async def async_download_backup(
self, backup_id: str, **kwargs: Any
) -> AsyncIterator[bytes]:
"""Download a backup from Backblaze B2."""
file = await self._get_file_for_download(backup_id)
_LOGGER.debug("Downloading %s", file.file_name)
downloaded_file = await self._hass.async_add_executor_job(file.download)
response = downloaded_file.response
async def stream_response() -> AsyncIterator[bytes]:
"""Stream the response into an AsyncIterator."""
try:
iterator = response.iter_content(chunk_size=1024 * 1024)
while True:
chunk = await self._hass.async_add_executor_job(
next, iterator, None
)
if chunk is None:
break
yield chunk
finally:
_LOGGER.debug("Finished streaming download for %s", file.file_name)
return stream_response()
@handle_b2_errors
async def async_upload_backup(
self,
*,
open_stream: Callable[[], Coroutine[Any, Any, AsyncIterator[bytes]]],
backup: AgentBackup,
**kwargs: Any,
) -> None:
"""Upload a backup to Backblaze B2.
This involves uploading the main backup archive and a separate metadata JSON file.
"""
tar_filename, metadata_filename = suggested_filenames(backup)
prefixed_tar_filename = self._prefix + tar_filename
prefixed_metadata_filename = self._prefix + metadata_filename
metadata_content_bytes = json.dumps(
{
"metadata_version": METADATA_VERSION,
"backup_id": backup.backup_id,
"backup_metadata": backup.as_dict(),
}
).encode("utf-8")
_LOGGER.debug(
"Uploading backup: %s, and metadata: %s",
prefixed_tar_filename,
prefixed_metadata_filename,
)
upload_successful = False
try:
await self._upload_backup_file(prefixed_tar_filename, open_stream, {})
_LOGGER.debug(
"Main backup file upload finished for %s", prefixed_tar_filename
)
_LOGGER.debug("Uploading metadata file: %s", prefixed_metadata_filename)
await self._upload_metadata_file(
metadata_content_bytes, prefixed_metadata_filename
)
_LOGGER.debug(
"Metadata file upload finished for %s", prefixed_metadata_filename
)
upload_successful = True
finally:
if upload_successful:
_LOGGER.debug("Backup upload complete: %s", prefixed_tar_filename)
self._invalidate_caches(
backup.backup_id, prefixed_tar_filename, prefixed_metadata_filename
)
else:
await self._cleanup_failed_upload(prefixed_tar_filename)
def _upload_metadata_file_sync(
self, metadata_content: bytes, filename: str
) -> None:
"""Synchronously upload metadata file to B2."""
self._bucket.upload_bytes(
metadata_content,
filename,
content_type="application/json",
file_info={"metadata_only": "true"},
)
async def _upload_metadata_file(
self, metadata_content: bytes, filename: str
) -> None:
"""Upload metadata file to B2."""
await self._hass.async_add_executor_job(
self._upload_metadata_file_sync,
metadata_content,
filename,
)
def _upload_unbound_stream_sync(
self,
reader: AsyncIteratorReader,
filename: str,
content_type: str,
file_info: dict[str, Any],
) -> FileVersion:
"""Synchronously upload unbound stream to B2."""
return self._bucket.upload_unbound_stream(
reader,
filename,
content_type=content_type,
file_info=file_info,
)
def _download_and_parse_metadata_sync(
self, metadata_file_version: FileVersion
) -> dict[str, Any]:
"""Synchronously download and parse metadata file."""
return _parse_metadata(
metadata_file_version.download().response.content.decode("utf-8")
)
async def _upload_backup_file(
self,
filename: str,
open_stream: Callable[[], Coroutine[Any, Any, AsyncIterator[bytes]]],
file_info: dict[str, Any],
) -> None:
"""Upload backup file to B2 using streaming."""
_LOGGER.debug("Starting streaming upload for %s", filename)
stream = await open_stream()
reader = AsyncIteratorReader(self._hass.loop, stream)
_LOGGER.debug("Uploading backup file %s with streaming", filename)
try:
content_type, _ = mimetypes.guess_type(filename)
file_version = await self._hass.async_add_executor_job(
self._upload_unbound_stream_sync,
reader,
filename,
content_type or "application/x-tar",
file_info,
)
finally:
reader.close()
_LOGGER.debug("Successfully uploaded %s (ID: %s)", filename, file_version.id_)
@handle_b2_errors
async def async_delete_backup(self, backup_id: str, **kwargs: Any) -> None:
"""Delete a backup and its associated metadata file from Backblaze B2."""
file, metadata_file = await self._find_file_and_metadata_version_by_id(
backup_id
)
if not file:
raise BackupNotFound(f"Backup {backup_id} not found")
# Invariant: when file is not None, metadata_file is also not None
assert metadata_file is not None
_LOGGER.debug(
"Deleting backup file: %s and metadata file: %s",
file.file_name,
metadata_file.file_name,
)
await self._hass.async_add_executor_job(file.delete)
await self._hass.async_add_executor_job(metadata_file.delete)
self._invalidate_caches(
backup_id,
file.file_name,
metadata_file.file_name,
remove_files=True,
)
@handle_b2_errors
async def async_list_backups(self, **kwargs: Any) -> list[AgentBackup]:
"""List all backups by finding their associated metadata files in Backblaze B2."""
async with self._backup_list_cache_lock:
if self._backup_list_cache and self._is_cache_valid(
self._backup_list_cache_expiration
):
_LOGGER.debug("Returning backups from cache")
return list(self._backup_list_cache.values())
_LOGGER.debug(
"Cache expired or empty, fetching all files from B2 to build backup list"
)
all_files_in_prefix = await self._get_all_files_in_prefix()
_LOGGER.debug(
"Files found in prefix '%s': %s",
self._prefix,
list(all_files_in_prefix.keys()),
)
# Process metadata files sequentially to avoid exhausting executor pool
backups = {}
for file_name, file_version in all_files_in_prefix.items():
if file_name.endswith(METADATA_FILE_SUFFIX):
backup = await self._hass.async_add_executor_job(
self._process_metadata_file_sync,
file_name,
file_version,
all_files_in_prefix,
)
if backup:
backups[backup.backup_id] = backup
self._backup_list_cache = backups
self._backup_list_cache_expiration = time() + CACHE_TTL
return list(backups.values())
@handle_b2_errors
async def async_get_backup(self, backup_id: str, **kwargs: Any) -> AgentBackup:
"""Get a specific backup by its ID from Backblaze B2."""
if self._backup_list_cache and self._is_cache_valid(
self._backup_list_cache_expiration
):
if backup := self._backup_list_cache.get(backup_id):
_LOGGER.debug("Returning backup %s from cache", backup_id)
return backup
file, metadata_file_version = await self._find_file_and_metadata_version_by_id(
backup_id
)
if not file or not metadata_file_version:
raise BackupNotFound(f"Backup {backup_id} not found")
metadata_content = await self._hass.async_add_executor_job(
self._download_and_parse_metadata_sync,
metadata_file_version,
)
_LOGGER.debug(
"Successfully retrieved metadata for backup ID %s from file %s",
backup_id,
metadata_file_version.file_name,
)
backup = _create_backup_from_metadata(metadata_content, file)
if self._is_cache_valid(self._backup_list_cache_expiration):
self._backup_list_cache[backup.backup_id] = backup
return backup
async def _find_file_and_metadata_version_by_id(
self, backup_id: str
) -> tuple[FileVersion | None, FileVersion | None]:
"""Find the main backup file and its associated metadata file version by backup ID."""
all_files_in_prefix = await self._get_all_files_in_prefix()
# Process metadata files sequentially to avoid exhausting executor pool
for file_name, file_version in all_files_in_prefix.items():
if file_name.endswith(METADATA_FILE_SUFFIX):
(
result_backup_file,
result_metadata_file_version,
) = await self._hass.async_add_executor_job(
self._process_metadata_file_for_id_sync,
file_name,
file_version,
backup_id,
all_files_in_prefix,
)
if result_backup_file and result_metadata_file_version:
return result_backup_file, result_metadata_file_version
_LOGGER.debug("Backup %s not found", backup_id)
return None, None
def _process_metadata_file_for_id_sync(
self,
file_name: str,
file_version: FileVersion,
target_backup_id: str,
all_files_in_prefix: dict[str, FileVersion],
) -> tuple[FileVersion | None, FileVersion | None]:
"""Synchronously process a single metadata file for a specific backup ID.
Called within a thread pool executor.
"""
try:
download_response = file_version.download().response
except B2Error as err:
_LOGGER.warning(
"Failed to download metadata file %s during ID search: %s",
file_name,
err,
)
return None, None
try:
metadata_content = _parse_metadata(
download_response.content.decode("utf-8")
)
except ValueError:
return None, None
if metadata_content["backup_id"] != target_backup_id:
_LOGGER.debug(
"Metadata file %s does not match target backup ID %s",
file_name,
target_backup_id,
)
return None, None
found_backup_file = _find_backup_file_for_metadata(
file_name, all_files_in_prefix, self._prefix
)
if not found_backup_file:
_LOGGER.warning(
"Found metadata file %s for backup ID %s, but no corresponding backup file",
file_name,
target_backup_id,
)
return None, None
_LOGGER.debug(
"Found backup file %s and metadata file %s for ID %s",
found_backup_file.file_name,
file_name,
target_backup_id,
)
return found_backup_file, file_version
async def _get_all_files_in_prefix(self) -> dict[str, FileVersion]:
"""Get all file versions in the configured prefix from Backblaze B2.
Uses a cache to minimize API calls.
This fetches a flat list of all files, including main backups and metadata files.
"""
async with self._all_files_cache_lock:
if self._is_cache_valid(self._all_files_cache_expiration):
_LOGGER.debug("Returning all files from cache")
return self._all_files_cache
_LOGGER.debug("Cache for all files expired or empty, fetching from B2")
all_files_in_prefix = await self._hass.async_add_executor_job(
self._fetch_all_files_in_prefix
)
self._all_files_cache = all_files_in_prefix
self._all_files_cache_expiration = time() + CACHE_TTL
return all_files_in_prefix
def _fetch_all_files_in_prefix(self) -> dict[str, FileVersion]:
"""Fetch all files in the configured prefix from B2."""
all_files: dict[str, FileVersion] = {}
for file, _ in self._bucket.ls(self._prefix):
all_files[file.file_name] = file
return all_files
def _process_metadata_file_sync(
self,
file_name: str,
file_version: FileVersion,
all_files_in_prefix: dict[str, FileVersion],
) -> AgentBackup | None:
"""Synchronously process a single metadata file and return an AgentBackup if valid."""
try:
download_response = file_version.download().response
except B2Error as err:
_LOGGER.warning("Failed to download metadata file %s: %s", file_name, err)
return None
try:
metadata_content = _parse_metadata(
download_response.content.decode("utf-8")
)
except ValueError:
return None
found_backup_file = _find_backup_file_for_metadata(
file_name, all_files_in_prefix, self._prefix
)
if not found_backup_file:
_LOGGER.warning(
"Found metadata file %s but no corresponding backup file",
file_name,
)
return None
_LOGGER.debug(
"Successfully processed metadata file %s for backup ID %s",
file_name,
metadata_content["backup_id"],
)
return _create_backup_from_metadata(metadata_content, found_backup_file)
def _invalidate_caches(
self,
backup_id: str,
tar_filename: str,
metadata_filename: str | None,
*,
remove_files: bool = False,
) -> None:
"""Invalidate caches after upload/deletion operations.
Args:
backup_id: The backup ID to remove from backup cache
tar_filename: The tar filename to remove from files cache
metadata_filename: The metadata filename to remove from files cache
remove_files: If True, remove specific files from cache; if False, expire entire cache
"""
if remove_files:
if self._is_cache_valid(self._all_files_cache_expiration):
self._all_files_cache.pop(tar_filename, None)
if metadata_filename:
self._all_files_cache.pop(metadata_filename, None)
if self._is_cache_valid(self._backup_list_cache_expiration):
self._backup_list_cache.pop(backup_id, None)
else:
# For uploads, we can't easily add new FileVersion objects without API calls,
# so we expire the entire cache for simplicity
self._all_files_cache_expiration = 0.0
self._backup_list_cache_expiration = 0.0

View File

@@ -1,288 +0,0 @@
"""Config flow for the Backblaze B2 integration."""
from __future__ import annotations
from collections.abc import Mapping
import logging
from typing import Any
from b2sdk.v2 import B2Api, InMemoryAccountInfo, exception
import voluptuous as vol
from homeassistant.config_entries import ConfigEntry, ConfigFlow, ConfigFlowResult
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.selector import (
TextSelector,
TextSelectorConfig,
TextSelectorType,
)
from .const import (
BACKBLAZE_REALM,
CONF_APPLICATION_KEY,
CONF_BUCKET,
CONF_KEY_ID,
CONF_PREFIX,
DOMAIN,
)
_LOGGER = logging.getLogger(__name__)
# Constants
REQUIRED_CAPABILITIES = {"writeFiles", "listFiles", "deleteFiles", "readFiles"}
STEP_USER_DATA_SCHEMA = vol.Schema(
{
vol.Required(CONF_KEY_ID): cv.string,
vol.Required(CONF_APPLICATION_KEY): TextSelector(
config=TextSelectorConfig(type=TextSelectorType.PASSWORD)
),
vol.Required(CONF_BUCKET): cv.string,
vol.Optional(CONF_PREFIX, default=""): cv.string,
}
)
class BackblazeConfigFlow(ConfigFlow, domain=DOMAIN):
"""Handle a config flow for Backblaze B2."""
VERSION = 1
reauth_entry: ConfigEntry[Any] | None
def _abort_if_duplicate_credentials(self, user_input: dict[str, Any]) -> None:
"""Abort if credentials already exist in another entry."""
self._async_abort_entries_match(
{
CONF_KEY_ID: user_input[CONF_KEY_ID],
CONF_APPLICATION_KEY: user_input[CONF_APPLICATION_KEY],
}
)
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle a flow initiated by the user."""
errors: dict[str, str] = {}
placeholders: dict[str, str] = {}
if user_input is not None:
self._abort_if_duplicate_credentials(user_input)
errors, placeholders = await self._async_validate_backblaze_connection(
user_input
)
if not errors:
if user_input[CONF_PREFIX] and not user_input[CONF_PREFIX].endswith(
"/"
):
user_input[CONF_PREFIX] += "/"
return self.async_create_entry(
title=user_input[CONF_BUCKET], data=user_input
)
return self.async_show_form(
step_id="user",
data_schema=self.add_suggested_values_to_schema(
STEP_USER_DATA_SCHEMA, user_input
),
errors=errors,
description_placeholders={"brand_name": "Backblaze B2", **placeholders},
)
async def _async_validate_backblaze_connection(
self, user_input: dict[str, Any]
) -> tuple[dict[str, str], dict[str, str]]:
"""Validate Backblaze B2 credentials, bucket, capabilities, and prefix.
Returns a tuple of (errors_dict, placeholders_dict).
"""
errors: dict[str, str] = {}
placeholders: dict[str, str] = {}
info = InMemoryAccountInfo()
b2_api = B2Api(info)
def _authorize_and_get_bucket_sync() -> None:
"""Synchronously authorize the account and get the bucket by name.
This function is run in the executor because b2sdk operations are blocking.
"""
b2_api.authorize_account(
BACKBLAZE_REALM, # Use the defined realm constant
user_input[CONF_KEY_ID],
user_input[CONF_APPLICATION_KEY],
)
b2_api.get_bucket_by_name(user_input[CONF_BUCKET])
try:
await self.hass.async_add_executor_job(_authorize_and_get_bucket_sync)
allowed = b2_api.account_info.get_allowed()
# Check if allowed info is available
if allowed is None or not allowed.get("capabilities"):
errors["base"] = "invalid_capability"
placeholders["missing_capabilities"] = ", ".join(
sorted(REQUIRED_CAPABILITIES)
)
else:
# Check if all required capabilities are present
current_caps = set(allowed["capabilities"])
if not REQUIRED_CAPABILITIES.issubset(current_caps):
missing_caps = REQUIRED_CAPABILITIES - current_caps
_LOGGER.warning(
"Missing required Backblaze B2 capabilities for Key ID '%s': %s",
user_input[CONF_KEY_ID],
", ".join(sorted(missing_caps)),
)
errors["base"] = "invalid_capability"
placeholders["missing_capabilities"] = ", ".join(
sorted(missing_caps)
)
else:
# Only check prefix if capabilities are valid
configured_prefix: str = user_input[CONF_PREFIX]
allowed_prefix = allowed.get("namePrefix") or ""
# Ensure configured prefix starts with Backblaze B2's allowed prefix
if allowed_prefix and not configured_prefix.startswith(
allowed_prefix
):
errors[CONF_PREFIX] = "invalid_prefix"
placeholders["allowed_prefix"] = allowed_prefix
except exception.Unauthorized:
_LOGGER.debug(
"Backblaze B2 authentication failed for Key ID '%s'",
user_input[CONF_KEY_ID],
)
errors["base"] = "invalid_credentials"
except exception.RestrictedBucket as err:
_LOGGER.debug(
"Access to Backblaze B2 bucket '%s' is restricted: %s",
user_input[CONF_BUCKET],
err,
)
placeholders["restricted_bucket_name"] = err.bucket_name
errors[CONF_BUCKET] = "restricted_bucket"
except exception.NonExistentBucket:
_LOGGER.debug(
"Backblaze B2 bucket '%s' does not exist", user_input[CONF_BUCKET]
)
errors[CONF_BUCKET] = "invalid_bucket_name"
except exception.ConnectionReset:
_LOGGER.error("Failed to connect to Backblaze B2. Connection reset")
errors["base"] = "cannot_connect"
except exception.MissingAccountData:
# This generally indicates an issue with how InMemoryAccountInfo is used
_LOGGER.error(
"Missing account data during Backblaze B2 authorization for Key ID '%s'",
user_input[CONF_KEY_ID],
)
errors["base"] = "invalid_credentials"
except Exception:
_LOGGER.exception(
"An unexpected error occurred during Backblaze B2 configuration for Key ID '%s'",
user_input[CONF_KEY_ID],
)
errors["base"] = "unknown"
return errors, placeholders
async def async_step_reauth(
self, entry_data: Mapping[str, Any]
) -> ConfigFlowResult:
"""Handle reauthentication flow."""
self.reauth_entry = self.hass.config_entries.async_get_entry(
self.context["entry_id"]
)
assert self.reauth_entry is not None
return await self.async_step_reauth_confirm()
async def async_step_reauth_confirm(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Confirm reauthentication."""
assert self.reauth_entry is not None
errors: dict[str, str] = {}
placeholders: dict[str, str] = {}
if user_input is not None:
self._abort_if_duplicate_credentials(user_input)
validation_input = {
CONF_KEY_ID: user_input[CONF_KEY_ID],
CONF_APPLICATION_KEY: user_input[CONF_APPLICATION_KEY],
CONF_BUCKET: self.reauth_entry.data[CONF_BUCKET],
CONF_PREFIX: self.reauth_entry.data[CONF_PREFIX],
}
errors, placeholders = await self._async_validate_backblaze_connection(
validation_input
)
if not errors:
return self.async_update_reload_and_abort(
self.reauth_entry,
data_updates={
CONF_KEY_ID: user_input[CONF_KEY_ID],
CONF_APPLICATION_KEY: user_input[CONF_APPLICATION_KEY],
},
)
return self.async_show_form(
step_id="reauth_confirm",
data_schema=vol.Schema(
{
vol.Required(CONF_KEY_ID): cv.string,
vol.Required(CONF_APPLICATION_KEY): TextSelector(
config=TextSelectorConfig(type=TextSelectorType.PASSWORD)
),
}
),
errors=errors,
description_placeholders={
"brand_name": "Backblaze B2",
"bucket": self.reauth_entry.data[CONF_BUCKET],
**placeholders,
},
)
async def async_step_reconfigure(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle reconfiguration flow."""
entry = self.hass.config_entries.async_get_entry(self.context["entry_id"])
assert entry is not None
if user_input is not None:
self._abort_if_duplicate_credentials(user_input)
errors, placeholders = await self._async_validate_backblaze_connection(
user_input
)
if not errors:
if user_input[CONF_PREFIX] and not user_input[CONF_PREFIX].endswith(
"/"
):
user_input[CONF_PREFIX] += "/"
return self.async_update_reload_and_abort(
entry,
data_updates=user_input,
)
else:
errors = {}
placeholders = {}
return self.async_show_form(
step_id="reconfigure",
data_schema=self.add_suggested_values_to_schema(
STEP_USER_DATA_SCHEMA, user_input or entry.data
),
errors=errors,
description_placeholders={"brand_name": "Backblaze B2", **placeholders},
)

View File

@@ -1,22 +0,0 @@
"""Constants for the Backblaze B2 integration."""
from collections.abc import Callable
from typing import Final
from homeassistant.util.hass_dict import HassKey
DOMAIN: Final = "backblaze_b2"
CONF_KEY_ID = "key_id"
CONF_APPLICATION_KEY = "application_key"
CONF_BUCKET = "bucket"
CONF_PREFIX = "prefix"
DATA_BACKUP_AGENT_LISTENERS: HassKey[list[Callable[[], None]]] = HassKey(
f"{DOMAIN}.backup_agent_listeners"
)
METADATA_FILE_SUFFIX = ".metadata.json"
METADATA_VERSION = "1"
BACKBLAZE_REALM = "production"

View File

@@ -1,56 +0,0 @@
"""Diagnostics support for Backblaze B2."""
from __future__ import annotations
from typing import Any
from homeassistant.components.diagnostics import async_redact_data
from homeassistant.core import HomeAssistant
from . import BackblazeConfigEntry
from .const import CONF_APPLICATION_KEY, CONF_KEY_ID
TO_REDACT_ENTRY_DATA = {CONF_APPLICATION_KEY, CONF_KEY_ID}
TO_REDACT_ACCOUNT_DATA_ALLOWED = {"bucketId", "bucketName", "namePrefix"}
async def async_get_config_entry_diagnostics(
hass: HomeAssistant, entry: BackblazeConfigEntry
) -> dict[str, Any]:
"""Return diagnostics for a config entry."""
bucket = entry.runtime_data
try:
bucket_info = {
"name": bucket.name,
"id": bucket.id_,
"type": bucket.type_,
"cors_rules": bucket.cors_rules,
"lifecycle_rules": bucket.lifecycle_rules,
"revision": bucket.revision,
}
account_info = bucket.api.account_info
account_data: dict[str, Any] = {
"account_id": account_info.get_account_id(),
"api_url": account_info.get_api_url(),
"download_url": account_info.get_download_url(),
"minimum_part_size": account_info.get_minimum_part_size(),
"allowed": account_info.get_allowed(),
}
if isinstance(account_data["allowed"], dict):
account_data["allowed"] = async_redact_data(
account_data["allowed"], TO_REDACT_ACCOUNT_DATA_ALLOWED
)
except (AttributeError, TypeError, ValueError, KeyError):
bucket_info = {"name": "unknown", "id": "unknown"}
account_data = {"error": "Failed to retrieve detailed account information"}
return {
"entry_data": async_redact_data(entry.data, TO_REDACT_ENTRY_DATA),
"entry_options": entry.options,
"bucket_info": bucket_info,
"account_info": account_data,
}

View File

@@ -1,12 +0,0 @@
{
"domain": "backblaze_b2",
"name": "Backblaze B2",
"codeowners": ["@hugo-vrijswijk", "@ElCruncharino"],
"config_flow": true,
"documentation": "https://www.home-assistant.io/integrations/backblaze_b2",
"integration_type": "service",
"iot_class": "cloud_push",
"loggers": ["b2sdk"],
"quality_scale": "bronze",
"requirements": ["b2sdk==2.8.1"]
}

View File

@@ -1,124 +0,0 @@
rules:
# Bronze
action-setup:
status: exempt
comment: Integration does not register custom actions.
appropriate-polling:
status: exempt
comment: Integration does not poll.
brands: done
common-modules: done
config-flow-test-coverage: done
config-flow: done
dependency-transparency: done
docs-actions:
status: exempt
comment: This integration does not have any custom actions.
docs-high-level-description: done
docs-installation-instructions: done
docs-removal-instructions: done
entity-event-setup:
status: exempt
comment: Entities of this integration do not explicitly subscribe to events.
entity-unique-id:
status: exempt
comment: |
This integration does not have entities.
has-entity-name:
status: exempt
comment: |
This integration does not have entities.
runtime-data: done
test-before-configure: done
test-before-setup: done
unique-config-entry: done
# Silver
action-exceptions:
status: exempt
comment: Integration does not register custom actions.
config-entry-unloading: done
docs-configuration-parameters:
status: exempt
comment: This integration does not have an options flow.
docs-installation-parameters: done
entity-unavailable:
status: exempt
comment: This integration does not have entities.
integration-owner: done
log-when-unavailable:
status: exempt
comment: This integration does not have entities.
parallel-updates:
status: exempt
comment: This integration does not poll.
reauthentication-flow: done
test-coverage: done
# Gold
devices:
status: exempt
comment: This integration does not have entities.
diagnostics: done
discovery-update-info:
status: exempt
comment: Backblaze B2 is a cloud service that is not discovered on the network.
discovery:
status: exempt
comment: Backblaze B2 is a cloud service that is not discovered on the network.
docs-data-update:
status: exempt
comment: This integration does not poll.
docs-examples:
status: exempt
comment: The integration extends core functionality and does not require examples.
docs-known-limitations: done
docs-supported-devices:
status: exempt
comment: This integration does not support physical devices.
docs-supported-functions:
status: exempt
comment: This integration does not have entities.
docs-troubleshooting: todo
docs-use-cases: done
dynamic-devices:
status: exempt
comment: This integration does not have devices.
entity-category:
status: exempt
comment: This integration does not have entities.
entity-device-class:
status: exempt
comment: This integration does not have entities.
entity-disabled-by-default:
status: exempt
comment: This integration does not have entities.
entity-translations:
status: exempt
comment: This integration does not have entities.
exception-translations: done
icon-translations:
status: exempt
comment: This integration does not use icons.
reconfiguration-flow: done
repair-issues: done
stale-devices:
status: exempt
comment: This integration does not have devices.
# Platinum
async-dependency:
status: exempt
comment: |
The b2sdk library is synchronous by design. All sync operations are properly
wrapped with async_add_executor_job to prevent blocking the event loop.
inject-websession:
status: exempt
comment: |
The b2sdk library does not support custom HTTP session injection.
It manages HTTP connections internally through its own session management.
strict-typing:
status: exempt
comment: |
The b2sdk dependency does not include a py.typed file and is not PEP 561 compliant.
This is outside the integration's control as it's a third-party library requirement.

View File

@@ -1,93 +0,0 @@
"""Repair issues for the Backblaze B2 integration."""
from __future__ import annotations
import logging
from b2sdk.v2.exception import (
B2Error,
NonExistentBucket,
RestrictedBucket,
Unauthorized,
)
from homeassistant.components.repairs import ConfirmRepairFlow
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.helpers import issue_registry as ir
from .const import CONF_BUCKET, DOMAIN
_LOGGER = logging.getLogger(__name__)
ISSUE_BUCKET_ACCESS_RESTRICTED = "bucket_access_restricted"
ISSUE_BUCKET_NOT_FOUND = "bucket_not_found"
def _create_issue(
hass: HomeAssistant,
entry: ConfigEntry,
issue_type: str,
bucket_name: str,
) -> None:
"""Create a repair issue with standard parameters."""
ir.async_create_issue(
hass,
DOMAIN,
f"{issue_type}_{entry.entry_id}",
is_fixable=False,
issue_domain=DOMAIN,
severity=ir.IssueSeverity.ERROR,
translation_key=issue_type,
translation_placeholders={
"brand_name": "Backblaze B2",
"title": entry.title,
"bucket_name": bucket_name,
"entry_id": entry.entry_id,
},
)
def create_bucket_access_restricted_issue(
hass: HomeAssistant, entry: ConfigEntry, bucket_name: str
) -> None:
"""Create a repair issue for restricted bucket access."""
_create_issue(hass, entry, ISSUE_BUCKET_ACCESS_RESTRICTED, bucket_name)
def create_bucket_not_found_issue(
hass: HomeAssistant, entry: ConfigEntry, bucket_name: str
) -> None:
"""Create a repair issue for non-existent bucket."""
_create_issue(hass, entry, ISSUE_BUCKET_NOT_FOUND, bucket_name)
async def async_check_for_repair_issues(
hass: HomeAssistant, entry: ConfigEntry
) -> None:
"""Check for common issues that require user action."""
bucket = entry.runtime_data
restricted_issue_id = f"{ISSUE_BUCKET_ACCESS_RESTRICTED}_{entry.entry_id}"
not_found_issue_id = f"{ISSUE_BUCKET_NOT_FOUND}_{entry.entry_id}"
try:
await hass.async_add_executor_job(bucket.api.account_info.get_allowed)
ir.async_delete_issue(hass, DOMAIN, restricted_issue_id)
ir.async_delete_issue(hass, DOMAIN, not_found_issue_id)
except Unauthorized:
entry.async_start_reauth(hass)
except RestrictedBucket as err:
_create_issue(hass, entry, ISSUE_BUCKET_ACCESS_RESTRICTED, err.bucket_name)
except NonExistentBucket:
_create_issue(hass, entry, ISSUE_BUCKET_NOT_FOUND, entry.data[CONF_BUCKET])
except B2Error as err:
_LOGGER.debug("B2 connectivity test failed: %s", err)
async def async_create_fix_flow(
hass: HomeAssistant,
issue_id: str,
data: dict[str, str | int | float | None] | None,
) -> ConfirmRepairFlow:
"""Create a fix flow for Backblaze B2 issues."""
return ConfirmRepairFlow()

View File

@@ -1,92 +0,0 @@
{
"config": {
"abort": {
"already_configured": "[%key:common::config_flow::abort::already_configured_device%]",
"reauth_successful": "[%key:common::config_flow::abort::reauth_successful%]",
"reconfigure_successful": "[%key:common::config_flow::abort::reconfigure_successful%]"
},
"error": {
"cannot_connect": "[%key:common::config_flow::error::cannot_connect%]",
"invalid_bucket_name": "[%key:component::backblaze_b2::exceptions::invalid_bucket_name::message%]",
"invalid_capability": "[%key:component::backblaze_b2::exceptions::invalid_capability::message%]",
"invalid_credentials": "[%key:component::backblaze_b2::exceptions::invalid_credentials::message%]",
"invalid_prefix": "[%key:component::backblaze_b2::exceptions::invalid_prefix::message%]",
"restricted_bucket": "[%key:component::backblaze_b2::exceptions::restricted_bucket::message%]",
"unknown": "[%key:common::config_flow::error::unknown%]"
},
"step": {
"reauth_confirm": {
"data": {
"application_key": "Application key",
"key_id": "Key ID"
},
"data_description": {
"application_key": "Application key to connect to {brand_name}",
"key_id": "Key ID to connect to {brand_name}"
},
"description": "Update your {brand_name} credentials for bucket {bucket}.",
"title": "Reauthenticate {brand_name}"
},
"reconfigure": {
"data": {
"application_key": "Application key",
"bucket": "Bucket name",
"key_id": "Key ID",
"prefix": "Folder prefix (optional)"
},
"data_description": {
"application_key": "Application key to connect to {brand_name}",
"bucket": "Bucket must already exist and be writable by the provided credentials.",
"key_id": "Key ID to connect to {brand_name}",
"prefix": "Directory path to store backup files in. Leave empty to store in the root."
},
"title": "Reconfigure {brand_name}"
},
"user": {
"data": {
"application_key": "Application key",
"bucket": "Bucket name",
"key_id": "Key ID",
"prefix": "Folder prefix (optional)"
},
"data_description": {
"application_key": "Application key to connect to {brand_name}",
"bucket": "Bucket must already exist and be writable by the provided credentials.",
"key_id": "Key ID to connect to {brand_name}",
"prefix": "Directory path to store backup files in. Leave empty to store in the root."
},
"title": "Add {brand_name} backup"
}
}
},
"exceptions": {
"cannot_connect": {
"message": "Cannot connect to endpoint"
},
"invalid_bucket_name": {
"message": "Bucket does not exist or is not writable by the provided credentials."
},
"invalid_capability": {
"message": "Application key does not have the required read/write capabilities."
},
"invalid_credentials": {
"message": "Bucket cannot be accessed using provided of key ID and application key."
},
"invalid_prefix": {
"message": "Prefix is not allowed for provided key. Must start with {allowed_prefix}."
},
"restricted_bucket": {
"message": "Application key is restricted to bucket {restricted_bucket_name}."
}
},
"issues": {
"bucket_access_restricted": {
"description": "Access to your {brand_name} bucket {bucket_name} is restricted for the current credentials. This means your application key may only have access to specific buckets, but not this one. To fix this issue:\n\n1. Log in to your {brand_name} account\n2. Check your application key restrictions\n3. Either use a different bucket that your key can access, or create a new application key with access to {bucket_name}\n4. Go to Settings > Devices & Services > {brand_name} and reconfigure the integration settings\n\nOnce you update the integration settings, this issue will be automatically resolved.",
"title": "{brand_name} bucket access restricted"
},
"bucket_not_found": {
"description": "The {brand_name} bucket {bucket_name} cannot be found or accessed. This could mean:\n\n1. The bucket was deleted\n2. The bucket name was changed\n3. Your credentials no longer have access to this bucket\n\nTo fix this issue:\n\n1. Log in to your {brand_name} account\n2. Verify the bucket still exists and check its name\n3. Ensure your application key has access to this bucket\n4. Go to Settings > Devices & Services > {brand_name} and reconfigure the integration settings\n\nOnce you update the integration settings, this issue will be automatically resolved.",
"title": "{brand_name} bucket not found"
}
}
}

View File

@@ -1,9 +1,7 @@
"""The blueprint integration."""
from homeassistant.const import Platform
from homeassistant.core import HomeAssistant
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.discovery import async_load_platform
from homeassistant.helpers.typing import ConfigType
from . import websocket_api
@@ -30,7 +28,4 @@ CONFIG_SCHEMA = cv.empty_config_schema(DOMAIN)
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Set up the blueprint integration."""
websocket_api.async_setup(hass)
hass.async_create_task(
async_load_platform(hass, Platform.UPDATE, DOMAIN, None, config)
)
return True

View File

@@ -204,8 +204,8 @@ class DomainBlueprints:
self.hass = hass
self.domain = domain
self.logger = logger
self.blueprint_in_use = blueprint_in_use
self.reload_blueprint_consumers = reload_blueprint_consumers
self._blueprint_in_use = blueprint_in_use
self._reload_blueprint_consumers = reload_blueprint_consumers
self._blueprints: dict[str, Blueprint | None] = {}
self._load_lock = asyncio.Lock()
self._blueprint_schema = blueprint_schema
@@ -325,7 +325,7 @@ class DomainBlueprints:
async def async_remove_blueprint(self, blueprint_path: str) -> None:
"""Remove a blueprint file."""
if self.blueprint_in_use(self.hass, blueprint_path):
if self._blueprint_in_use(self.hass, blueprint_path):
raise BlueprintInUse(self.domain, blueprint_path)
path = self.blueprint_folder / blueprint_path
await self.hass.async_add_executor_job(path.unlink)
@@ -362,7 +362,7 @@ class DomainBlueprints:
self._blueprints[blueprint_path] = blueprint
if overrides_existing:
await self.reload_blueprint_consumers(self.hass, blueprint_path)
await self._reload_blueprint_consumers(self.hass, blueprint_path)
return overrides_existing

View File

@@ -1,293 +0,0 @@
"""Update entities for blueprints."""
from __future__ import annotations
import asyncio
from dataclasses import dataclass
import logging
from datetime import timedelta
from typing import Any, Final
from homeassistant.components import automation, script
from . import importer, models
from homeassistant.components.update import UpdateEntity, UpdateEntityFeature
from homeassistant.const import CONF_SOURCE_URL
from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import event as event_helper
from homeassistant.helpers.entity import EntityCategory
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
from .const import DOMAIN as BLUEPRINT_DOMAIN
from .errors import BlueprintException
_LOGGER = logging.getLogger(__name__)
_LATEST_VERSION_PLACEHOLDER: Final = "remote"
DATA_UPDATE_MANAGER: Final = "update_manager"
REFRESH_INTERVAL: Final = timedelta(days=1)
@dataclass(slots=True)
class BlueprintUsage:
"""Details about a blueprint currently in use."""
domain: str
path: str
domain_blueprints: models.DomainBlueprints
blueprint: models.Blueprint
entities: list[str]
async def async_setup_platform(
hass: HomeAssistant,
config: ConfigType,
async_add_entities: AddEntitiesCallback,
discovery_info: DiscoveryInfoType | None = None,
) -> None:
"""Set up the blueprint update platform."""
data = hass.data.setdefault(BLUEPRINT_DOMAIN, {})
if (manager := data.get(DATA_UPDATE_MANAGER)) is None:
manager = BlueprintUpdateManager(hass, async_add_entities)
data[DATA_UPDATE_MANAGER] = manager
await manager.async_start()
return
manager.replace_add_entities(async_add_entities)
await manager.async_recreate_entities()
class BlueprintUpdateManager:
"""Manage blueprint update entities based on blueprint usage."""
def __init__(
self, hass: HomeAssistant, async_add_entities: AddEntitiesCallback
) -> None:
"""Initialize the manager."""
self.hass = hass
self._async_add_entities = async_add_entities
self._entities: dict[tuple[str, str], BlueprintUpdateEntity] = {}
self._lock = asyncio.Lock()
self._refresh_cancel: CALLBACK_TYPE | None = None
self._started = False
self._interval_unsub: CALLBACK_TYPE | None = None
async def async_start(self) -> None:
"""Start tracking blueprint usage."""
if self._started:
return
self._started = True
self._interval_unsub = event_helper.async_track_time_interval(
self.hass, self._handle_time_interval, REFRESH_INTERVAL
)
await self.async_refresh_entities()
def replace_add_entities(self, async_add_entities: AddEntitiesCallback) -> None:
"""Update the callback used to register entities."""
self._async_add_entities = async_add_entities
async def async_recreate_entities(self) -> None:
"""Recreate entities after the platform has been reloaded."""
async with self._lock:
entities = list(self._entities.values())
self._entities.clear()
for entity in entities:
await entity.async_remove()
await self.async_refresh_entities()
async def async_refresh_entities(self) -> None:
"""Refresh update entities based on current blueprint usage."""
async with self._lock:
usage_map = await self._async_collect_in_use_blueprints()
current_keys = set(self._entities)
new_keys = set(usage_map)
for key in current_keys - new_keys:
entity = self._entities.pop(key)
await entity.async_remove()
new_entities: list[BlueprintUpdateEntity] = []
for key in new_keys - current_keys:
usage = usage_map[key]
entity = BlueprintUpdateEntity(self, usage)
self._entities[key] = entity
new_entities.append(entity)
for key in new_keys & current_keys:
self._entities[key].update_usage(usage_map[key])
self._entities[key].async_write_ha_state()
if new_entities:
self._async_add_entities(new_entities)
def async_schedule_refresh(self) -> None:
"""Schedule an asynchronous refresh."""
if self._refresh_cancel is not None:
return
self._refresh_cancel = event_helper.async_call_later(
self.hass, 0, self._handle_scheduled_refresh
)
@callback
def _handle_scheduled_refresh(self, _now: Any) -> None:
"""Run a scheduled refresh task."""
self._refresh_cancel = None
self.hass.async_create_task(self.async_refresh_entities())
@callback
def _handle_time_interval(self, _now: Any) -> None:
"""Handle scheduled interval refresh."""
self.async_schedule_refresh()
async def _async_collect_in_use_blueprints(self) -> dict[tuple[str, str], BlueprintUsage]:
"""Collect blueprint usage information for automations and scripts."""
usage_keys: set[tuple[str, str]] = set()
if automation.DATA_COMPONENT in self.hass.data:
component = self.hass.data[automation.DATA_COMPONENT]
for automation_entity in list(component.entities):
if (path := getattr(automation_entity, "referenced_blueprint", None)):
usage_keys.add((automation.DOMAIN, path))
if script.DOMAIN in self.hass.data:
component = self.hass.data[script.DOMAIN]
for script_entity in list(component.entities):
if (path := getattr(script_entity, "referenced_blueprint", None)):
usage_keys.add((script.DOMAIN, path))
domain_blueprints_map = self.hass.data.get(BLUEPRINT_DOMAIN, {})
usage_map: dict[tuple[str, str], BlueprintUsage] = {}
for domain, path in usage_keys:
domain_blueprints: models.DomainBlueprints | None = domain_blueprints_map.get(
domain
)
if domain_blueprints is None:
continue
if not domain_blueprints.blueprint_in_use(self.hass, path):
continue
try:
blueprint = await domain_blueprints.async_get_blueprint(path)
except BlueprintException:
continue
source_url = blueprint.metadata.get(CONF_SOURCE_URL)
if not source_url:
continue
if domain == automation.DOMAIN:
entities = automation.automations_with_blueprint(self.hass, path)
elif domain == script.DOMAIN:
entities = script.scripts_with_blueprint(self.hass, path)
else:
entities = []
usage_map[(domain, path)] = BlueprintUsage(
domain=domain,
path=path,
domain_blueprints=domain_blueprints,
blueprint=blueprint,
entities=entities,
)
return usage_map
class BlueprintUpdateEntity(UpdateEntity):
"""Define a blueprint update entity."""
_attr_entity_category = EntityCategory.CONFIG
_attr_has_entity_name = True
_attr_should_poll = False
_attr_supported_features = UpdateEntityFeature.INSTALL
def __init__(self, manager: BlueprintUpdateManager, usage: BlueprintUsage) -> None:
"""Initialize the update entity."""
self._manager = manager
self._domain = usage.domain
self._path = usage.path
self._domain_blueprints = usage.domain_blueprints
self._blueprint = usage.blueprint
self._entities_in_use = usage.entities
self._source_url = usage.blueprint.metadata.get(CONF_SOURCE_URL)
self._attr_unique_id = f"{self._domain}:{self._path}"
self._attr_in_progress = False
self.update_usage(usage)
@callback
def update_usage(self, usage: BlueprintUsage) -> None:
"""Update the entity with latest usage information."""
self._domain_blueprints = usage.domain_blueprints
self._blueprint = usage.blueprint
self._entities_in_use = usage.entities
self._source_url = usage.blueprint.metadata.get(CONF_SOURCE_URL)
self._attr_name = usage.blueprint.name
self._attr_release_summary = usage.blueprint.metadata.get("description")
self._attr_installed_version = usage.blueprint.metadata.get("version")
self._attr_release_url = self._source_url
self._attr_available = self._source_url is not None
self._attr_latest_version = (
_LATEST_VERSION_PLACEHOLDER
if self._source_url is not None
else self._attr_installed_version
)
async def async_install(self, version: str | None, backup: bool) -> None:
"""Install (refresh) the blueprint from its source."""
if self._source_url is None:
raise HomeAssistantError("Blueprint does not define a source URL")
self._attr_in_progress = True
self.async_write_ha_state()
usage: BlueprintUsage | None = None
try:
imported = await importer.fetch_blueprint_from_url(
self.hass, self._source_url
)
blueprint = imported.blueprint
if blueprint.domain != self._domain:
raise HomeAssistantError(
"Downloaded blueprint domain does not match the existing blueprint"
)
await self._domain_blueprints.async_add_blueprint(
blueprint, self._path, allow_override=True
)
usage = BlueprintUsage(
domain=self._domain,
path=self._path,
domain_blueprints=self._domain_blueprints,
blueprint=blueprint,
entities=self._entities_in_use,
)
except HomeAssistantError:
raise
except Exception as err: # noqa: BLE001 - Provide context for unexpected errors
raise HomeAssistantError("Failed to update blueprint from source") from err
finally:
self._attr_in_progress = False
if usage is not None:
self.update_usage(usage)
self.async_write_ha_state()
self._manager.async_schedule_refresh()

View File

@@ -189,7 +189,7 @@ class BryantEvolutionClimate(ClimateEntity):
return HVACAction.HEATING
raise HomeAssistantError(
translation_domain=DOMAIN,
translation_key="failed_to_parse_hvac_mode",
translation_key="failed_to_parse_hvac_action",
translation_placeholders={
"mode_and_active": mode_and_active,
"current_temperature": str(self.current_temperature),

View File

@@ -24,7 +24,7 @@
},
"exceptions": {
"failed_to_parse_hvac_action": {
"message": "Could not determine HVAC action: {mode_and_active}, {self.current_temperature}, {self.target_temperature_low}"
"message": "Could not determine HVAC action: {mode_and_active}, {current_temperature}, {target_temperature_low}"
},
"failed_to_parse_hvac_mode": {
"message": "Cannot parse response to HVACMode: {mode}"

View File

@@ -0,0 +1,4 @@
"""The cups component."""
DOMAIN = "cups"
CONF_PRINTERS = "printers"

View File

@@ -0,0 +1,9 @@
{
"domain": "cups",
"name": "CUPS",
"codeowners": ["@fabaff"],
"documentation": "https://www.home-assistant.io/integrations/cups",
"iot_class": "local_polling",
"quality_scale": "legacy",
"requirements": ["pycups==2.0.4"]
}

View File

@@ -0,0 +1,349 @@
"""Details about printers which are connected to CUPS."""
from __future__ import annotations
from datetime import timedelta
import importlib
import logging
from typing import Any
import voluptuous as vol
from homeassistant.components.sensor import (
PLATFORM_SCHEMA as SENSOR_PLATFORM_SCHEMA,
SensorEntity,
)
from homeassistant.const import CONF_HOST, CONF_PORT, PERCENTAGE
from homeassistant.core import DOMAIN as HOMEASSISTANT_DOMAIN, HomeAssistant
from homeassistant.exceptions import PlatformNotReady
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.issue_registry import IssueSeverity, create_issue
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
from . import CONF_PRINTERS, DOMAIN
_LOGGER = logging.getLogger(__name__)
ATTR_MARKER_TYPE = "marker_type"
ATTR_MARKER_LOW_LEVEL = "marker_low_level"
ATTR_MARKER_HIGH_LEVEL = "marker_high_level"
ATTR_PRINTER_NAME = "printer_name"
ATTR_DEVICE_URI = "device_uri"
ATTR_PRINTER_INFO = "printer_info"
ATTR_PRINTER_IS_SHARED = "printer_is_shared"
ATTR_PRINTER_LOCATION = "printer_location"
ATTR_PRINTER_MODEL = "printer_model"
ATTR_PRINTER_STATE_MESSAGE = "printer_state_message"
ATTR_PRINTER_STATE_REASON = "printer_state_reason"
ATTR_PRINTER_TYPE = "printer_type"
ATTR_PRINTER_URI_SUPPORTED = "printer_uri_supported"
CONF_IS_CUPS_SERVER = "is_cups_server"
DEFAULT_HOST = "127.0.0.1"
DEFAULT_PORT = 631
DEFAULT_IS_CUPS_SERVER = True
ICON_PRINTER = "mdi:printer"
ICON_MARKER = "mdi:water"
SCAN_INTERVAL = timedelta(minutes=1)
PRINTER_STATES = {3: "idle", 4: "printing", 5: "stopped"}
PLATFORM_SCHEMA = SENSOR_PLATFORM_SCHEMA.extend(
{
vol.Required(CONF_PRINTERS): vol.All(cv.ensure_list, [cv.string]),
vol.Optional(CONF_IS_CUPS_SERVER, default=DEFAULT_IS_CUPS_SERVER): cv.boolean,
vol.Optional(CONF_HOST, default=DEFAULT_HOST): cv.string,
vol.Optional(CONF_PORT, default=DEFAULT_PORT): cv.port,
}
)
def setup_platform(
hass: HomeAssistant,
config: ConfigType,
add_entities: AddEntitiesCallback,
discovery_info: DiscoveryInfoType | None = None,
) -> None:
"""Set up the CUPS sensor."""
host: str = config[CONF_HOST]
port: int = config[CONF_PORT]
printers: list[str] = config[CONF_PRINTERS]
is_cups: bool = config[CONF_IS_CUPS_SERVER]
create_issue(
hass,
HOMEASSISTANT_DOMAIN,
f"deprecated_system_packages_yaml_integration_{DOMAIN}",
breaks_in_ha_version="2025.12.0",
is_fixable=False,
issue_domain=DOMAIN,
severity=IssueSeverity.WARNING,
translation_key="deprecated_system_packages_yaml_integration",
translation_placeholders={
"domain": DOMAIN,
"integration_title": "CUPS",
},
)
if is_cups:
data = CupsData(host, port, None)
data.update()
if data.available is False:
_LOGGER.error("Unable to connect to CUPS server: %s:%s", host, port)
raise PlatformNotReady
assert data.printers is not None
dev: list[SensorEntity] = []
for printer in printers:
if printer not in data.printers:
_LOGGER.error("Printer is not present: %s", printer)
continue
dev.append(CupsSensor(data, printer))
if "marker-names" in data.attributes[printer]:
dev.extend(
MarkerSensor(data, printer, marker, True)
for marker in data.attributes[printer]["marker-names"]
)
add_entities(dev, True)
return
data = CupsData(host, port, printers)
data.update()
if data.available is False:
_LOGGER.error("Unable to connect to IPP printer: %s:%s", host, port)
raise PlatformNotReady
dev = []
for printer in printers:
dev.append(IPPSensor(data, printer))
if "marker-names" in data.attributes[printer]:
for marker in data.attributes[printer]["marker-names"]:
dev.append(MarkerSensor(data, printer, marker, False))
add_entities(dev, True)
class CupsSensor(SensorEntity):
"""Representation of a CUPS sensor."""
_attr_icon = ICON_PRINTER
def __init__(self, data: CupsData, printer_name: str) -> None:
"""Initialize the CUPS sensor."""
self.data = data
self._name = printer_name
self._printer: dict[str, Any] | None = None
self._attr_available = False
@property
def name(self) -> str:
"""Return the name of the entity."""
return self._name
@property
def native_value(self):
"""Return the state of the sensor."""
if self._printer is None:
return None
key = self._printer["printer-state"]
return PRINTER_STATES.get(key, key)
@property
def extra_state_attributes(self):
"""Return the state attributes of the sensor."""
if self._printer is None:
return None
return {
ATTR_DEVICE_URI: self._printer["device-uri"],
ATTR_PRINTER_INFO: self._printer["printer-info"],
ATTR_PRINTER_IS_SHARED: self._printer["printer-is-shared"],
ATTR_PRINTER_LOCATION: self._printer["printer-location"],
ATTR_PRINTER_MODEL: self._printer["printer-make-and-model"],
ATTR_PRINTER_STATE_MESSAGE: self._printer["printer-state-message"],
ATTR_PRINTER_STATE_REASON: self._printer["printer-state-reasons"],
ATTR_PRINTER_TYPE: self._printer["printer-type"],
ATTR_PRINTER_URI_SUPPORTED: self._printer["printer-uri-supported"],
}
def update(self) -> None:
"""Get the latest data and updates the states."""
self.data.update()
assert self.data.printers is not None
self._printer = self.data.printers.get(self.name)
self._attr_available = self.data.available
class IPPSensor(SensorEntity):
"""Implementation of the IPPSensor.
This sensor represents the status of the printer.
"""
_attr_icon = ICON_PRINTER
def __init__(self, data: CupsData, printer_name: str) -> None:
"""Initialize the sensor."""
self.data = data
self._printer_name = printer_name
self._attributes = None
self._attr_available = False
@property
def name(self):
"""Return the name of the sensor."""
return self._attributes["printer-make-and-model"]
@property
def native_value(self):
"""Return the state of the sensor."""
if self._attributes is None:
return None
key = self._attributes["printer-state"]
return PRINTER_STATES.get(key, key)
@property
def extra_state_attributes(self):
"""Return the state attributes of the sensor."""
if self._attributes is None:
return None
state_attributes = {}
if "printer-info" in self._attributes:
state_attributes[ATTR_PRINTER_INFO] = self._attributes["printer-info"]
if "printer-location" in self._attributes:
state_attributes[ATTR_PRINTER_LOCATION] = self._attributes[
"printer-location"
]
if "printer-state-message" in self._attributes:
state_attributes[ATTR_PRINTER_STATE_MESSAGE] = self._attributes[
"printer-state-message"
]
if "printer-state-reasons" in self._attributes:
state_attributes[ATTR_PRINTER_STATE_REASON] = self._attributes[
"printer-state-reasons"
]
if "printer-uri-supported" in self._attributes:
state_attributes[ATTR_PRINTER_URI_SUPPORTED] = self._attributes[
"printer-uri-supported"
]
return state_attributes
def update(self) -> None:
"""Fetch new state data for the sensor."""
self.data.update()
self._attributes = self.data.attributes.get(self._printer_name)
self._attr_available = self.data.available
class MarkerSensor(SensorEntity):
"""Implementation of the MarkerSensor.
This sensor represents the percentage of ink or toner.
"""
_attr_icon = ICON_MARKER
_attr_native_unit_of_measurement = PERCENTAGE
def __init__(self, data: CupsData, printer: str, name: str, is_cups: bool) -> None:
"""Initialize the sensor."""
self.data = data
self._attr_name = name
self._printer = printer
self._index = data.attributes[printer]["marker-names"].index(name)
self._is_cups = is_cups
self._attributes: dict[str, Any] | None = None
@property
def native_value(self):
"""Return the state of the sensor."""
if self._attributes is None:
return None
return self._attributes[self._printer]["marker-levels"][self._index]
@property
def extra_state_attributes(self):
"""Return the state attributes of the sensor."""
if self._attributes is None:
return None
high_level = self._attributes[self._printer].get("marker-high-levels")
if isinstance(high_level, list):
high_level = high_level[self._index]
low_level = self._attributes[self._printer].get("marker-low-levels")
if isinstance(low_level, list):
low_level = low_level[self._index]
marker_types = self._attributes[self._printer]["marker-types"]
if isinstance(marker_types, list):
marker_types = marker_types[self._index]
if self._is_cups:
printer_name = self._printer
else:
printer_name = self._attributes[self._printer]["printer-make-and-model"]
return {
ATTR_MARKER_HIGH_LEVEL: high_level,
ATTR_MARKER_LOW_LEVEL: low_level,
ATTR_MARKER_TYPE: marker_types,
ATTR_PRINTER_NAME: printer_name,
}
def update(self) -> None:
"""Update the state of the sensor."""
# Data fetching is done by CupsSensor/IPPSensor
self._attributes = self.data.attributes
class CupsData:
"""Get the latest data from CUPS and update the state."""
def __init__(self, host: str, port: int, ipp_printers: list[str] | None) -> None:
"""Initialize the data object."""
self._host = host
self._port = port
self._ipp_printers = ipp_printers
self.is_cups = ipp_printers is None
self.printers: dict[str, dict[str, Any]] | None = None
self.attributes: dict[str, Any] = {}
self.available = False
def update(self) -> None:
"""Get the latest data from CUPS."""
cups = importlib.import_module("cups")
try:
conn = cups.Connection(host=self._host, port=self._port)
if self.is_cups:
self.printers = conn.getPrinters()
assert self.printers is not None
for printer in self.printers:
self.attributes[printer] = conn.getPrinterAttributes(name=printer)
else:
assert self._ipp_printers is not None
for ipp_printer in self._ipp_printers:
self.attributes[ipp_printer] = conn.getPrinterAttributes(
uri=f"ipp://{self._host}:{self._port}/{ipp_printer}"
)
self.available = True
except RuntimeError:
self.available = False

View File

@@ -0,0 +1,3 @@
"""The decora component."""
DOMAIN = "decora"

View File

@@ -0,0 +1,166 @@
"""Support for Decora dimmers."""
from __future__ import annotations
from collections.abc import Callable
import copy
from functools import wraps
import logging
import time
from typing import TYPE_CHECKING, Any, Concatenate
from bluepy.btle import BTLEException
import decora
import voluptuous as vol
from homeassistant import util
from homeassistant.components.light import (
ATTR_BRIGHTNESS,
PLATFORM_SCHEMA as LIGHT_PLATFORM_SCHEMA,
ColorMode,
LightEntity,
)
from homeassistant.const import CONF_API_KEY, CONF_DEVICES, CONF_NAME
from homeassistant.core import DOMAIN as HOMEASSISTANT_DOMAIN
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.issue_registry import IssueSeverity, create_issue
from . import DOMAIN
if TYPE_CHECKING:
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
_LOGGER = logging.getLogger(__name__)
def _name_validator(config):
"""Validate the name."""
config = copy.deepcopy(config)
for address, device_config in config[CONF_DEVICES].items():
if CONF_NAME not in device_config:
device_config[CONF_NAME] = util.slugify(address)
return config
DEVICE_SCHEMA = vol.Schema(
{vol.Optional(CONF_NAME): cv.string, vol.Required(CONF_API_KEY): cv.string}
)
PLATFORM_SCHEMA = vol.Schema(
vol.All(
LIGHT_PLATFORM_SCHEMA.extend(
{vol.Optional(CONF_DEVICES, default={}): {cv.string: DEVICE_SCHEMA}}
),
_name_validator,
)
)
def retry[_DecoraLightT: DecoraLight, **_P, _R](
method: Callable[Concatenate[_DecoraLightT, _P], _R],
) -> Callable[Concatenate[_DecoraLightT, _P], _R | None]:
"""Retry bluetooth commands."""
@wraps(method)
def wrapper_retry(
device: _DecoraLightT, *args: _P.args, **kwargs: _P.kwargs
) -> _R | None:
"""Try send command and retry on error."""
initial = time.monotonic()
while True:
if time.monotonic() - initial >= 10:
return None
try:
return method(device, *args, **kwargs)
except (decora.decoraException, AttributeError, BTLEException):
_LOGGER.warning(
"Decora connect error for device %s. Reconnecting",
device.name,
)
device._switch.connect() # noqa: SLF001
return wrapper_retry
def setup_platform(
hass: HomeAssistant,
config: ConfigType,
add_entities: AddEntitiesCallback,
discovery_info: DiscoveryInfoType | None = None,
) -> None:
"""Set up an Decora switch."""
create_issue(
hass,
HOMEASSISTANT_DOMAIN,
f"deprecated_system_packages_yaml_integration_{DOMAIN}",
breaks_in_ha_version="2025.12.0",
is_fixable=False,
issue_domain=DOMAIN,
severity=IssueSeverity.WARNING,
translation_key="deprecated_system_packages_yaml_integration",
translation_placeholders={
"domain": DOMAIN,
"integration_title": "Leviton Decora",
},
)
lights = []
for address, device_config in config[CONF_DEVICES].items():
device = {}
device["name"] = device_config[CONF_NAME]
device["key"] = device_config[CONF_API_KEY]
device["address"] = address
light = DecoraLight(device)
lights.append(light)
add_entities(lights)
class DecoraLight(LightEntity):
"""Representation of an Decora light."""
_attr_color_mode = ColorMode.BRIGHTNESS
_attr_supported_color_modes = {ColorMode.BRIGHTNESS}
def __init__(self, device: dict[str, Any]) -> None:
"""Initialize the light."""
self._attr_name = device["name"]
self._attr_unique_id = device["address"]
self._key = device["key"]
self._switch = decora.decora(device["address"], self._key)
self._attr_brightness = 0
self._attr_is_on = False
@retry
def set_state(self, brightness: int) -> None:
"""Set the state of this lamp to the provided brightness."""
self._switch.set_brightness(int(brightness / 2.55))
self._attr_brightness = brightness
@retry
def turn_on(self, **kwargs: Any) -> None:
"""Turn the specified or all lights on."""
brightness = kwargs.get(ATTR_BRIGHTNESS)
self._switch.on()
self._attr_is_on = True
if brightness is not None:
self.set_state(brightness)
@retry
def turn_off(self, **kwargs: Any) -> None:
"""Turn the specified or all lights off."""
self._switch.off()
self._attr_is_on = False
@retry
def update(self) -> None:
"""Synchronise internal state with the actual light state."""
self._attr_brightness = self._switch.get_brightness() * 2.55
self._attr_is_on = self._switch.get_on()

View File

@@ -0,0 +1,10 @@
{
"domain": "decora",
"name": "Leviton Decora",
"codeowners": [],
"documentation": "https://www.home-assistant.io/integrations/decora",
"iot_class": "local_polling",
"loggers": ["bluepy", "decora"],
"quality_scale": "legacy",
"requirements": ["bluepy==1.3.0", "decora==0.6"]
}

View File

@@ -0,0 +1,3 @@
"""The dlib_face_detect component."""
DOMAIN = "dlib_face_detect"

View File

@@ -0,0 +1,82 @@
"""Component that will help set the Dlib face detect processing."""
from __future__ import annotations
import io
import face_recognition
from homeassistant.components.image_processing import (
PLATFORM_SCHEMA as IMAGE_PROCESSING_PLATFORM_SCHEMA,
ImageProcessingFaceEntity,
)
from homeassistant.const import ATTR_LOCATION, CONF_ENTITY_ID, CONF_NAME, CONF_SOURCE
from homeassistant.core import (
DOMAIN as HOMEASSISTANT_DOMAIN,
HomeAssistant,
split_entity_id,
)
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.issue_registry import IssueSeverity, create_issue
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
from . import DOMAIN
PLATFORM_SCHEMA = IMAGE_PROCESSING_PLATFORM_SCHEMA
def setup_platform(
hass: HomeAssistant,
config: ConfigType,
add_entities: AddEntitiesCallback,
discovery_info: DiscoveryInfoType | None = None,
) -> None:
"""Set up the Dlib Face detection platform."""
create_issue(
hass,
HOMEASSISTANT_DOMAIN,
f"deprecated_system_packages_yaml_integration_{DOMAIN}",
breaks_in_ha_version="2025.12.0",
is_fixable=False,
issue_domain=DOMAIN,
severity=IssueSeverity.WARNING,
translation_key="deprecated_system_packages_yaml_integration",
translation_placeholders={
"domain": DOMAIN,
"integration_title": "Dlib Face Detect",
},
)
source: list[dict[str, str]] = config[CONF_SOURCE]
add_entities(
DlibFaceDetectEntity(camera[CONF_ENTITY_ID], camera.get(CONF_NAME))
for camera in source
)
class DlibFaceDetectEntity(ImageProcessingFaceEntity):
"""Dlib Face API entity for identify."""
def __init__(self, camera_entity: str, name: str | None) -> None:
"""Initialize Dlib face entity."""
super().__init__()
self._attr_camera_entity = camera_entity
if name:
self._attr_name = name
else:
self._attr_name = f"Dlib Face {split_entity_id(camera_entity)[1]}"
def process_image(self, image: bytes) -> None:
"""Process image."""
fak_file = io.BytesIO(image)
fak_file.name = "snapshot.jpg"
fak_file.seek(0)
image = face_recognition.load_image_file(fak_file)
face_locations = face_recognition.face_locations(image)
face_locations = [{ATTR_LOCATION: location} for location in face_locations]
self.process_faces(face_locations, len(face_locations))

View File

@@ -0,0 +1,10 @@
{
"domain": "dlib_face_detect",
"name": "Dlib Face Detect",
"codeowners": [],
"documentation": "https://www.home-assistant.io/integrations/dlib_face_detect",
"iot_class": "local_push",
"loggers": ["face_recognition"],
"quality_scale": "legacy",
"requirements": ["face-recognition==1.2.3"]
}

View File

@@ -0,0 +1,4 @@
"""The dlib_face_identify component."""
CONF_FACES = "faces"
DOMAIN = "dlib_face_identify"

View File

@@ -0,0 +1,127 @@
"""Component that will help set the Dlib face detect processing."""
from __future__ import annotations
import io
import logging
import face_recognition
import voluptuous as vol
from homeassistant.components.image_processing import (
CONF_CONFIDENCE,
PLATFORM_SCHEMA as IMAGE_PROCESSING_PLATFORM_SCHEMA,
FaceInformation,
ImageProcessingFaceEntity,
)
from homeassistant.const import ATTR_NAME, CONF_ENTITY_ID, CONF_NAME, CONF_SOURCE
from homeassistant.core import (
DOMAIN as HOMEASSISTANT_DOMAIN,
HomeAssistant,
split_entity_id,
)
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.issue_registry import IssueSeverity, create_issue
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
from . import CONF_FACES, DOMAIN
_LOGGER = logging.getLogger(__name__)
PLATFORM_SCHEMA = IMAGE_PROCESSING_PLATFORM_SCHEMA.extend(
{
vol.Required(CONF_FACES): {cv.string: cv.isfile},
vol.Optional(CONF_CONFIDENCE, default=0.6): vol.Coerce(float),
}
)
def setup_platform(
hass: HomeAssistant,
config: ConfigType,
add_entities: AddEntitiesCallback,
discovery_info: DiscoveryInfoType | None = None,
) -> None:
"""Set up the Dlib Face detection platform."""
create_issue(
hass,
HOMEASSISTANT_DOMAIN,
f"deprecated_system_packages_yaml_integration_{DOMAIN}",
breaks_in_ha_version="2025.12.0",
is_fixable=False,
issue_domain=DOMAIN,
severity=IssueSeverity.WARNING,
translation_key="deprecated_system_packages_yaml_integration",
translation_placeholders={
"domain": DOMAIN,
"integration_title": "Dlib Face Identify",
},
)
confidence: float = config[CONF_CONFIDENCE]
faces: dict[str, str] = config[CONF_FACES]
source: list[dict[str, str]] = config[CONF_SOURCE]
add_entities(
DlibFaceIdentifyEntity(
camera[CONF_ENTITY_ID],
faces,
camera.get(CONF_NAME),
confidence,
)
for camera in source
)
class DlibFaceIdentifyEntity(ImageProcessingFaceEntity):
"""Dlib Face API entity for identify."""
def __init__(
self,
camera_entity: str,
faces: dict[str, str],
name: str | None,
tolerance: float,
) -> None:
"""Initialize Dlib face identify entry."""
super().__init__()
self._attr_camera_entity = camera_entity
if name:
self._attr_name = name
else:
self._attr_name = f"Dlib Face {split_entity_id(camera_entity)[1]}"
self._faces = {}
for face_name, face_file in faces.items():
try:
image = face_recognition.load_image_file(face_file)
self._faces[face_name] = face_recognition.face_encodings(image)[0]
except IndexError as err:
_LOGGER.error("Failed to parse %s. Error: %s", face_file, err)
self._tolerance = tolerance
def process_image(self, image: bytes) -> None:
"""Process image."""
fak_file = io.BytesIO(image)
fak_file.name = "snapshot.jpg"
fak_file.seek(0)
image = face_recognition.load_image_file(fak_file)
unknowns = face_recognition.face_encodings(image)
found: list[FaceInformation] = []
for unknown_face in unknowns:
for name, face in self._faces.items():
result = face_recognition.compare_faces(
[face], unknown_face, tolerance=self._tolerance
)
if result[0]:
found.append({ATTR_NAME: name})
self.process_faces(found, len(unknowns))

View File

@@ -0,0 +1,10 @@
{
"domain": "dlib_face_identify",
"name": "Dlib Face Identify",
"codeowners": [],
"documentation": "https://www.home-assistant.io/integrations/dlib_face_identify",
"iot_class": "local_push",
"loggers": ["face_recognition"],
"quality_scale": "legacy",
"requirements": ["face-recognition==1.2.3"]
}

View File

@@ -89,9 +89,6 @@
}
},
"sensor": {
"auto_empty": {
"default": "mdi:delete-empty"
},
"error": {
"default": "mdi:alert-circle"
},
@@ -163,9 +160,6 @@
"advanced_mode": {
"default": "mdi:tune"
},
"border_spin": {
"default": "mdi:rotate-right"
},
"border_switch": {
"default": "mdi:land-fields"
},

View File

@@ -7,5 +7,5 @@
"integration_type": "hub",
"iot_class": "cloud_push",
"loggers": ["sleekxmppfs", "sucks", "deebot_client"],
"requirements": ["py-sucks==0.9.11", "deebot-client==16.1.0"]
"requirements": ["py-sucks==0.9.11", "deebot-client==16.3.0"]
}

View File

@@ -17,7 +17,6 @@ from deebot_client.events import (
NetworkInfoEvent,
StatsEvent,
TotalStatsEvent,
auto_empty,
station,
)
from sucks import VacBot
@@ -159,14 +158,6 @@ ENTITY_DESCRIPTIONS: tuple[EcovacsSensorEntityDescription, ...] = (
device_class=SensorDeviceClass.ENUM,
options=get_options(station.State),
),
EcovacsSensorEntityDescription[auto_empty.AutoEmptyEvent](
capability_fn=lambda caps: caps.station.auto_empty if caps.station else None,
value_fn=lambda e: get_name_key(e.frequency) if e.frequency else None,
key="auto_empty",
translation_key="auto_empty",
device_class=SensorDeviceClass.ENUM,
options=get_options(auto_empty.Frequency),
),
)

View File

@@ -149,13 +149,6 @@
}
},
"sensor": {
"auto_empty": {
"name": "Auto-empty frequency",
"state": {
"auto": "Auto",
"smart": "Smart"
}
},
"error": {
"name": "Error",
"state_attributes": {
@@ -238,9 +231,6 @@
"advanced_mode": {
"name": "Advanced mode"
},
"border_spin": {
"name": "Border spin"
},
"border_switch": {
"name": "Border switch"
},

View File

@@ -99,13 +99,6 @@ ENTITY_DESCRIPTIONS: tuple[EcovacsSwitchEntityDescription, ...] = (
entity_registry_enabled_default=False,
entity_category=EntityCategory.CONFIG,
),
EcovacsSwitchEntityDescription(
capability_fn=lambda c: c.settings.border_spin,
key="border_spin",
translation_key="border_spin",
entity_registry_enabled_default=False,
entity_category=EntityCategory.CONFIG,
),
)

View File

@@ -0,0 +1,6 @@
"""The eddystone_temperature component."""
DOMAIN = "eddystone_temperature"
CONF_BEACONS = "beacons"
CONF_INSTANCE = "instance"
CONF_NAMESPACE = "namespace"

View File

@@ -0,0 +1,10 @@
{
"domain": "eddystone_temperature",
"name": "Eddystone",
"codeowners": [],
"documentation": "https://www.home-assistant.io/integrations/eddystone_temperature",
"iot_class": "local_polling",
"loggers": ["beacontools"],
"quality_scale": "legacy",
"requirements": ["beacontools[scan]==2.1.0"]
}

View File

@@ -0,0 +1,211 @@
"""Read temperature information from Eddystone beacons.
Your beacons must be configured to transmit UID (for identification) and TLM
(for temperature) frames.
"""
from __future__ import annotations
import logging
from beacontools import BeaconScanner, EddystoneFilter, EddystoneTLMFrame
import voluptuous as vol
from homeassistant.components.sensor import (
PLATFORM_SCHEMA as SENSOR_PLATFORM_SCHEMA,
SensorDeviceClass,
SensorEntity,
)
from homeassistant.const import (
CONF_NAME,
EVENT_HOMEASSISTANT_START,
EVENT_HOMEASSISTANT_STOP,
STATE_UNKNOWN,
UnitOfTemperature,
)
from homeassistant.core import DOMAIN as HOMEASSISTANT_DOMAIN, Event, HomeAssistant
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.issue_registry import IssueSeverity, create_issue
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
from . import CONF_BEACONS, CONF_INSTANCE, CONF_NAMESPACE, DOMAIN
_LOGGER = logging.getLogger(__name__)
CONF_BT_DEVICE_ID = "bt_device_id"
BEACON_SCHEMA = vol.Schema(
{
vol.Required(CONF_NAMESPACE): cv.string,
vol.Required(CONF_INSTANCE): cv.string,
vol.Optional(CONF_NAME): cv.string,
}
)
PLATFORM_SCHEMA = SENSOR_PLATFORM_SCHEMA.extend(
{
vol.Optional(CONF_BT_DEVICE_ID, default=0): cv.positive_int,
vol.Required(CONF_BEACONS): vol.Schema({cv.string: BEACON_SCHEMA}),
}
)
def setup_platform(
hass: HomeAssistant,
config: ConfigType,
add_entities: AddEntitiesCallback,
discovery_info: DiscoveryInfoType | None = None,
) -> None:
"""Validate configuration, create devices and start monitoring thread."""
create_issue(
hass,
HOMEASSISTANT_DOMAIN,
f"deprecated_system_packages_yaml_integration_{DOMAIN}",
breaks_in_ha_version="2025.12.0",
is_fixable=False,
issue_domain=DOMAIN,
severity=IssueSeverity.WARNING,
translation_key="deprecated_system_packages_yaml_integration",
translation_placeholders={
"domain": DOMAIN,
"integration_title": "Eddystone",
},
)
bt_device_id: int = config[CONF_BT_DEVICE_ID]
beacons: dict[str, dict[str, str]] = config[CONF_BEACONS]
devices: list[EddystoneTemp] = []
for dev_name, properties in beacons.items():
namespace = get_from_conf(properties, CONF_NAMESPACE, 20)
instance = get_from_conf(properties, CONF_INSTANCE, 12)
name = properties.get(CONF_NAME, dev_name)
if instance is None or namespace is None:
_LOGGER.error("Skipping %s", dev_name)
continue
devices.append(EddystoneTemp(name, namespace, instance))
if devices:
mon = Monitor(hass, devices, bt_device_id)
def monitor_stop(event: Event) -> None:
"""Stop the monitor thread."""
_LOGGER.debug("Stopping scanner for Eddystone beacons")
mon.stop()
def monitor_start(event: Event) -> None:
"""Start the monitor thread."""
_LOGGER.debug("Starting scanner for Eddystone beacons")
mon.start()
add_entities(devices)
mon.start()
hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, monitor_stop)
hass.bus.listen_once(EVENT_HOMEASSISTANT_START, monitor_start)
else:
_LOGGER.warning("No devices were added")
def get_from_conf(config: dict[str, str], config_key: str, length: int) -> str | None:
"""Retrieve value from config and validate length."""
string = config[config_key]
if len(string) != length:
_LOGGER.error(
(
"Error in configuration parameter %s: Must be exactly %d "
"bytes. Device will not be added"
),
config_key,
length / 2,
)
return None
return string
class EddystoneTemp(SensorEntity):
"""Representation of a temperature sensor."""
_attr_device_class = SensorDeviceClass.TEMPERATURE
_attr_native_unit_of_measurement = UnitOfTemperature.CELSIUS
_attr_should_poll = False
def __init__(self, name: str, namespace: str, instance: str) -> None:
"""Initialize a sensor."""
self._attr_name = name
self.namespace = namespace
self.instance = instance
self.bt_addr = None
self.temperature = STATE_UNKNOWN
@property
def native_value(self):
"""Return the state of the device."""
return self.temperature
class Monitor:
"""Continuously scan for BLE advertisements."""
def __init__(
self, hass: HomeAssistant, devices: list[EddystoneTemp], bt_device_id: int
) -> None:
"""Construct interface object."""
self.hass = hass
# List of beacons to monitor
self.devices = devices
# Number of the bt device (hciX)
self.bt_device_id = bt_device_id
def callback(bt_addr, _, packet, additional_info):
"""Handle new packets."""
self.process_packet(
additional_info["namespace"],
additional_info["instance"],
packet.temperature,
)
device_filters = [EddystoneFilter(d.namespace, d.instance) for d in devices]
self.scanner = BeaconScanner(
callback, bt_device_id, device_filters, EddystoneTLMFrame
)
self.scanning = False
def start(self) -> None:
"""Continuously scan for BLE advertisements."""
if not self.scanning:
self.scanner.start()
self.scanning = True
else:
_LOGGER.debug("start() called, but scanner is already running")
def process_packet(self, namespace, instance, temperature) -> None:
"""Assign temperature to device."""
_LOGGER.debug(
"Received temperature for <%s,%s>: %d", namespace, instance, temperature
)
for dev in self.devices:
if (
dev.namespace == namespace
and dev.instance == instance
and dev.temperature != temperature
):
dev.temperature = temperature
dev.schedule_update_ha_state()
def stop(self) -> None:
"""Signal runner to stop and join thread."""
if self.scanning:
_LOGGER.debug("Stopping")
self.scanner.stop()
_LOGGER.debug("Stopped")
self.scanning = False
else:
_LOGGER.debug("stop() called but scanner was not running")

View File

@@ -17,7 +17,7 @@
"mqtt": ["esphome/discover/#"],
"quality_scale": "platinum",
"requirements": [
"aioesphomeapi==42.5.0",
"aioesphomeapi==42.4.0",
"esphome-dashboard-api==1.3.0",
"bleak-esphome==3.4.0"
],

View File

@@ -0,0 +1,152 @@
"""The Flick Electric integration."""
from datetime import datetime as dt
import logging
from typing import Any
import jwt
from pyflick import FlickAPI
from pyflick.authentication import SimpleFlickAuth
from pyflick.const import DEFAULT_CLIENT_ID, DEFAULT_CLIENT_SECRET
from homeassistant.const import (
CONF_ACCESS_TOKEN,
CONF_CLIENT_ID,
CONF_CLIENT_SECRET,
CONF_PASSWORD,
CONF_USERNAME,
Platform,
)
from homeassistant.core import HomeAssistant
from homeassistant.helpers import aiohttp_client
from .const import CONF_ACCOUNT_ID, CONF_SUPPLY_NODE_REF, CONF_TOKEN_EXPIRY
from .coordinator import FlickConfigEntry, FlickElectricDataCoordinator
_LOGGER = logging.getLogger(__name__)
CONF_ID_TOKEN = "id_token"
PLATFORMS = [Platform.SENSOR]
async def async_setup_entry(hass: HomeAssistant, entry: FlickConfigEntry) -> bool:
"""Set up Flick Electric from a config entry."""
auth = HassFlickAuth(hass, entry)
coordinator = FlickElectricDataCoordinator(hass, entry, FlickAPI(auth))
await coordinator.async_config_entry_first_refresh()
entry.runtime_data = coordinator
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
return True
async def async_unload_entry(hass: HomeAssistant, entry: FlickConfigEntry) -> bool:
"""Unload a config entry."""
return await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
async def async_migrate_entry(
hass: HomeAssistant, config_entry: FlickConfigEntry
) -> bool:
"""Migrate old entry."""
_LOGGER.debug(
"Migrating configuration from version %s.%s",
config_entry.version,
config_entry.minor_version,
)
if config_entry.version > 2:
return False
if config_entry.version == 1:
api = FlickAPI(HassFlickAuth(hass, config_entry))
accounts = await api.getCustomerAccounts()
active_accounts = [
account for account in accounts if account["status"] == "active"
]
# A single active account can be auto-migrated
if (len(active_accounts)) == 1:
account = active_accounts[0]
new_data = {**config_entry.data}
new_data[CONF_ACCOUNT_ID] = account["id"]
new_data[CONF_SUPPLY_NODE_REF] = account["main_consumer"]["supply_node_ref"]
hass.config_entries.async_update_entry(
config_entry,
title=account["address"],
unique_id=account["id"],
data=new_data,
version=2,
)
return True
config_entry.async_start_reauth(hass, data={**config_entry.data})
return False
return True
class HassFlickAuth(SimpleFlickAuth):
"""Implementation of AbstractFlickAuth based on a Home Assistant entity config."""
def __init__(self, hass: HomeAssistant, entry: FlickConfigEntry) -> None:
"""Flick authentication based on a Home Assistant entity config."""
super().__init__(
username=entry.data[CONF_USERNAME],
password=entry.data[CONF_PASSWORD],
client_id=entry.data.get(CONF_CLIENT_ID, DEFAULT_CLIENT_ID),
client_secret=entry.data.get(CONF_CLIENT_SECRET, DEFAULT_CLIENT_SECRET),
websession=aiohttp_client.async_get_clientsession(hass),
)
self._entry = entry
self._hass = hass
async def _get_entry_token(self) -> dict[str, Any]:
# No token saved, generate one
if (
CONF_TOKEN_EXPIRY not in self._entry.data
or CONF_ACCESS_TOKEN not in self._entry.data
):
await self._update_token()
# Token is expired, generate a new one
if self._entry.data[CONF_TOKEN_EXPIRY] <= dt.now().timestamp():
await self._update_token()
return self._entry.data[CONF_ACCESS_TOKEN]
async def _update_token(self):
_LOGGER.debug("Fetching new access token")
token = await super().get_new_token(
self._username, self._password, self._client_id, self._client_secret
)
_LOGGER.debug("New token: %s", token)
# Flick will send the same token, but expiry is relative - so grab it from the token
token_decoded = jwt.decode(
token[CONF_ID_TOKEN], options={"verify_signature": False}
)
self._hass.config_entries.async_update_entry(
self._entry,
data={
**self._entry.data,
CONF_ACCESS_TOKEN: token,
CONF_TOKEN_EXPIRY: token_decoded["exp"],
},
)
async def async_get_access_token(self):
"""Get Access Token from HASS Storage."""
token = await self._get_entry_token()
return token[CONF_ID_TOKEN]

View File

@@ -0,0 +1,210 @@
"""Config Flow for Flick Electric integration."""
import asyncio
from collections.abc import Mapping
import logging
from typing import Any
from aiohttp import ClientResponseError
from pyflick import FlickAPI
from pyflick.authentication import AbstractFlickAuth, SimpleFlickAuth
from pyflick.const import DEFAULT_CLIENT_ID, DEFAULT_CLIENT_SECRET
from pyflick.types import APIException, AuthException, CustomerAccount
import voluptuous as vol
from homeassistant.config_entries import SOURCE_REAUTH, ConfigFlow, ConfigFlowResult
from homeassistant.const import (
CONF_CLIENT_ID,
CONF_CLIENT_SECRET,
CONF_PASSWORD,
CONF_USERNAME,
)
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import aiohttp_client
from homeassistant.helpers.selector import (
SelectOptionDict,
SelectSelector,
SelectSelectorConfig,
SelectSelectorMode,
)
from .const import CONF_ACCOUNT_ID, CONF_SUPPLY_NODE_REF, DOMAIN
_LOGGER = logging.getLogger(__name__)
LOGIN_SCHEMA = vol.Schema(
{
vol.Required(CONF_USERNAME): str,
vol.Required(CONF_PASSWORD): str,
vol.Optional(CONF_CLIENT_ID): str,
vol.Optional(CONF_CLIENT_SECRET): str,
}
)
class FlickConfigFlow(ConfigFlow, domain=DOMAIN):
"""Flick config flow."""
VERSION = 2
auth: AbstractFlickAuth
accounts: list[CustomerAccount]
data: dict[str, Any]
async def _validate_auth(self, user_input: Mapping[str, Any]) -> bool:
self.auth = SimpleFlickAuth(
username=user_input[CONF_USERNAME],
password=user_input[CONF_PASSWORD],
websession=aiohttp_client.async_get_clientsession(self.hass),
client_id=user_input.get(CONF_CLIENT_ID, DEFAULT_CLIENT_ID),
client_secret=user_input.get(CONF_CLIENT_SECRET, DEFAULT_CLIENT_SECRET),
)
try:
async with asyncio.timeout(60):
token = await self.auth.async_get_access_token()
except (TimeoutError, ClientResponseError) as err:
raise CannotConnect from err
except AuthException as err:
raise InvalidAuth from err
return token is not None
async def async_step_select_account(
self, user_input: Mapping[str, Any] | None = None
) -> ConfigFlowResult:
"""Ask user to select account."""
errors = {}
if user_input is not None and CONF_ACCOUNT_ID in user_input:
self.data[CONF_ACCOUNT_ID] = user_input[CONF_ACCOUNT_ID]
self.data[CONF_SUPPLY_NODE_REF] = self._get_supply_node_ref(
user_input[CONF_ACCOUNT_ID]
)
try:
# Ensure supply node is active
await FlickAPI(self.auth).getPricing(self.data[CONF_SUPPLY_NODE_REF])
except (APIException, ClientResponseError):
errors["base"] = "cannot_connect"
except AuthException:
# We should never get here as we have a valid token
return self.async_abort(reason="no_permissions")
else:
# Supply node is active
return await self._async_create_entry()
try:
self.accounts = await FlickAPI(self.auth).getCustomerAccounts()
except (APIException, ClientResponseError):
errors["base"] = "cannot_connect"
active_accounts = [a for a in self.accounts if a["status"] == "active"]
if len(active_accounts) == 0:
return self.async_abort(reason="no_accounts")
if len(active_accounts) == 1:
self.data[CONF_ACCOUNT_ID] = active_accounts[0]["id"]
self.data[CONF_SUPPLY_NODE_REF] = self._get_supply_node_ref(
active_accounts[0]["id"]
)
return await self._async_create_entry()
return self.async_show_form(
step_id="select_account",
data_schema=vol.Schema(
{
vol.Required(CONF_ACCOUNT_ID): SelectSelector(
SelectSelectorConfig(
options=[
SelectOptionDict(
value=account["id"], label=account["address"]
)
for account in active_accounts
],
mode=SelectSelectorMode.LIST,
)
)
}
),
errors=errors,
)
async def async_step_user(
self, user_input: Mapping[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle gathering login info."""
errors = {}
if user_input is not None:
try:
await self._validate_auth(user_input)
except CannotConnect:
errors["base"] = "cannot_connect"
except InvalidAuth:
errors["base"] = "invalid_auth"
except Exception:
_LOGGER.exception("Unexpected exception")
errors["base"] = "unknown"
else:
self.data = dict(user_input)
return await self.async_step_select_account(user_input)
return self.async_show_form(
step_id="user", data_schema=LOGIN_SCHEMA, errors=errors
)
async def async_step_reauth(
self, user_input: Mapping[str, Any]
) -> ConfigFlowResult:
"""Handle re-authentication."""
self.data = {**user_input}
return await self.async_step_user(user_input)
async def _async_create_entry(self) -> ConfigFlowResult:
"""Create an entry for the flow."""
await self.async_set_unique_id(self.data[CONF_ACCOUNT_ID])
account = self._get_account(self.data[CONF_ACCOUNT_ID])
if self.source == SOURCE_REAUTH:
# Migration completed
if self._get_reauth_entry().version == 1:
self.hass.config_entries.async_update_entry(
self._get_reauth_entry(),
unique_id=self.unique_id,
data=self.data,
version=self.VERSION,
)
return self.async_update_reload_and_abort(
self._get_reauth_entry(),
unique_id=self.unique_id,
title=account["address"],
data=self.data,
)
self._abort_if_unique_id_configured()
return self.async_create_entry(
title=account["address"],
data=self.data,
)
def _get_account(self, account_id: str) -> CustomerAccount:
"""Get the account for the account ID."""
return next(a for a in self.accounts if a["id"] == account_id)
def _get_supply_node_ref(self, account_id: str) -> str:
"""Get the supply node ref for the account."""
return self._get_account(account_id)["main_consumer"][CONF_SUPPLY_NODE_REF]
class CannotConnect(HomeAssistantError):
"""Error to indicate we cannot connect."""
class InvalidAuth(HomeAssistantError):
"""Error to indicate there is invalid auth."""

View File

@@ -0,0 +1,12 @@
"""Constants for the Flick Electric integration."""
DOMAIN = "flick_electric"
CONF_TOKEN_EXPIRY = "expires"
CONF_ACCOUNT_ID = "account_id"
CONF_SUPPLY_NODE_REF = "supply_node_ref"
ATTR_START_AT = "start_at"
ATTR_END_AT = "end_at"
ATTR_COMPONENTS = ["retailer", "ea", "metering", "generation", "admin", "network"]

View File

@@ -0,0 +1,55 @@
"""Data Coordinator for Flick Electric."""
import asyncio
from datetime import timedelta
import logging
import aiohttp
from pyflick import FlickAPI, FlickPrice
from pyflick.types import APIException, AuthException
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryAuthFailed
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import CONF_SUPPLY_NODE_REF
_LOGGER = logging.getLogger(__name__)
SCAN_INTERVAL = timedelta(minutes=5)
type FlickConfigEntry = ConfigEntry[FlickElectricDataCoordinator]
class FlickElectricDataCoordinator(DataUpdateCoordinator[FlickPrice]):
"""Coordinator for flick power price."""
config_entry: FlickConfigEntry
def __init__(
self,
hass: HomeAssistant,
config_entry: FlickConfigEntry,
api: FlickAPI,
) -> None:
"""Initialize FlickElectricDataCoordinator."""
super().__init__(
hass,
_LOGGER,
config_entry=config_entry,
name="Flick Electric",
update_interval=SCAN_INTERVAL,
)
self.supply_node_ref = config_entry.data[CONF_SUPPLY_NODE_REF]
self._api = api
async def _async_update_data(self) -> FlickPrice:
"""Fetch pricing data from Flick Electric."""
try:
async with asyncio.timeout(60):
return await self._api.getPricing(self.supply_node_ref)
except AuthException as err:
raise ConfigEntryAuthFailed from err
except (APIException, aiohttp.ClientResponseError) as err:
raise UpdateFailed from err

View File

@@ -0,0 +1,11 @@
{
"domain": "flick_electric",
"name": "Flick Electric",
"codeowners": ["@ZephireNZ"],
"config_flow": true,
"documentation": "https://www.home-assistant.io/integrations/flick_electric",
"integration_type": "service",
"iot_class": "cloud_polling",
"loggers": ["pyflick"],
"requirements": ["PyFlick==1.1.3"]
}

View File

@@ -0,0 +1,72 @@
"""Support for Flick Electric Pricing data."""
from datetime import timedelta
from decimal import Decimal
import logging
from typing import Any
from homeassistant.components.sensor import SensorEntity
from homeassistant.const import CURRENCY_CENT, UnitOfEnergy
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from .const import ATTR_COMPONENTS, ATTR_END_AT, ATTR_START_AT
from .coordinator import FlickConfigEntry, FlickElectricDataCoordinator
_LOGGER = logging.getLogger(__name__)
SCAN_INTERVAL = timedelta(minutes=5)
async def async_setup_entry(
hass: HomeAssistant,
entry: FlickConfigEntry,
async_add_entities: AddConfigEntryEntitiesCallback,
) -> None:
"""Flick Sensor Setup."""
coordinator = entry.runtime_data
async_add_entities([FlickPricingSensor(coordinator)])
class FlickPricingSensor(CoordinatorEntity[FlickElectricDataCoordinator], SensorEntity):
"""Entity object for Flick Electric sensor."""
_attr_attribution = "Data provided by Flick Electric"
_attr_native_unit_of_measurement = f"{CURRENCY_CENT}/{UnitOfEnergy.KILO_WATT_HOUR}"
_attr_has_entity_name = True
_attr_translation_key = "power_price"
def __init__(self, coordinator: FlickElectricDataCoordinator) -> None:
"""Entity object for Flick Electric sensor."""
super().__init__(coordinator)
self._attr_unique_id = f"{coordinator.supply_node_ref}_pricing"
@property
def native_value(self) -> Decimal:
"""Return the state of the sensor."""
# The API should return a unit price with quantity of 1.0 when no start/end time is provided
if self.coordinator.data.quantity != 1:
_LOGGER.warning(
"Unexpected quantity for unit price: %s", self.coordinator.data
)
return self.coordinator.data.cost * 100
@property
def extra_state_attributes(self) -> dict[str, Any] | None:
"""Return the state attributes."""
components: dict[str, float] = {}
for component in self.coordinator.data.components:
if component.charge_setter not in ATTR_COMPONENTS:
_LOGGER.warning("Found unknown component: %s", component.charge_setter)
continue
components[component.charge_setter] = float(component.value * 100)
return {
ATTR_START_AT: self.coordinator.data.start_at,
ATTR_END_AT: self.coordinator.data.end_at,
**components,
}

View File

@@ -0,0 +1,39 @@
{
"config": {
"abort": {
"already_configured": "[%key:common::config_flow::abort::already_configured_account%]",
"no_accounts": "No services are active on this Flick account",
"no_permissions": "Cannot get pricing for this account. Please check user permissions.",
"reauth_successful": "[%key:common::config_flow::abort::reauth_successful%]"
},
"error": {
"cannot_connect": "[%key:common::config_flow::error::cannot_connect%]",
"invalid_auth": "[%key:common::config_flow::error::invalid_auth%]",
"unknown": "[%key:common::config_flow::error::unknown%]"
},
"step": {
"select_account": {
"data": {
"account_id": "Account"
},
"title": "Select account"
},
"user": {
"data": {
"client_id": "Client ID (optional)",
"client_secret": "Client Secret (optional)",
"password": "[%key:common::config_flow::data::password%]",
"username": "[%key:common::config_flow::data::username%]"
},
"title": "Flick Login Credentials"
}
}
},
"entity": {
"sensor": {
"power_price": {
"name": "Flick power price"
}
}
}
}

View File

@@ -21,9 +21,6 @@ from .coordinator import FritzboxConfigEntry
from .entity import FritzBoxDeviceEntity
from .model import FritzEntityDescriptionMixinBase
# Coordinator handles data updates, so we can allow unlimited parallel updates
PARALLEL_UPDATES = 0
@dataclass(frozen=True, kw_only=True)
class FritzBinarySensorEntityDescription(

View File

@@ -11,9 +11,6 @@ from .const import DOMAIN
from .coordinator import FritzboxConfigEntry
from .entity import FritzBoxEntity
# Coordinator handles data updates, so we can allow unlimited parallel updates
PARALLEL_UPDATES = 0
async def async_setup_entry(
hass: HomeAssistant,

View File

@@ -23,9 +23,6 @@ from .coordinator import FritzboxConfigEntry, FritzboxDataUpdateCoordinator
from .entity import FritzBoxDeviceEntity
from .sensor import value_scheduled_preset
# Coordinator handles data updates, so we can allow unlimited parallel updates
PARALLEL_UPDATES = 0
HVAC_MODES = [HVACMode.HEAT, HVACMode.OFF]
PRESET_HOLIDAY = "holiday"
PRESET_SUMMER = "summer"

View File

@@ -16,9 +16,6 @@ from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
from .coordinator import FritzboxConfigEntry
from .entity import FritzBoxDeviceEntity
# Coordinator handles data updates, so we can allow unlimited parallel updates
PARALLEL_UPDATES = 0
async def async_setup_entry(
hass: HomeAssistant,

View File

@@ -18,9 +18,6 @@ from .const import COLOR_MODE, LOGGER
from .coordinator import FritzboxConfigEntry, FritzboxDataUpdateCoordinator
from .entity import FritzBoxDeviceEntity
# Coordinator handles data updates, so we can allow unlimited parallel updates
PARALLEL_UPDATES = 0
async def async_setup_entry(
hass: HomeAssistant,

View File

@@ -34,9 +34,6 @@ from .coordinator import FritzboxConfigEntry
from .entity import FritzBoxDeviceEntity
from .model import FritzEntityDescriptionMixinBase
# Coordinator handles data updates, so we can allow unlimited parallel updates
PARALLEL_UPDATES = 0
@dataclass(frozen=True, kw_only=True)
class FritzSensorEntityDescription(

View File

@@ -13,9 +13,6 @@ from .const import DOMAIN
from .coordinator import FritzboxConfigEntry
from .entity import FritzBoxDeviceEntity
# Coordinator handles data updates, so we can allow unlimited parallel updates
PARALLEL_UPDATES = 0
async def async_setup_entry(
hass: HomeAssistant,

View File

@@ -20,5 +20,5 @@
"documentation": "https://www.home-assistant.io/integrations/frontend",
"integration_type": "system",
"quality_scale": "internal",
"requirements": ["home-assistant-frontend==20251029.1"]
"requirements": ["home-assistant-frontend==20251103.0"]
}

View File

@@ -0,0 +1,3 @@
"""The gstreamer component."""
DOMAIN = "gstreamer"

View File

@@ -0,0 +1,10 @@
{
"domain": "gstreamer",
"name": "GStreamer",
"codeowners": [],
"documentation": "https://www.home-assistant.io/integrations/gstreamer",
"iot_class": "local_push",
"loggers": ["gsp"],
"quality_scale": "legacy",
"requirements": ["gstreamer-player==1.1.2"]
}

View File

@@ -0,0 +1,195 @@
"""Play media via gstreamer."""
from __future__ import annotations
import logging
from typing import Any
from gsp import STATE_IDLE, STATE_PAUSED, STATE_PLAYING, GstreamerPlayer
import voluptuous as vol
from homeassistant.components import media_source
from homeassistant.components.media_player import (
PLATFORM_SCHEMA as MEDIA_PLAYER_PLATFORM_SCHEMA,
BrowseMedia,
MediaPlayerEntity,
MediaPlayerEntityFeature,
MediaPlayerState,
MediaType,
async_process_play_media_url,
)
from homeassistant.const import CONF_NAME, EVENT_HOMEASSISTANT_STOP
from homeassistant.core import DOMAIN as HOMEASSISTANT_DOMAIN, HomeAssistant
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.issue_registry import IssueSeverity, create_issue
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
from . import DOMAIN
_LOGGER = logging.getLogger(__name__)
CONF_PIPELINE = "pipeline"
PLATFORM_SCHEMA = MEDIA_PLAYER_PLATFORM_SCHEMA.extend(
{vol.Optional(CONF_NAME): cv.string, vol.Optional(CONF_PIPELINE): cv.string}
)
GSP_STATE_MAPPING = {
STATE_IDLE: MediaPlayerState.IDLE,
STATE_PAUSED: MediaPlayerState.PAUSED,
STATE_PLAYING: MediaPlayerState.PLAYING,
}
def setup_platform(
hass: HomeAssistant,
config: ConfigType,
add_entities: AddEntitiesCallback,
discovery_info: DiscoveryInfoType | None = None,
) -> None:
"""Set up the Gstreamer platform."""
create_issue(
hass,
HOMEASSISTANT_DOMAIN,
f"deprecated_system_packages_yaml_integration_{DOMAIN}",
breaks_in_ha_version="2025.12.0",
is_fixable=False,
issue_domain=DOMAIN,
severity=IssueSeverity.WARNING,
translation_key="deprecated_system_packages_yaml_integration",
translation_placeholders={
"domain": DOMAIN,
"integration_title": "GStreamer",
},
)
name = config.get(CONF_NAME)
pipeline = config.get(CONF_PIPELINE)
player = GstreamerPlayer(pipeline)
def _shutdown(call):
"""Quit the player on shutdown."""
player.quit()
hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, _shutdown)
add_entities([GstreamerDevice(player, name)])
class GstreamerDevice(MediaPlayerEntity):
"""Representation of a Gstreamer device."""
_attr_media_content_type = MediaType.MUSIC
_attr_supported_features = (
MediaPlayerEntityFeature.VOLUME_SET
| MediaPlayerEntityFeature.PLAY
| MediaPlayerEntityFeature.PAUSE
| MediaPlayerEntityFeature.PLAY_MEDIA
| MediaPlayerEntityFeature.NEXT_TRACK
| MediaPlayerEntityFeature.BROWSE_MEDIA
)
def __init__(self, player: GstreamerPlayer, name: str | None) -> None:
"""Initialize the Gstreamer device."""
self._player = player
self._name = name or DOMAIN
self._attr_state = MediaPlayerState.IDLE
self._volume = None
self._duration = None
self._uri = None
self._title = None
self._artist = None
self._album = None
def update(self) -> None:
"""Update properties."""
self._attr_state = GSP_STATE_MAPPING.get(self._player.state)
self._volume = self._player.volume
self._duration = self._player.duration
self._uri = self._player.uri
self._title = self._player.title
self._album = self._player.album
self._artist = self._player.artist
def set_volume_level(self, volume: float) -> None:
"""Set the volume level."""
self._player.volume = volume
async def async_play_media(
self, media_type: MediaType | str, media_id: str, **kwargs: Any
) -> None:
"""Play media."""
# Handle media_source
if media_source.is_media_source_id(media_id):
sourced_media = await media_source.async_resolve_media(
self.hass, media_id, self.entity_id
)
media_id = sourced_media.url
elif media_type != MediaType.MUSIC:
_LOGGER.error("Invalid media type")
return
media_id = async_process_play_media_url(self.hass, media_id)
await self.hass.async_add_executor_job(self._player.queue, media_id)
def media_play(self) -> None:
"""Play."""
self._player.play()
def media_pause(self) -> None:
"""Pause."""
self._player.pause()
def media_next_track(self) -> None:
"""Next track."""
self._player.next()
@property
def media_content_id(self):
"""Content ID of currently playing media."""
return self._uri
@property
def name(self):
"""Return the name of the device."""
return self._name
@property
def volume_level(self):
"""Return the volume level."""
return self._volume
@property
def media_duration(self):
"""Duration of current playing media in seconds."""
return self._duration
@property
def media_title(self):
"""Media title."""
return self._title
@property
def media_artist(self):
"""Media artist."""
return self._artist
@property
def media_album_name(self):
"""Media album."""
return self._album
async def async_browse_media(
self,
media_content_type: MediaType | str | None = None,
media_content_id: str | None = None,
) -> BrowseMedia:
"""Implement the websocket media browsing helper."""
return await media_source.async_browse_media(
self.hass,
media_content_id,
content_filter=lambda item: item.media_content_type.startswith("audio/"),
)

View File

@@ -44,7 +44,6 @@ from .const import (
EVENT_SUPPORTED_CHANGED,
EXTRA_PLACEHOLDERS,
ISSUE_KEY_ADDON_BOOT_FAIL,
ISSUE_KEY_ADDON_DEPRECATED,
ISSUE_KEY_ADDON_DETACHED_ADDON_MISSING,
ISSUE_KEY_ADDON_DETACHED_ADDON_REMOVED,
ISSUE_KEY_ADDON_PWNED,
@@ -87,7 +86,6 @@ ISSUE_KEYS_FOR_REPAIRS = {
"issue_system_disk_lifetime",
ISSUE_KEY_SYSTEM_FREE_SPACE,
ISSUE_KEY_ADDON_PWNED,
ISSUE_KEY_ADDON_DEPRECATED,
}
_LOGGER = logging.getLogger(__name__)

View File

@@ -0,0 +1,87 @@
"""Support to emulate keyboard presses on host machine."""
from pykeyboard import PyKeyboard
import voluptuous as vol
from homeassistant.const import (
SERVICE_MEDIA_NEXT_TRACK,
SERVICE_MEDIA_PLAY_PAUSE,
SERVICE_MEDIA_PREVIOUS_TRACK,
SERVICE_VOLUME_DOWN,
SERVICE_VOLUME_MUTE,
SERVICE_VOLUME_UP,
)
from homeassistant.core import DOMAIN as HOMEASSISTANT_DOMAIN, HomeAssistant
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.issue_registry import IssueSeverity, create_issue
from homeassistant.helpers.typing import ConfigType
DOMAIN = "keyboard"
TAP_KEY_SCHEMA = vol.Schema({})
CONFIG_SCHEMA = cv.empty_config_schema(DOMAIN)
def setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Listen for keyboard events."""
create_issue(
hass,
HOMEASSISTANT_DOMAIN,
f"deprecated_system_packages_yaml_integration_{DOMAIN}",
breaks_in_ha_version="2025.12.0",
is_fixable=False,
issue_domain=DOMAIN,
severity=IssueSeverity.WARNING,
translation_key="deprecated_system_packages_yaml_integration",
translation_placeholders={
"domain": DOMAIN,
"integration_title": "Keyboard",
},
)
keyboard = PyKeyboard()
keyboard.special_key_assignment()
hass.services.register(
DOMAIN,
SERVICE_VOLUME_UP,
lambda service: keyboard.tap_key(keyboard.volume_up_key),
schema=TAP_KEY_SCHEMA,
)
hass.services.register(
DOMAIN,
SERVICE_VOLUME_DOWN,
lambda service: keyboard.tap_key(keyboard.volume_down_key),
schema=TAP_KEY_SCHEMA,
)
hass.services.register(
DOMAIN,
SERVICE_VOLUME_MUTE,
lambda service: keyboard.tap_key(keyboard.volume_mute_key),
schema=TAP_KEY_SCHEMA,
)
hass.services.register(
DOMAIN,
SERVICE_MEDIA_PLAY_PAUSE,
lambda service: keyboard.tap_key(keyboard.media_play_pause_key),
schema=TAP_KEY_SCHEMA,
)
hass.services.register(
DOMAIN,
SERVICE_MEDIA_NEXT_TRACK,
lambda service: keyboard.tap_key(keyboard.media_next_track_key),
schema=TAP_KEY_SCHEMA,
)
hass.services.register(
DOMAIN,
SERVICE_MEDIA_PREVIOUS_TRACK,
lambda service: keyboard.tap_key(keyboard.media_prev_track_key),
schema=TAP_KEY_SCHEMA,
)
return True

View File

@@ -0,0 +1,22 @@
{
"services": {
"media_next_track": {
"service": "mdi:skip-next"
},
"media_play_pause": {
"service": "mdi:play-pause"
},
"media_prev_track": {
"service": "mdi:skip-previous"
},
"volume_down": {
"service": "mdi:volume-low"
},
"volume_mute": {
"service": "mdi:volume-off"
},
"volume_up": {
"service": "mdi:volume-high"
}
}
}

View File

@@ -0,0 +1,10 @@
{
"domain": "keyboard",
"name": "Keyboard",
"codeowners": [],
"documentation": "https://www.home-assistant.io/integrations/keyboard",
"iot_class": "local_push",
"loggers": ["pykeyboard"],
"quality_scale": "legacy",
"requirements": ["pyuserinput==0.1.11"]
}

View File

@@ -0,0 +1,6 @@
volume_up:
volume_down:
volume_mute:
media_play_pause:
media_next_track:
media_prev_track:

View File

@@ -0,0 +1,28 @@
{
"services": {
"media_next_track": {
"description": "Simulates a key press of the \"Media Next Track\" button on Home Assistant's host machine.",
"name": "Media next track"
},
"media_play_pause": {
"description": "Simulates a key press of the \"Media Play/Pause\" button on Home Assistant's host machine.",
"name": "Media play/pause"
},
"media_prev_track": {
"description": "Simulates a key press of the \"Media Previous Track\" button on Home Assistant's host machine.",
"name": "Media previous track"
},
"volume_down": {
"description": "Simulates a key press of the \"Volume Down\" button on Home Assistant's host machine.",
"name": "Volume down"
},
"volume_mute": {
"description": "Simulates a key press of the \"Volume Mute\" button on Home Assistant's host machine.",
"name": "Volume mute"
},
"volume_up": {
"description": "Simulates a key press of the \"Volume Up\" button on Home Assistant's host machine.",
"name": "Volume up"
}
}
}

View File

@@ -299,8 +299,8 @@ def _create_climate_ui(xknx: XKNX, conf: ConfigExtractor, name: str) -> XknxClim
group_address_active_state=conf.get_state_and_passive(CONF_GA_ACTIVE),
group_address_command_value_state=conf.get_state_and_passive(CONF_GA_VALVE),
sync_state=sync_state,
min_temp=conf.get(ClimateConf.MIN_TEMP),
max_temp=conf.get(ClimateConf.MAX_TEMP),
min_temp=conf.get(CONF_TARGET_TEMPERATURE, ClimateConf.MIN_TEMP),
max_temp=conf.get(CONF_TARGET_TEMPERATURE, ClimateConf.MAX_TEMP),
mode=climate_mode,
group_address_fan_speed=conf.get_write(CONF_GA_FAN_SPEED),
group_address_fan_speed_state=conf.get_state_and_passive(CONF_GA_FAN_SPEED),
@@ -486,7 +486,7 @@ class _KnxClimate(ClimateEntity, _KnxEntityBase):
ha_controller_modes.append(self._last_hvac_mode)
ha_controller_modes.append(HVACMode.OFF)
hvac_modes = list(set(filter(None, ha_controller_modes)))
hvac_modes = sorted(set(filter(None, ha_controller_modes)))
return (
hvac_modes
if hvac_modes

View File

@@ -13,7 +13,7 @@
"requirements": [
"xknx==3.10.0",
"xknxproject==3.8.2",
"knx-frontend==2025.10.26.81530"
"knx-frontend==2025.10.31.195356"
],
"single_config_entry": true
}

View File

@@ -0,0 +1,94 @@
"""Support for LIRC devices."""
import logging
import threading
import time
import lirc
from homeassistant.const import EVENT_HOMEASSISTANT_START, EVENT_HOMEASSISTANT_STOP
from homeassistant.core import DOMAIN as HOMEASSISTANT_DOMAIN, HomeAssistant
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.issue_registry import IssueSeverity, create_issue
from homeassistant.helpers.typing import ConfigType
_LOGGER = logging.getLogger(__name__)
BUTTON_NAME = "button_name"
DOMAIN = "lirc"
EVENT_IR_COMMAND_RECEIVED = "ir_command_received"
ICON = "mdi:remote"
CONFIG_SCHEMA = cv.empty_config_schema(DOMAIN)
def setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Set up the LIRC capability."""
create_issue(
hass,
HOMEASSISTANT_DOMAIN,
f"deprecated_system_packages_yaml_integration_{DOMAIN}",
breaks_in_ha_version="2025.12.0",
is_fixable=False,
issue_domain=DOMAIN,
severity=IssueSeverity.WARNING,
translation_key="deprecated_system_packages_yaml_integration",
translation_placeholders={
"domain": DOMAIN,
"integration_title": "LIRC",
},
)
# blocking=True gives unexpected behavior (multiple responses for 1 press)
# also by not blocking, we allow hass to shut down the thread gracefully
# on exit.
lirc.init("home-assistant", blocking=False)
lirc_interface = LircInterface(hass)
def _start_lirc(_event):
lirc_interface.start()
def _stop_lirc(_event):
lirc_interface.stopped.set()
hass.bus.listen_once(EVENT_HOMEASSISTANT_START, _start_lirc)
hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, _stop_lirc)
return True
class LircInterface(threading.Thread):
"""Interfaces with the lirc daemon to read IR commands.
When using lirc in blocking mode, sometimes repeated commands get produced
in the next read of a command so we use a thread here to just wait
around until a non-empty response is obtained from lirc.
"""
def __init__(self, hass):
"""Construct a LIRC interface object."""
threading.Thread.__init__(self)
self.daemon = True
self.stopped = threading.Event()
self.hass = hass
def run(self):
"""Run the loop of the LIRC interface thread."""
_LOGGER.debug("LIRC interface thread started")
while not self.stopped.is_set():
try:
code = lirc.nextcode() # list; empty if no buttons pressed
except lirc.NextCodeError:
_LOGGER.warning("Error reading next code from LIRC")
code = None
# interpret result from python-lirc
if code:
code = code[0]
_LOGGER.debug("Got new LIRC code %s", code)
self.hass.bus.fire(EVENT_IR_COMMAND_RECEIVED, {BUTTON_NAME: code})
else:
time.sleep(0.2)
lirc.deinit()
_LOGGER.debug("LIRC interface thread stopped")

View File

@@ -0,0 +1,10 @@
{
"domain": "lirc",
"name": "LIRC",
"codeowners": [],
"documentation": "https://www.home-assistant.io/integrations/lirc",
"iot_class": "local_push",
"loggers": ["lirc"],
"quality_scale": "legacy",
"requirements": ["python-lirc==1.2.3"]
}

View File

@@ -0,0 +1,76 @@
"""Support for Neato botvac connected vacuum cleaners."""
import logging
import aiohttp
from pybotvac import Account
from pybotvac.exceptions import NeatoException
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_TOKEN, Platform
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryAuthFailed, ConfigEntryNotReady
from homeassistant.helpers import config_entry_oauth2_flow
from . import api
from .const import NEATO_DOMAIN, NEATO_LOGIN
from .hub import NeatoHub
_LOGGER = logging.getLogger(__name__)
PLATFORMS = [
Platform.BUTTON,
Platform.CAMERA,
Platform.SENSOR,
Platform.SWITCH,
Platform.VACUUM,
]
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up config entry."""
hass.data.setdefault(NEATO_DOMAIN, {})
if CONF_TOKEN not in entry.data:
raise ConfigEntryAuthFailed
implementation = (
await config_entry_oauth2_flow.async_get_config_entry_implementation(
hass, entry
)
)
session = config_entry_oauth2_flow.OAuth2Session(hass, entry, implementation)
try:
await session.async_ensure_token_valid()
except aiohttp.ClientResponseError as ex:
_LOGGER.debug("API error: %s (%s)", ex.code, ex.message)
if ex.code in (401, 403):
raise ConfigEntryAuthFailed("Token not valid, trigger renewal") from ex
raise ConfigEntryNotReady from ex
neato_session = api.ConfigEntryAuth(hass, entry, implementation)
hass.data[NEATO_DOMAIN][entry.entry_id] = neato_session
hub = NeatoHub(hass, Account(neato_session))
await hub.async_update_entry_unique_id(entry)
try:
await hass.async_add_executor_job(hub.update_robots)
except NeatoException as ex:
_LOGGER.debug("Failed to connect to Neato API")
raise ConfigEntryNotReady from ex
hass.data[NEATO_LOGIN] = hub
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
return True
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Unload config entry."""
unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
if unload_ok:
hass.data[NEATO_DOMAIN].pop(entry.entry_id)
return unload_ok

View File

@@ -0,0 +1,58 @@
"""API for Neato Botvac bound to Home Assistant OAuth."""
from __future__ import annotations
from asyncio import run_coroutine_threadsafe
from typing import Any
import pybotvac
from homeassistant import config_entries, core
from homeassistant.components.application_credentials import AuthImplementation
from homeassistant.helpers import config_entry_oauth2_flow
class ConfigEntryAuth(pybotvac.OAuthSession): # type: ignore[misc]
"""Provide Neato Botvac authentication tied to an OAuth2 based config entry."""
def __init__(
self,
hass: core.HomeAssistant,
config_entry: config_entries.ConfigEntry,
implementation: config_entry_oauth2_flow.AbstractOAuth2Implementation,
) -> None:
"""Initialize Neato Botvac Auth."""
self.hass = hass
self.session = config_entry_oauth2_flow.OAuth2Session(
hass, config_entry, implementation
)
super().__init__(self.session.token, vendor=pybotvac.Neato())
def refresh_tokens(self) -> str:
"""Refresh and return new Neato Botvac tokens."""
run_coroutine_threadsafe(
self.session.async_ensure_token_valid(), self.hass.loop
).result()
return self.session.token["access_token"] # type: ignore[no-any-return]
class NeatoImplementation(AuthImplementation):
"""Neato implementation of LocalOAuth2Implementation.
We need this class because we have to add client_secret
and scope to the authorization request.
"""
@property
def extra_authorize_data(self) -> dict[str, Any]:
"""Extra data that needs to be appended to the authorize url."""
return {"client_secret": self.client_secret}
async def async_generate_authorize_url(self, flow_id: str) -> str:
"""Generate a url for the user to authorize.
We must make sure that the plus signs are not encoded.
"""
url = await super().async_generate_authorize_url(flow_id)
return f"{url}&scope=public_profile+control_robots+maps"

View File

@@ -0,0 +1,28 @@
"""Application credentials platform for neato."""
from pybotvac import Neato
from homeassistant.components.application_credentials import (
AuthorizationServer,
ClientCredential,
)
from homeassistant.core import HomeAssistant
from homeassistant.helpers import config_entry_oauth2_flow
from . import api
async def async_get_auth_implementation(
hass: HomeAssistant, auth_domain: str, credential: ClientCredential
) -> config_entry_oauth2_flow.AbstractOAuth2Implementation:
"""Return auth implementation for a custom auth implementation."""
vendor = Neato()
return api.NeatoImplementation(
hass,
auth_domain,
credential,
AuthorizationServer(
authorize_url=vendor.auth_endpoint,
token_url=vendor.token_endpoint,
),
)

View File

@@ -0,0 +1,44 @@
"""Support for Neato buttons."""
from __future__ import annotations
from pybotvac import Robot
from homeassistant.components.button import ButtonEntity
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import EntityCategory
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
from .const import NEATO_ROBOTS
from .entity import NeatoEntity
async def async_setup_entry(
hass: HomeAssistant,
entry: ConfigEntry,
async_add_entities: AddConfigEntryEntitiesCallback,
) -> None:
"""Set up Neato button from config entry."""
entities = [NeatoDismissAlertButton(robot) for robot in hass.data[NEATO_ROBOTS]]
async_add_entities(entities, True)
class NeatoDismissAlertButton(NeatoEntity, ButtonEntity):
"""Representation of a dismiss_alert button entity."""
_attr_translation_key = "dismiss_alert"
_attr_entity_category = EntityCategory.CONFIG
def __init__(
self,
robot: Robot,
) -> None:
"""Initialize a dismiss_alert Neato button entity."""
super().__init__(robot)
self._attr_unique_id = f"{robot.serial}_dismiss_alert"
async def async_press(self) -> None:
"""Press the button."""
await self.hass.async_add_executor_job(self.robot.dismiss_current_alert)

View File

@@ -0,0 +1,130 @@
"""Support for loading picture from Neato."""
from __future__ import annotations
from datetime import timedelta
import logging
from typing import Any
from pybotvac.exceptions import NeatoRobotException
from pybotvac.robot import Robot
from urllib3.response import HTTPResponse
from homeassistant.components.camera import Camera
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
from .const import NEATO_LOGIN, NEATO_MAP_DATA, NEATO_ROBOTS, SCAN_INTERVAL_MINUTES
from .entity import NeatoEntity
from .hub import NeatoHub
_LOGGER = logging.getLogger(__name__)
SCAN_INTERVAL = timedelta(minutes=SCAN_INTERVAL_MINUTES)
ATTR_GENERATED_AT = "generated_at"
async def async_setup_entry(
hass: HomeAssistant,
entry: ConfigEntry,
async_add_entities: AddConfigEntryEntitiesCallback,
) -> None:
"""Set up Neato camera with config entry."""
neato: NeatoHub = hass.data[NEATO_LOGIN]
mapdata: dict[str, Any] | None = hass.data.get(NEATO_MAP_DATA)
dev = [
NeatoCleaningMap(neato, robot, mapdata)
for robot in hass.data[NEATO_ROBOTS]
if "maps" in robot.traits
]
if not dev:
return
_LOGGER.debug("Adding robots for cleaning maps %s", dev)
async_add_entities(dev, True)
class NeatoCleaningMap(NeatoEntity, Camera):
"""Neato cleaning map for last clean."""
_attr_translation_key = "cleaning_map"
def __init__(
self, neato: NeatoHub, robot: Robot, mapdata: dict[str, Any] | None
) -> None:
"""Initialize Neato cleaning map."""
super().__init__(robot)
Camera.__init__(self)
self.neato = neato
self._mapdata = mapdata
self._available = neato is not None
self._robot_serial: str = self.robot.serial
self._attr_unique_id = self.robot.serial
self._generated_at: str | None = None
self._image_url: str | None = None
self._image: bytes | None = None
def camera_image(
self, width: int | None = None, height: int | None = None
) -> bytes | None:
"""Return image response."""
self.update()
return self._image
def update(self) -> None:
"""Check the contents of the map list."""
_LOGGER.debug("Running camera update for '%s'", self.entity_id)
try:
self.neato.update_robots()
except NeatoRobotException as ex:
if self._available: # Print only once when available
_LOGGER.error(
"Neato camera connection error for '%s': %s", self.entity_id, ex
)
self._image = None
self._image_url = None
self._available = False
return
if self._mapdata:
map_data: dict[str, Any] = self._mapdata[self._robot_serial]["maps"][0]
if (image_url := map_data["url"]) == self._image_url:
_LOGGER.debug(
"The map image_url for '%s' is the same as old", self.entity_id
)
return
try:
image: HTTPResponse = self.neato.download_map(image_url)
except NeatoRobotException as ex:
if self._available: # Print only once when available
_LOGGER.error(
"Neato camera connection error for '%s': %s", self.entity_id, ex
)
self._image = None
self._image_url = None
self._available = False
return
self._image = image.read()
self._image_url = image_url
self._generated_at = map_data.get("generated_at")
self._available = True
@property
def available(self) -> bool:
"""Return if the robot is available."""
return self._available
@property
def extra_state_attributes(self) -> dict[str, Any]:
"""Return the state attributes of the vacuum cleaner."""
data: dict[str, Any] = {}
if self._generated_at is not None:
data[ATTR_GENERATED_AT] = self._generated_at
return data

View File

@@ -0,0 +1,64 @@
"""Config flow for Neato Botvac."""
from __future__ import annotations
from collections.abc import Mapping
import logging
from typing import Any
from homeassistant.config_entries import SOURCE_REAUTH, ConfigFlowResult
from homeassistant.helpers import config_entry_oauth2_flow
from .const import NEATO_DOMAIN
class OAuth2FlowHandler(
config_entry_oauth2_flow.AbstractOAuth2FlowHandler, domain=NEATO_DOMAIN
):
"""Config flow to handle Neato Botvac OAuth2 authentication."""
DOMAIN = NEATO_DOMAIN
@property
def logger(self) -> logging.Logger:
"""Return logger."""
return logging.getLogger(__name__)
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Create an entry for the flow."""
current_entries = self._async_current_entries()
if self.source != SOURCE_REAUTH and current_entries:
# Already configured
return self.async_abort(reason="already_configured")
return await super().async_step_user(user_input=user_input)
async def async_step_reauth(
self, entry_data: Mapping[str, Any]
) -> ConfigFlowResult:
"""Perform reauth upon migration of old entries."""
return await self.async_step_reauth_confirm()
async def async_step_reauth_confirm(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Confirm reauth upon migration of old entries."""
if user_input is None:
return self.async_show_form(step_id="reauth_confirm")
return await self.async_step_user()
async def async_oauth_create_entry(self, data: dict[str, Any]) -> ConfigFlowResult:
"""Create an entry for the flow. Update an entry if one already exist."""
current_entries = self._async_current_entries()
if self.source == SOURCE_REAUTH and current_entries:
# Update entry
self.hass.config_entries.async_update_entry(
current_entries[0], title=self.flow_impl.name, data=data
)
self.hass.async_create_task(
self.hass.config_entries.async_reload(current_entries[0].entry_id)
)
return self.async_abort(reason="reauth_successful")
return self.async_create_entry(title=self.flow_impl.name, data=data)

View File

@@ -0,0 +1,150 @@
"""Constants for Neato integration."""
NEATO_DOMAIN = "neato"
CONF_VENDOR = "vendor"
NEATO_LOGIN = "neato_login"
NEATO_MAP_DATA = "neato_map_data"
NEATO_PERSISTENT_MAPS = "neato_persistent_maps"
NEATO_ROBOTS = "neato_robots"
SCAN_INTERVAL_MINUTES = 1
MODE = {1: "Eco", 2: "Turbo"}
ACTION = {
0: "Invalid",
1: "House Cleaning",
2: "Spot Cleaning",
3: "Manual Cleaning",
4: "Docking",
5: "User Menu Active",
6: "Suspended Cleaning",
7: "Updating",
8: "Copying logs",
9: "Recovering Location",
10: "IEC test",
11: "Map cleaning",
12: "Exploring map (creating a persistent map)",
13: "Acquiring Persistent Map IDs",
14: "Creating & Uploading Map",
15: "Suspended Exploration",
}
ERRORS = {
"ui_error_battery_battundervoltlithiumsafety": "Replace battery",
"ui_error_battery_critical": "Replace battery",
"ui_error_battery_invalidsensor": "Replace battery",
"ui_error_battery_lithiumadapterfailure": "Replace battery",
"ui_error_battery_mismatch": "Replace battery",
"ui_error_battery_nothermistor": "Replace battery",
"ui_error_battery_overtemp": "Replace battery",
"ui_error_battery_overvolt": "Replace battery",
"ui_error_battery_undercurrent": "Replace battery",
"ui_error_battery_undertemp": "Replace battery",
"ui_error_battery_undervolt": "Replace battery",
"ui_error_battery_unplugged": "Replace battery",
"ui_error_brush_stuck": "Brush stuck",
"ui_error_brush_overloaded": "Brush overloaded",
"ui_error_bumper_stuck": "Bumper stuck",
"ui_error_check_battery_switch": "Check battery",
"ui_error_corrupt_scb": "Call customer service corrupt board",
"ui_error_deck_debris": "Deck debris",
"ui_error_dflt_app": "Check Neato app",
"ui_error_disconnect_chrg_cable": "Disconnected charge cable",
"ui_error_disconnect_usb_cable": "Disconnected USB cable",
"ui_error_dust_bin_missing": "Dust bin missing",
"ui_error_dust_bin_full": "Dust bin full",
"ui_error_dust_bin_emptied": "Dust bin emptied",
"ui_error_hardware_failure": "Hardware failure",
"ui_error_ldrop_stuck": "Clear my path",
"ui_error_lds_jammed": "Clear my path",
"ui_error_lds_bad_packets": "Check Neato app",
"ui_error_lds_disconnected": "Check Neato app",
"ui_error_lds_missed_packets": "Check Neato app",
"ui_error_lwheel_stuck": "Clear my path",
"ui_error_navigation_backdrop_frontbump": "Clear my path",
"ui_error_navigation_backdrop_leftbump": "Clear my path",
"ui_error_navigation_backdrop_wheelextended": "Clear my path",
"ui_error_navigation_noprogress": "Clear my path",
"ui_error_navigation_origin_unclean": "Clear my path",
"ui_error_navigation_pathproblems": "Cannot return to base",
"ui_error_navigation_pinkycommsfail": "Clear my path",
"ui_error_navigation_falling": "Clear my path",
"ui_error_navigation_noexitstogo": "Clear my path",
"ui_error_navigation_nomotioncommands": "Clear my path",
"ui_error_navigation_rightdrop_leftbump": "Clear my path",
"ui_error_navigation_undockingfailed": "Clear my path",
"ui_error_picked_up": "Picked up",
"ui_error_qa_fail": "Check Neato app",
"ui_error_rdrop_stuck": "Clear my path",
"ui_error_reconnect_failed": "Reconnect failed",
"ui_error_rwheel_stuck": "Clear my path",
"ui_error_stuck": "Stuck!",
"ui_error_unable_to_return_to_base": "Unable to return to base",
"ui_error_unable_to_see": "Clean vacuum sensors",
"ui_error_vacuum_slip": "Clear my path",
"ui_error_vacuum_stuck": "Clear my path",
"ui_error_warning": "Error check app",
"batt_base_connect_fail": "Battery failed to connect to base",
"batt_base_no_power": "Battery base has no power",
"batt_low": "Battery low",
"batt_on_base": "Battery on base",
"clean_tilt_on_start": "Clean the tilt on start",
"dustbin_full": "Dust bin full",
"dustbin_missing": "Dust bin missing",
"gen_picked_up": "Picked up",
"hw_fail": "Hardware failure",
"hw_tof_sensor_sensor": "Hardware sensor disconnected",
"lds_bad_packets": "Bad packets",
"lds_deck_debris": "Debris on deck",
"lds_disconnected": "Disconnected",
"lds_jammed": "Jammed",
"lds_missed_packets": "Missed packets",
"maint_brush_stuck": "Brush stuck",
"maint_brush_overload": "Brush overloaded",
"maint_bumper_stuck": "Bumper stuck",
"maint_customer_support_qa": "Contact customer support",
"maint_vacuum_stuck": "Vacuum is stuck",
"maint_vacuum_slip": "Vacuum is stuck",
"maint_left_drop_stuck": "Vacuum is stuck",
"maint_left_wheel_stuck": "Vacuum is stuck",
"maint_right_drop_stuck": "Vacuum is stuck",
"maint_right_wheel_stuck": "Vacuum is stuck",
"not_on_charge_base": "Not on the charge base",
"nav_robot_falling": "Clear my path",
"nav_no_path": "Clear my path",
"nav_path_problem": "Clear my path",
"nav_backdrop_frontbump": "Clear my path",
"nav_backdrop_leftbump": "Clear my path",
"nav_backdrop_wheelextended": "Clear my path",
"nav_floorplan_zone_path_blocked": "Clear my path",
"nav_mag_sensor": "Clear my path",
"nav_no_exit": "Clear my path",
"nav_no_movement": "Clear my path",
"nav_rightdrop_leftbump": "Clear my path",
"nav_undocking_failed": "Clear my path",
}
ALERTS = {
"ui_alert_dust_bin_full": "Please empty dust bin",
"ui_alert_recovering_location": "Returning to start",
"ui_alert_battery_chargebasecommerr": "Battery error",
"ui_alert_busy_charging": "Busy charging",
"ui_alert_charging_base": "Base charging",
"ui_alert_charging_power": "Charging power",
"ui_alert_connect_chrg_cable": "Connect charge cable",
"ui_alert_info_thank_you": "Thank you",
"ui_alert_invalid": "Invalid check app",
"ui_alert_old_error": "Old error",
"ui_alert_swupdate_fail": "Update failed",
"dustbin_full": "Please empty dust bin",
"maint_brush_change": "Change the brush",
"maint_filter_change": "Change the filter",
"clean_completed_to_start": "Cleaning completed",
"nav_floorplan_not_created": "No floorplan found",
"nav_floorplan_load_fail": "Failed to load floorplan",
"nav_floorplan_localization_fail": "Failed to load floorplan",
"clean_incomplete_to_start": "Cleaning incomplete",
"log_upload_failed": "Logs failed to upload",
}

View File

@@ -0,0 +1,24 @@
"""Base entity for Neato."""
from __future__ import annotations
from pybotvac import Robot
from homeassistant.helpers.device_registry import DeviceInfo
from homeassistant.helpers.entity import Entity
from .const import NEATO_DOMAIN
class NeatoEntity(Entity):
"""Base Neato entity."""
_attr_has_entity_name = True
def __init__(self, robot: Robot) -> None:
"""Initialize Neato entity."""
self.robot = robot
self._attr_device_info: DeviceInfo = DeviceInfo(
identifiers={(NEATO_DOMAIN, self.robot.serial)},
name=self.robot.name,
)

View File

@@ -0,0 +1,50 @@
"""Support for Neato botvac connected vacuum cleaners."""
from datetime import timedelta
import logging
from pybotvac import Account
from urllib3.response import HTTPResponse
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.util import Throttle
from .const import NEATO_MAP_DATA, NEATO_PERSISTENT_MAPS, NEATO_ROBOTS
_LOGGER = logging.getLogger(__name__)
class NeatoHub:
"""A My Neato hub wrapper class."""
def __init__(self, hass: HomeAssistant, neato: Account) -> None:
"""Initialize the Neato hub."""
self._hass = hass
self.my_neato: Account = neato
@Throttle(timedelta(minutes=1))
def update_robots(self) -> None:
"""Update the robot states."""
_LOGGER.debug("Running HUB.update_robots %s", self._hass.data.get(NEATO_ROBOTS))
self._hass.data[NEATO_ROBOTS] = self.my_neato.robots
self._hass.data[NEATO_PERSISTENT_MAPS] = self.my_neato.persistent_maps
self._hass.data[NEATO_MAP_DATA] = self.my_neato.maps
def download_map(self, url: str) -> HTTPResponse:
"""Download a new map image."""
map_image_data: HTTPResponse = self.my_neato.get_map_image(url)
return map_image_data
async def async_update_entry_unique_id(self, entry: ConfigEntry) -> str:
"""Update entry for unique_id."""
await self._hass.async_add_executor_job(self.my_neato.refresh_userdata)
unique_id: str = self.my_neato.unique_id
if entry.unique_id == unique_id:
return unique_id
_LOGGER.debug("Updating user unique_id for previous config entry")
self._hass.config_entries.async_update_entry(entry, unique_id=unique_id)
return unique_id

View File

@@ -0,0 +1,7 @@
{
"services": {
"custom_cleaning": {
"service": "mdi:broom"
}
}
}

View File

@@ -0,0 +1,11 @@
{
"domain": "neato",
"name": "Neato Botvac",
"codeowners": [],
"config_flow": true,
"dependencies": ["application_credentials"],
"documentation": "https://www.home-assistant.io/integrations/neato",
"iot_class": "cloud_polling",
"loggers": ["pybotvac"],
"requirements": ["pybotvac==0.0.28"]
}

View File

@@ -0,0 +1,81 @@
"""Support for Neato sensors."""
from __future__ import annotations
from datetime import timedelta
import logging
from typing import Any
from pybotvac.exceptions import NeatoRobotException
from pybotvac.robot import Robot
from homeassistant.components.sensor import SensorDeviceClass, SensorEntity
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import PERCENTAGE, EntityCategory
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
from .const import NEATO_LOGIN, NEATO_ROBOTS, SCAN_INTERVAL_MINUTES
from .entity import NeatoEntity
from .hub import NeatoHub
_LOGGER = logging.getLogger(__name__)
SCAN_INTERVAL = timedelta(minutes=SCAN_INTERVAL_MINUTES)
BATTERY = "Battery"
async def async_setup_entry(
hass: HomeAssistant,
entry: ConfigEntry,
async_add_entities: AddConfigEntryEntitiesCallback,
) -> None:
"""Set up the Neato sensor using config entry."""
neato: NeatoHub = hass.data[NEATO_LOGIN]
dev = [NeatoSensor(neato, robot) for robot in hass.data[NEATO_ROBOTS]]
if not dev:
return
_LOGGER.debug("Adding robots for sensors %s", dev)
async_add_entities(dev, True)
class NeatoSensor(NeatoEntity, SensorEntity):
"""Neato sensor."""
_attr_device_class = SensorDeviceClass.BATTERY
_attr_entity_category = EntityCategory.DIAGNOSTIC
_attr_native_unit_of_measurement = PERCENTAGE
_attr_available: bool = False
def __init__(self, neato: NeatoHub, robot: Robot) -> None:
"""Initialize Neato sensor."""
super().__init__(robot)
self._robot_serial: str = self.robot.serial
self._attr_unique_id = self.robot.serial
self._state: dict[str, Any] | None = None
def update(self) -> None:
"""Update Neato Sensor."""
try:
self._state = self.robot.state
except NeatoRobotException as ex:
if self._attr_available:
_LOGGER.error(
"Neato sensor connection error for '%s': %s", self.entity_id, ex
)
self._state = None
self._attr_available = False
return
self._attr_available = True
_LOGGER.debug("self._state=%s", self._state)
@property
def native_value(self) -> str | None:
"""Return the state."""
if self._state is not None:
return str(self._state["details"]["charge"])
return None

View File

@@ -0,0 +1,32 @@
custom_cleaning:
target:
entity:
integration: neato
domain: vacuum
fields:
mode:
default: 2
selector:
number:
min: 1
max: 2
mode: box
navigation:
default: 1
selector:
number:
min: 1
max: 3
mode: box
category:
default: 4
selector:
number:
min: 2
max: 4
step: 2
mode: box
zone:
example: "Kitchen"
selector:
text:

View File

@@ -0,0 +1,73 @@
{
"config": {
"abort": {
"already_configured": "[%key:common::config_flow::abort::already_configured_device%]",
"authorize_url_timeout": "[%key:common::config_flow::abort::oauth2_authorize_url_timeout%]",
"missing_configuration": "[%key:common::config_flow::abort::oauth2_missing_configuration%]",
"no_url_available": "[%key:common::config_flow::abort::oauth2_no_url_available%]",
"oauth_error": "[%key:common::config_flow::abort::oauth2_error%]",
"oauth_failed": "[%key:common::config_flow::abort::oauth2_failed%]",
"oauth_timeout": "[%key:common::config_flow::abort::oauth2_timeout%]",
"oauth_unauthorized": "[%key:common::config_flow::abort::oauth2_unauthorized%]",
"reauth_successful": "[%key:common::config_flow::abort::reauth_successful%]"
},
"create_entry": {
"default": "[%key:common::config_flow::create_entry::authenticated%]"
},
"step": {
"pick_implementation": {
"data": {
"implementation": "[%key:common::config_flow::data::implementation%]"
},
"data_description": {
"implementation": "[%key:common::config_flow::description::implementation%]"
},
"title": "[%key:common::config_flow::title::oauth2_pick_implementation%]"
},
"reauth_confirm": {
"title": "[%key:common::config_flow::description::confirm_setup%]"
}
}
},
"entity": {
"button": {
"dismiss_alert": {
"name": "Dismiss alert"
}
},
"camera": {
"cleaning_map": {
"name": "Cleaning map"
}
},
"switch": {
"schedule": {
"name": "Schedule"
}
}
},
"services": {
"custom_cleaning": {
"description": "Starts a custom cleaning of your house.",
"fields": {
"category": {
"description": "Whether to use a persistent map or not for cleaning (i.e. No go lines): 2 for no map, 4 for map. Default to using map if not set (and fallback to no map if no map is found).",
"name": "Use cleaning map"
},
"mode": {
"description": "Sets the cleaning mode: 1 for eco and 2 for turbo. Defaults to turbo if not set.",
"name": "Cleaning mode"
},
"navigation": {
"description": "Sets the navigation mode: 1 for normal, 2 for extra care, 3 for deep. Defaults to normal if not set.",
"name": "Navigation mode"
},
"zone": {
"description": "Name of the zone to clean (only supported on the Botvac D7). Defaults to no zone i.e. complete house cleanup.",
"name": "Zone"
}
},
"name": "Custom cleaning"
}
}
}

View File

@@ -0,0 +1,118 @@
"""Support for Neato Connected Vacuums switches."""
from __future__ import annotations
from datetime import timedelta
import logging
from typing import Any
from pybotvac.exceptions import NeatoRobotException
from pybotvac.robot import Robot
from homeassistant.components.switch import SwitchEntity
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import STATE_OFF, STATE_ON, EntityCategory
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
from .const import NEATO_LOGIN, NEATO_ROBOTS, SCAN_INTERVAL_MINUTES
from .entity import NeatoEntity
from .hub import NeatoHub
_LOGGER = logging.getLogger(__name__)
SCAN_INTERVAL = timedelta(minutes=SCAN_INTERVAL_MINUTES)
SWITCH_TYPE_SCHEDULE = "schedule"
SWITCH_TYPES = {SWITCH_TYPE_SCHEDULE: ["Schedule"]}
async def async_setup_entry(
hass: HomeAssistant,
entry: ConfigEntry,
async_add_entities: AddConfigEntryEntitiesCallback,
) -> None:
"""Set up Neato switch with config entry."""
neato: NeatoHub = hass.data[NEATO_LOGIN]
dev = [
NeatoConnectedSwitch(neato, robot, type_name)
for robot in hass.data[NEATO_ROBOTS]
for type_name in SWITCH_TYPES
]
if not dev:
return
_LOGGER.debug("Adding switches %s", dev)
async_add_entities(dev, True)
class NeatoConnectedSwitch(NeatoEntity, SwitchEntity):
"""Neato Connected Switches."""
_attr_translation_key = "schedule"
_attr_available = False
_attr_entity_category = EntityCategory.CONFIG
def __init__(self, neato: NeatoHub, robot: Robot, switch_type: str) -> None:
"""Initialize the Neato Connected switches."""
super().__init__(robot)
self.type = switch_type
self._state: dict[str, Any] | None = None
self._schedule_state: str | None = None
self._clean_state = None
self._attr_unique_id = self.robot.serial
def update(self) -> None:
"""Update the states of Neato switches."""
_LOGGER.debug("Running Neato switch update for '%s'", self.entity_id)
try:
self._state = self.robot.state
except NeatoRobotException as ex:
if self._attr_available: # Print only once when available
_LOGGER.error(
"Neato switch connection error for '%s': %s", self.entity_id, ex
)
self._state = None
self._attr_available = False
return
self._attr_available = True
_LOGGER.debug("self._state=%s", self._state)
if self.type == SWITCH_TYPE_SCHEDULE:
_LOGGER.debug("State: %s", self._state)
if self._state is not None and self._state["details"]["isScheduleEnabled"]:
self._schedule_state = STATE_ON
else:
self._schedule_state = STATE_OFF
_LOGGER.debug(
"Schedule state for '%s': %s", self.entity_id, self._schedule_state
)
@property
def is_on(self) -> bool:
"""Return true if switch is on."""
return bool(
self.type == SWITCH_TYPE_SCHEDULE and self._schedule_state == STATE_ON
)
def turn_on(self, **kwargs: Any) -> None:
"""Turn the switch on."""
if self.type == SWITCH_TYPE_SCHEDULE:
try:
self.robot.enable_schedule()
except NeatoRobotException as ex:
_LOGGER.error(
"Neato switch connection error '%s': %s", self.entity_id, ex
)
def turn_off(self, **kwargs: Any) -> None:
"""Turn the switch off."""
if self.type == SWITCH_TYPE_SCHEDULE:
try:
self.robot.disable_schedule()
except NeatoRobotException as ex:
_LOGGER.error(
"Neato switch connection error '%s': %s", self.entity_id, ex
)

View File

@@ -0,0 +1,388 @@
"""Support for Neato Connected Vacuums."""
from __future__ import annotations
from datetime import timedelta
import logging
from typing import Any
from pybotvac import Robot
from pybotvac.exceptions import NeatoRobotException
import voluptuous as vol
from homeassistant.components.vacuum import (
ATTR_STATUS,
StateVacuumEntity,
VacuumActivity,
VacuumEntityFeature,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import ATTR_MODE
from homeassistant.core import HomeAssistant
from homeassistant.helpers import config_validation as cv, entity_platform
from homeassistant.helpers.device_registry import DeviceInfo
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
from .const import (
ACTION,
ALERTS,
ERRORS,
MODE,
NEATO_LOGIN,
NEATO_MAP_DATA,
NEATO_PERSISTENT_MAPS,
NEATO_ROBOTS,
SCAN_INTERVAL_MINUTES,
)
from .entity import NeatoEntity
from .hub import NeatoHub
_LOGGER = logging.getLogger(__name__)
SCAN_INTERVAL = timedelta(minutes=SCAN_INTERVAL_MINUTES)
ATTR_CLEAN_START = "clean_start"
ATTR_CLEAN_STOP = "clean_stop"
ATTR_CLEAN_AREA = "clean_area"
ATTR_CLEAN_BATTERY_START = "battery_level_at_clean_start"
ATTR_CLEAN_BATTERY_END = "battery_level_at_clean_end"
ATTR_CLEAN_SUSP_COUNT = "clean_suspension_count"
ATTR_CLEAN_SUSP_TIME = "clean_suspension_time"
ATTR_CLEAN_PAUSE_TIME = "clean_pause_time"
ATTR_CLEAN_ERROR_TIME = "clean_error_time"
ATTR_LAUNCHED_FROM = "launched_from"
ATTR_NAVIGATION = "navigation"
ATTR_CATEGORY = "category"
ATTR_ZONE = "zone"
async def async_setup_entry(
hass: HomeAssistant,
entry: ConfigEntry,
async_add_entities: AddConfigEntryEntitiesCallback,
) -> None:
"""Set up Neato vacuum with config entry."""
neato: NeatoHub = hass.data[NEATO_LOGIN]
mapdata: dict[str, Any] | None = hass.data.get(NEATO_MAP_DATA)
persistent_maps: dict[str, Any] | None = hass.data.get(NEATO_PERSISTENT_MAPS)
dev = [
NeatoConnectedVacuum(neato, robot, mapdata, persistent_maps)
for robot in hass.data[NEATO_ROBOTS]
]
if not dev:
return
_LOGGER.debug("Adding vacuums %s", dev)
async_add_entities(dev, True)
platform = entity_platform.async_get_current_platform()
assert platform is not None
platform.async_register_entity_service(
"custom_cleaning",
{
vol.Optional(ATTR_MODE, default=2): cv.positive_int,
vol.Optional(ATTR_NAVIGATION, default=1): cv.positive_int,
vol.Optional(ATTR_CATEGORY, default=4): cv.positive_int,
vol.Optional(ATTR_ZONE): cv.string,
},
"neato_custom_cleaning",
)
class NeatoConnectedVacuum(NeatoEntity, StateVacuumEntity):
"""Representation of a Neato Connected Vacuum."""
_attr_supported_features = (
VacuumEntityFeature.BATTERY
| VacuumEntityFeature.PAUSE
| VacuumEntityFeature.RETURN_HOME
| VacuumEntityFeature.STOP
| VacuumEntityFeature.START
| VacuumEntityFeature.CLEAN_SPOT
| VacuumEntityFeature.STATE
| VacuumEntityFeature.MAP
| VacuumEntityFeature.LOCATE
)
_attr_name = None
def __init__(
self,
neato: NeatoHub,
robot: Robot,
mapdata: dict[str, Any] | None,
persistent_maps: dict[str, Any] | None,
) -> None:
"""Initialize the Neato Connected Vacuum."""
super().__init__(robot)
self._attr_available: bool = neato is not None
self._mapdata = mapdata
self._robot_has_map: bool = self.robot.has_persistent_maps
self._robot_maps = persistent_maps
self._robot_serial: str = self.robot.serial
self._attr_unique_id: str = self.robot.serial
self._status_state: str | None = None
self._state: dict[str, Any] | None = None
self._clean_time_start: str | None = None
self._clean_time_stop: str | None = None
self._clean_area: float | None = None
self._clean_battery_start: int | None = None
self._clean_battery_end: int | None = None
self._clean_susp_charge_count: int | None = None
self._clean_susp_time: int | None = None
self._clean_pause_time: int | None = None
self._clean_error_time: int | None = None
self._launched_from: str | None = None
self._robot_boundaries: list = []
self._robot_stats: dict[str, Any] | None = None
def update(self) -> None:
"""Update the states of Neato Vacuums."""
_LOGGER.debug("Running Neato Vacuums update for '%s'", self.entity_id)
try:
if self._robot_stats is None:
self._robot_stats = self.robot.get_general_info().json().get("data")
except NeatoRobotException:
_LOGGER.warning("Couldn't fetch robot information of %s", self.entity_id)
try:
self._state = self.robot.state
except NeatoRobotException as ex:
if self._attr_available: # print only once when available
_LOGGER.error(
"Neato vacuum connection error for '%s': %s", self.entity_id, ex
)
self._state = None
self._attr_available = False
return
if self._state is None:
return
self._attr_available = True
_LOGGER.debug("self._state=%s", self._state)
if "alert" in self._state:
robot_alert = ALERTS.get(self._state["alert"])
else:
robot_alert = None
if self._state["state"] == 1:
if self._state["details"]["isCharging"]:
self._attr_activity = VacuumActivity.DOCKED
self._status_state = "Charging"
elif (
self._state["details"]["isDocked"]
and not self._state["details"]["isCharging"]
):
self._attr_activity = VacuumActivity.DOCKED
self._status_state = "Docked"
else:
self._attr_activity = VacuumActivity.IDLE
self._status_state = "Stopped"
if robot_alert is not None:
self._status_state = robot_alert
elif self._state["state"] == 2:
if robot_alert is None:
self._attr_activity = VacuumActivity.CLEANING
self._status_state = (
f"{MODE.get(self._state['cleaning']['mode'])} "
f"{ACTION.get(self._state['action'])}"
)
if (
"boundary" in self._state["cleaning"]
and "name" in self._state["cleaning"]["boundary"]
):
self._status_state += (
f" {self._state['cleaning']['boundary']['name']}"
)
else:
self._status_state = robot_alert
elif self._state["state"] == 3:
self._attr_activity = VacuumActivity.PAUSED
self._status_state = "Paused"
elif self._state["state"] == 4:
self._attr_activity = VacuumActivity.ERROR
self._status_state = ERRORS.get(self._state["error"])
self._attr_battery_level = self._state["details"]["charge"]
if self._mapdata is None or not self._mapdata.get(self._robot_serial, {}).get(
"maps", []
):
return
mapdata: dict[str, Any] = self._mapdata[self._robot_serial]["maps"][0]
self._clean_time_start = mapdata["start_at"]
self._clean_time_stop = mapdata["end_at"]
self._clean_area = mapdata["cleaned_area"]
self._clean_susp_charge_count = mapdata["suspended_cleaning_charging_count"]
self._clean_susp_time = mapdata["time_in_suspended_cleaning"]
self._clean_pause_time = mapdata["time_in_pause"]
self._clean_error_time = mapdata["time_in_error"]
self._clean_battery_start = mapdata["run_charge_at_start"]
self._clean_battery_end = mapdata["run_charge_at_end"]
self._launched_from = mapdata["launched_from"]
if (
self._robot_has_map
and self._state
and self._state["availableServices"]["maps"] != "basic-1"
and self._robot_maps
):
allmaps: dict = self._robot_maps[self._robot_serial]
_LOGGER.debug(
"Found the following maps for '%s': %s", self.entity_id, allmaps
)
self._robot_boundaries = [] # Reset boundaries before refreshing boundaries
for maps in allmaps:
try:
robot_boundaries = self.robot.get_map_boundaries(maps["id"]).json()
except NeatoRobotException as ex:
_LOGGER.error(
"Could not fetch map boundaries for '%s': %s",
self.entity_id,
ex,
)
return
_LOGGER.debug(
"Boundaries for robot '%s' in map '%s': %s",
self.entity_id,
maps["name"],
robot_boundaries,
)
if "boundaries" in robot_boundaries["data"]:
self._robot_boundaries += robot_boundaries["data"]["boundaries"]
_LOGGER.debug(
"List of boundaries for '%s': %s",
self.entity_id,
self._robot_boundaries,
)
@property
def extra_state_attributes(self) -> dict[str, Any]:
"""Return the state attributes of the vacuum cleaner."""
data: dict[str, Any] = {}
if self._status_state is not None:
data[ATTR_STATUS] = self._status_state
if self._clean_time_start is not None:
data[ATTR_CLEAN_START] = self._clean_time_start
if self._clean_time_stop is not None:
data[ATTR_CLEAN_STOP] = self._clean_time_stop
if self._clean_area is not None:
data[ATTR_CLEAN_AREA] = self._clean_area
if self._clean_susp_charge_count is not None:
data[ATTR_CLEAN_SUSP_COUNT] = self._clean_susp_charge_count
if self._clean_susp_time is not None:
data[ATTR_CLEAN_SUSP_TIME] = self._clean_susp_time
if self._clean_pause_time is not None:
data[ATTR_CLEAN_PAUSE_TIME] = self._clean_pause_time
if self._clean_error_time is not None:
data[ATTR_CLEAN_ERROR_TIME] = self._clean_error_time
if self._clean_battery_start is not None:
data[ATTR_CLEAN_BATTERY_START] = self._clean_battery_start
if self._clean_battery_end is not None:
data[ATTR_CLEAN_BATTERY_END] = self._clean_battery_end
if self._launched_from is not None:
data[ATTR_LAUNCHED_FROM] = self._launched_from
return data
@property
def device_info(self) -> DeviceInfo:
"""Device info for neato robot."""
device_info = self._attr_device_info
if self._robot_stats:
device_info["manufacturer"] = self._robot_stats["battery"]["vendor"]
device_info["model"] = self._robot_stats["model"]
device_info["sw_version"] = self._robot_stats["firmware"]
return device_info
def start(self) -> None:
"""Start cleaning or resume cleaning."""
if self._state:
try:
if self._state["state"] == 1:
self.robot.start_cleaning()
elif self._state["state"] == 3:
self.robot.resume_cleaning()
except NeatoRobotException as ex:
_LOGGER.error(
"Neato vacuum connection error for '%s': %s", self.entity_id, ex
)
def pause(self) -> None:
"""Pause the vacuum."""
try:
self.robot.pause_cleaning()
except NeatoRobotException as ex:
_LOGGER.error(
"Neato vacuum connection error for '%s': %s", self.entity_id, ex
)
def return_to_base(self, **kwargs: Any) -> None:
"""Set the vacuum cleaner to return to the dock."""
try:
if self._attr_activity == VacuumActivity.CLEANING:
self.robot.pause_cleaning()
self._attr_activity = VacuumActivity.RETURNING
self.robot.send_to_base()
except NeatoRobotException as ex:
_LOGGER.error(
"Neato vacuum connection error for '%s': %s", self.entity_id, ex
)
def stop(self, **kwargs: Any) -> None:
"""Stop the vacuum cleaner."""
try:
self.robot.stop_cleaning()
except NeatoRobotException as ex:
_LOGGER.error(
"Neato vacuum connection error for '%s': %s", self.entity_id, ex
)
def locate(self, **kwargs: Any) -> None:
"""Locate the robot by making it emit a sound."""
try:
self.robot.locate()
except NeatoRobotException as ex:
_LOGGER.error(
"Neato vacuum connection error for '%s': %s", self.entity_id, ex
)
def clean_spot(self, **kwargs: Any) -> None:
"""Run a spot cleaning starting from the base."""
try:
self.robot.start_spot_cleaning()
except NeatoRobotException as ex:
_LOGGER.error(
"Neato vacuum connection error for '%s': %s", self.entity_id, ex
)
def neato_custom_cleaning(
self, mode: str, navigation: str, category: str, zone: str | None = None
) -> None:
"""Zone cleaning service call."""
boundary_id = None
if zone is not None:
for boundary in self._robot_boundaries:
if zone in boundary["name"]:
boundary_id = boundary["id"]
if boundary_id is None:
_LOGGER.error(
"Zone '%s' was not found for the robot '%s'", zone, self.entity_id
)
return
_LOGGER.debug(
"Start cleaning zone '%s' with robot %s", zone, self.entity_id
)
self._attr_activity = VacuumActivity.CLEANING
try:
self.robot.start_cleaning(mode, navigation, category, boundary_id)
except NeatoRobotException as ex:
_LOGGER.error(
"Neato vacuum connection error for '%s': %s", self.entity_id, ex
)

View File

@@ -30,7 +30,6 @@ class NintendoDevice(CoordinatorEntity[NintendoUpdateCoordinator]):
sw_version=device.extra["firmwareVersion"]["displayedVersion"],
model=device.model,
model_id=device.generation,
serial_number=device.extra["serialNumber"],
)
async def async_added_to_hass(self) -> None:

View File

@@ -7,5 +7,5 @@
"iot_class": "cloud_polling",
"loggers": ["pynintendoparental"],
"quality_scale": "bronze",
"requirements": ["pynintendoparental==1.1.2"]
"requirements": ["pynintendoparental==1.1.3"]
}

View File

@@ -8,6 +8,6 @@
"iot_class": "cloud_polling",
"loggers": ["pynordpool"],
"quality_scale": "platinum",
"requirements": ["pynordpool==0.3.2"],
"requirements": ["pynordpool==0.3.1"],
"single_config_entry": true
}

View File

@@ -14,7 +14,7 @@ from onedrive_personal_sdk.exceptions import (
NotFoundError,
OneDriveException,
)
from onedrive_personal_sdk.models.items import Item, ItemUpdate
from onedrive_personal_sdk.models.items import ItemUpdate
from homeassistant.const import CONF_ACCESS_TOKEN, Platform
from homeassistant.core import HomeAssistant
@@ -202,9 +202,7 @@ async def _get_onedrive_client(
)
async def _handle_item_operation(
func: Callable[[], Awaitable[Item]], folder: str
) -> Item:
async def _handle_item_operation[T](func: Callable[[], Awaitable[T]], folder: str) -> T:
try:
return await func()
except NotFoundError:

View File

@@ -10,5 +10,5 @@
"iot_class": "cloud_polling",
"loggers": ["onedrive_personal_sdk"],
"quality_scale": "platinum",
"requirements": ["onedrive-personal-sdk==0.0.14"]
"requirements": ["onedrive-personal-sdk==0.0.15"]
}

View File

@@ -9,5 +9,5 @@
"integration_type": "service",
"iot_class": "cloud_polling",
"quality_scale": "bronze",
"requirements": ["openai==2.2.0", "python-open-router==0.3.1"]
"requirements": ["openai==2.2.0", "python-open-router==0.3.2"]
}

View File

@@ -229,7 +229,7 @@ class OverkizConfigFlow(ConfigFlow, domain=DOMAIN):
"""Handle the local authentication step via config flow."""
errors = {}
description_placeholders = {
"somfy-developer-mode-docs": "https://github.com/Somfy-Developer/Somfy-TaHoma-Developer-Mode#getting-started"
"somfy_developer_mode_docs": "https://github.com/Somfy-Developer/Somfy-TaHoma-Developer-Mode#getting-started"
}
if user_input:

View File

@@ -41,7 +41,7 @@
"token": "Token generated by the app used to control your device.",
"verify_ssl": "Verify the SSL certificate. Select this only if you are connecting via the hostname."
},
"description": "By activating the [Developer Mode of your TaHoma box]({somfy-developer-mode-docs}), you can authorize third-party software (like Home Assistant) to connect to it via your local network.\n\n1. Open the TaHoma By Somfy application on your device.\n2. Navigate to the Help & advanced features -> Advanced features menu in the application.\n3. Activate Developer Mode by tapping 7 times on the version number of your gateway (like 2025.1.4-11).\n4. Generate a token from the Developer Mode menu to authenticate your API calls.\n\n5. Enter the generated token below and update the host to include your Gateway PIN or the IP address of your gateway."
"description": "By activating the [Developer Mode of your TaHoma box]({somfy_developer_mode_docs}), you can authorize third-party software (like Home Assistant) to connect to it via your local network.\n\n1. Open the TaHoma By Somfy application on your device.\n2. Navigate to the Help & advanced features -> Advanced features menu in the application.\n3. Activate Developer Mode by tapping 7 times on the version number of your gateway (like 2025.1.4-11).\n4. Generate a token from the Developer Mode menu to authenticate your API calls.\n\n5. Enter the generated token below and update the host to include your Gateway PIN or the IP address of your gateway."
},
"local_or_cloud": {
"data": {

View File

@@ -0,0 +1,3 @@
"""The pandora component."""
DOMAIN = "pandora"

View File

@@ -0,0 +1,10 @@
{
"domain": "pandora",
"name": "Pandora",
"codeowners": [],
"documentation": "https://www.home-assistant.io/integrations/pandora",
"iot_class": "local_polling",
"loggers": ["pexpect", "ptyprocess"],
"quality_scale": "legacy",
"requirements": ["pexpect==4.9.0"]
}

View File

@@ -0,0 +1,369 @@
"""Component for controlling Pandora stations through the pianobar client."""
from __future__ import annotations
from datetime import timedelta
import logging
import os
import re
import shutil
import signal
from typing import cast
import pexpect
from homeassistant import util
from homeassistant.components.media_player import (
MediaPlayerEntity,
MediaPlayerEntityFeature,
MediaPlayerState,
MediaType,
)
from homeassistant.const import (
EVENT_HOMEASSISTANT_STOP,
SERVICE_MEDIA_NEXT_TRACK,
SERVICE_MEDIA_PLAY,
SERVICE_MEDIA_PLAY_PAUSE,
SERVICE_VOLUME_DOWN,
SERVICE_VOLUME_UP,
)
from homeassistant.core import DOMAIN as HOMEASSISTANT_DOMAIN, Event, HomeAssistant
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.issue_registry import IssueSeverity, create_issue
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
from . import DOMAIN
_LOGGER = logging.getLogger(__name__)
CMD_MAP = {
SERVICE_MEDIA_NEXT_TRACK: "n",
SERVICE_MEDIA_PLAY_PAUSE: "p",
SERVICE_MEDIA_PLAY: "p",
SERVICE_VOLUME_UP: ")",
SERVICE_VOLUME_DOWN: "(",
}
MIN_TIME_BETWEEN_UPDATES = timedelta(seconds=2)
CURRENT_SONG_PATTERN = re.compile(r'"(.*?)"\s+by\s+"(.*?)"\son\s+"(.*?)"', re.MULTILINE)
STATION_PATTERN = re.compile(r'Station\s"(.+?)"', re.MULTILINE)
def setup_platform(
hass: HomeAssistant,
config: ConfigType,
add_entities: AddEntitiesCallback,
discovery_info: DiscoveryInfoType | None = None,
) -> None:
"""Set up the Pandora media player platform."""
create_issue(
hass,
HOMEASSISTANT_DOMAIN,
f"deprecated_system_packages_yaml_integration_{DOMAIN}",
breaks_in_ha_version="2025.12.0",
is_fixable=False,
issue_domain=DOMAIN,
severity=IssueSeverity.WARNING,
translation_key="deprecated_system_packages_yaml_integration",
translation_placeholders={
"domain": DOMAIN,
"integration_title": "Pandora",
},
)
if not _pianobar_exists():
return
pandora = PandoraMediaPlayer("Pandora")
# Make sure we end the pandora subprocess on exit in case user doesn't
# power it down.
def _stop_pianobar(_event: Event) -> None:
pandora.turn_off()
hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, _stop_pianobar)
add_entities([pandora])
class PandoraMediaPlayer(MediaPlayerEntity):
"""A media player that uses the Pianobar interface to Pandora."""
_attr_media_content_type = MediaType.MUSIC
# MediaPlayerEntityFeature.VOLUME_SET is close to available
# but we need volume up/down controls in the GUI.
_attr_supported_features = (
MediaPlayerEntityFeature.PAUSE
| MediaPlayerEntityFeature.TURN_ON
| MediaPlayerEntityFeature.TURN_OFF
| MediaPlayerEntityFeature.NEXT_TRACK
| MediaPlayerEntityFeature.SELECT_SOURCE
| MediaPlayerEntityFeature.PLAY
)
def __init__(self, name: str) -> None:
"""Initialize the Pandora device."""
self._attr_name = name
self._attr_state = MediaPlayerState.OFF
self._attr_source = ""
self._attr_media_title = ""
self._attr_media_artist = ""
self._attr_media_album_name = ""
self._attr_source_list = []
self._time_remaining = 0
self._attr_media_duration = 0
self._pianobar: pexpect.spawn[str] | None = None
async def _start_pianobar(self) -> bool:
pianobar = pexpect.spawn("pianobar", encoding="utf-8")
pianobar.delaybeforesend = None
pianobar.delayafterread = None
pianobar.delayafterclose = 0
pianobar.delayafterterminate = 0
_LOGGER.debug("Started pianobar subprocess")
mode = await pianobar.expect(
["Receiving new playlist", "Select station:", "Email:"],
async_=True,
)
if mode == 1:
# station list was presented. dismiss it.
pianobar.sendcontrol("m")
elif mode == 2:
_LOGGER.warning(
"The pianobar client is not configured to log in. "
"Please create a configuration file for it as described at "
"https://www.home-assistant.io/integrations/pandora/"
)
# pass through the email/password prompts to quit cleanly
pianobar.sendcontrol("m")
pianobar.sendcontrol("m")
pianobar.terminate()
return False
self._pianobar = pianobar
return True
async def async_turn_on(self) -> None:
"""Turn the media player on."""
if self.state == MediaPlayerState.OFF and await self._start_pianobar():
await self._update_stations()
await self.update_playing_status()
self._attr_state = MediaPlayerState.IDLE
self.schedule_update_ha_state()
def turn_off(self) -> None:
"""Turn the media player off."""
if self._pianobar is None:
_LOGGER.warning("Pianobar subprocess already stopped")
return
self._pianobar.send("q")
try:
_LOGGER.debug("Stopped Pianobar subprocess")
self._pianobar.terminate()
except pexpect.exceptions.TIMEOUT:
# kill the process group
if (pid := self._pianobar.pid) is not None:
os.killpg(os.getpgid(pid), signal.SIGTERM)
_LOGGER.debug("Killed Pianobar subprocess")
self._pianobar = None
self._attr_state = MediaPlayerState.OFF
self.schedule_update_ha_state()
async def async_media_play(self) -> None:
"""Send play command."""
await self._send_pianobar_command(SERVICE_MEDIA_PLAY_PAUSE)
self._attr_state = MediaPlayerState.PLAYING
self.schedule_update_ha_state()
async def async_media_pause(self) -> None:
"""Send pause command."""
await self._send_pianobar_command(SERVICE_MEDIA_PLAY_PAUSE)
self._attr_state = MediaPlayerState.PAUSED
self.schedule_update_ha_state()
async def async_media_next_track(self) -> None:
"""Go to next track."""
await self._send_pianobar_command(SERVICE_MEDIA_NEXT_TRACK)
self.schedule_update_ha_state()
async def async_select_source(self, source: str) -> None:
"""Choose a different Pandora station and play it."""
if self.source_list is None:
return
try:
station_index = self.source_list.index(source)
except ValueError:
_LOGGER.warning("Station %s is not in list", source)
return
_LOGGER.debug("Setting station %s, %d", source, station_index)
assert self._pianobar is not None
await self._send_station_list_command()
self._pianobar.sendline(f"{station_index}")
await self._pianobar.expect("\r\n", async_=True)
self._attr_state = MediaPlayerState.PLAYING
async def _send_station_list_command(self) -> None:
"""Send a station list command."""
assert self._pianobar is not None
self._pianobar.send("s")
try:
await self._pianobar.expect("Select station:", async_=True, timeout=1)
except pexpect.exceptions.TIMEOUT:
# try again. Buffer was contaminated.
await self._clear_buffer()
self._pianobar.send("s")
await self._pianobar.expect("Select station:", async_=True)
async def update_playing_status(self) -> None:
"""Query pianobar for info about current media_title, station."""
response = await self._query_for_playing_status()
if not response:
return
self._update_current_station(response)
self._update_current_song(response)
self._update_song_position()
async def _query_for_playing_status(self) -> str | None:
"""Query system for info about current track."""
assert self._pianobar is not None
await self._clear_buffer()
self._pianobar.send("i")
try:
match_idx = await self._pianobar.expect(
[
r"(\d\d):(\d\d)/(\d\d):(\d\d)",
"No song playing",
"Select station",
"Receiving new playlist",
],
async_=True,
)
except pexpect.exceptions.EOF:
_LOGGER.warning("Pianobar process already exited")
return None
self._log_match()
if match_idx == 1:
# idle.
return None
if match_idx == 2:
# stuck on a station selection dialog. Clear it.
_LOGGER.warning("On unexpected station list page")
self._pianobar.sendcontrol("m") # press enter
self._pianobar.sendcontrol("m") # do it again b/c an 'i' got in
await self.update_playing_status()
return None
if match_idx == 3:
_LOGGER.debug("Received new playlist list")
await self.update_playing_status()
return None
return self._pianobar.before
def _update_current_station(self, response: str) -> None:
"""Update current station."""
if station_match := re.search(STATION_PATTERN, response):
self._attr_source = station_match.group(1)
_LOGGER.debug("Got station as: %s", self._attr_source)
else:
_LOGGER.warning("No station match")
def _update_current_song(self, response: str) -> None:
"""Update info about current song."""
if song_match := re.search(CURRENT_SONG_PATTERN, response):
(
self._attr_media_title,
self._attr_media_artist,
self._attr_media_album_name,
) = song_match.groups()
_LOGGER.debug("Got song as: %s", self._attr_media_title)
else:
_LOGGER.warning("No song match")
@util.Throttle(MIN_TIME_BETWEEN_UPDATES)
def _update_song_position(self) -> None:
"""Get the song position and duration.
It's hard to predict whether or not the music will start during init
so we have to detect state by checking the ticker.
"""
assert self._pianobar is not None
(
cur_minutes,
cur_seconds,
total_minutes,
total_seconds,
) = cast(re.Match[str], self._pianobar.match).groups()
time_remaining = int(cur_minutes) * 60 + int(cur_seconds)
self._attr_media_duration = int(total_minutes) * 60 + int(total_seconds)
if time_remaining not in (self._time_remaining, self._attr_media_duration):
self._attr_state = MediaPlayerState.PLAYING
elif self.state == MediaPlayerState.PLAYING:
self._attr_state = MediaPlayerState.PAUSED
self._time_remaining = time_remaining
def _log_match(self) -> None:
"""Log grabbed values from console."""
assert self._pianobar is not None
_LOGGER.debug(
"Before: %s\nMatch: %s\nAfter: %s",
repr(self._pianobar.before),
repr(self._pianobar.match),
repr(self._pianobar.after),
)
async def _send_pianobar_command(self, service_cmd: str) -> None:
"""Send a command to Pianobar."""
assert self._pianobar is not None
command = CMD_MAP.get(service_cmd)
_LOGGER.debug("Sending pinaobar command %s for %s", command, service_cmd)
if command is None:
_LOGGER.warning("Command %s not supported yet", service_cmd)
return
await self._clear_buffer()
self._pianobar.sendline(command)
async def _update_stations(self) -> None:
"""List defined Pandora stations."""
assert self._pianobar is not None
await self._send_station_list_command()
station_lines = self._pianobar.before or ""
_LOGGER.debug("Getting stations: %s", station_lines)
self._attr_source_list = []
for line in station_lines.splitlines():
if match := re.search(r"\d+\).....(.+)", line):
station = match.group(1).strip()
_LOGGER.debug("Found station %s", station)
self._attr_source_list.append(station)
else:
_LOGGER.debug("No station match on %s", line)
self._pianobar.sendcontrol("m") # press enter with blank line
self._pianobar.sendcontrol("m") # do it twice in case an 'i' got in
async def _clear_buffer(self) -> None:
"""Clear buffer from pexpect.
This is necessary because there are a bunch of 00:00 in the buffer
"""
assert self._pianobar is not None
try:
while not await self._pianobar.expect(".+", async_=True, timeout=0.1):
pass
except pexpect.exceptions.TIMEOUT:
pass
except pexpect.exceptions.EOF:
pass
def _pianobar_exists() -> bool:
"""Verify that Pianobar is properly installed."""
pianobar_exe = shutil.which("pianobar")
if pianobar_exe:
return True
_LOGGER.warning(
"The Pandora integration depends on the Pianobar client, which "
"cannot be found. Please install using instructions at "
"https://www.home-assistant.io/integrations/media_player.pandora/"
)
return False

View File

@@ -159,8 +159,6 @@ class PortainerConfigFlow(ConfigFlow, domain=DOMAIN):
_LOGGER.exception("Unexpected exception")
errors["base"] = "unknown"
else:
await self.async_set_unique_id(user_input[CONF_API_TOKEN])
self._abort_if_unique_id_configured()
return self.async_update_reload_and_abort(
reconf_entry,
data_updates={

View File

@@ -19,5 +19,5 @@
"iot_class": "local_push",
"loggers": ["reolink_aio"],
"quality_scale": "platinum",
"requirements": ["reolink-aio==0.16.2"]
"requirements": ["reolink-aio==0.16.3"]
}

Some files were not shown because too many files have changed in this diff Show More