mirror of
https://github.com/home-assistant/core.git
synced 2026-04-21 00:45:42 +00:00
117 lines
3.6 KiB
Python
117 lines
3.6 KiB
Python
"""Satel Integra client."""
|
|
|
|
from collections.abc import Callable
|
|
|
|
from satel_integra import AsyncSatel
|
|
from satel_integra.exceptions import (
|
|
SatelConnectFailedError,
|
|
SatelConnectionInitializationError,
|
|
SatelPanelBusyError,
|
|
)
|
|
|
|
from homeassistant.config_entries import ConfigEntry
|
|
from homeassistant.const import CONF_HOST, CONF_PORT
|
|
from homeassistant.core import HomeAssistant
|
|
from homeassistant.exceptions import ConfigEntryError, ConfigEntryNotReady
|
|
|
|
from .const import (
|
|
CONF_ENCRYPTION_KEY,
|
|
CONF_OUTPUT_NUMBER,
|
|
CONF_PARTITION_NUMBER,
|
|
CONF_SWITCHABLE_OUTPUT_NUMBER,
|
|
CONF_ZONE_NUMBER,
|
|
DOMAIN,
|
|
SUBENTRY_TYPE_OUTPUT,
|
|
SUBENTRY_TYPE_PARTITION,
|
|
SUBENTRY_TYPE_SWITCHABLE_OUTPUT,
|
|
SUBENTRY_TYPE_ZONE,
|
|
)
|
|
|
|
|
|
class SatelClient:
|
|
"""Client to connect to Satel Integra."""
|
|
|
|
controller: AsyncSatel
|
|
|
|
def __init__(self, hass: HomeAssistant, entry: ConfigEntry) -> None:
|
|
"""Initialize the client wrapper."""
|
|
self.hass = hass
|
|
self.config_entry = entry
|
|
|
|
host = entry.data[CONF_HOST]
|
|
port = entry.data[CONF_PORT]
|
|
|
|
# Make sure we initialize the Satel controller with the configured entries to monitor
|
|
partitions = [
|
|
subentry.data[CONF_PARTITION_NUMBER]
|
|
for subentry in entry.subentries.values()
|
|
if subentry.subentry_type == SUBENTRY_TYPE_PARTITION
|
|
]
|
|
|
|
zones = [
|
|
subentry.data[CONF_ZONE_NUMBER]
|
|
for subentry in entry.subentries.values()
|
|
if subentry.subentry_type == SUBENTRY_TYPE_ZONE
|
|
]
|
|
|
|
outputs = [
|
|
subentry.data[CONF_OUTPUT_NUMBER]
|
|
for subentry in entry.subentries.values()
|
|
if subentry.subentry_type == SUBENTRY_TYPE_OUTPUT
|
|
]
|
|
|
|
switchable_outputs = [
|
|
subentry.data[CONF_SWITCHABLE_OUTPUT_NUMBER]
|
|
for subentry in entry.subentries.values()
|
|
if subentry.subentry_type == SUBENTRY_TYPE_SWITCHABLE_OUTPUT
|
|
]
|
|
|
|
monitored_outputs = outputs + switchable_outputs
|
|
|
|
self.controller = AsyncSatel(
|
|
host,
|
|
port,
|
|
zones,
|
|
monitored_outputs,
|
|
partitions,
|
|
integration_key=entry.data[CONF_ENCRYPTION_KEY],
|
|
)
|
|
|
|
async def async_connect(
|
|
self,
|
|
zones_update_callback: Callable[[dict[int, int]], None],
|
|
outputs_update_callback: Callable[[dict[int, int]], None],
|
|
partitions_update_callback: Callable[[], None],
|
|
) -> None:
|
|
"""Start controller connection."""
|
|
try:
|
|
await self.controller.connect(raise_exceptions=True)
|
|
except SatelConnectFailedError as ex:
|
|
raise ConfigEntryNotReady(
|
|
translation_domain=DOMAIN,
|
|
translation_key="cannot_connect",
|
|
) from ex
|
|
except SatelPanelBusyError as ex:
|
|
raise ConfigEntryNotReady(
|
|
translation_domain=DOMAIN,
|
|
translation_key="panel_busy",
|
|
) from ex
|
|
except SatelConnectionInitializationError as ex:
|
|
raise ConfigEntryError(
|
|
translation_domain=DOMAIN,
|
|
translation_key="connection_initialization_failed",
|
|
) from ex
|
|
|
|
self.controller.register_callbacks(
|
|
alarm_status_callback=partitions_update_callback,
|
|
zone_changed_callback=zones_update_callback,
|
|
output_changed_callback=outputs_update_callback,
|
|
)
|
|
|
|
await self.controller.start(enable_monitoring=True)
|
|
|
|
async def async_close(self) -> None:
|
|
"""Close the connection."""
|
|
|
|
await self.controller.close()
|