mirror of
https://github.com/home-assistant/core.git
synced 2025-07-15 09:17:10 +00:00
Move upcloud coordinator to separate module (#117536)
This commit is contained in:
parent
c2bf4b905c
commit
962dd81eb7
@ -27,12 +27,10 @@ from homeassistant.helpers.dispatcher import (
|
|||||||
async_dispatcher_connect,
|
async_dispatcher_connect,
|
||||||
async_dispatcher_send,
|
async_dispatcher_send,
|
||||||
)
|
)
|
||||||
from homeassistant.helpers.update_coordinator import (
|
from homeassistant.helpers.update_coordinator import CoordinatorEntity
|
||||||
CoordinatorEntity,
|
|
||||||
DataUpdateCoordinator,
|
|
||||||
)
|
|
||||||
|
|
||||||
from .const import CONFIG_ENTRY_UPDATE_SIGNAL_TEMPLATE, DEFAULT_SCAN_INTERVAL, DOMAIN
|
from .const import CONFIG_ENTRY_UPDATE_SIGNAL_TEMPLATE, DEFAULT_SCAN_INTERVAL, DOMAIN
|
||||||
|
from .coordinator import UpCloudDataUpdateCoordinator
|
||||||
|
|
||||||
_LOGGER = logging.getLogger(__name__)
|
_LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
@ -56,40 +54,6 @@ SIGNAL_UPDATE_UPCLOUD = "upcloud_update"
|
|||||||
STATE_MAP = {"error": STATE_PROBLEM, "started": STATE_ON, "stopped": STATE_OFF}
|
STATE_MAP = {"error": STATE_PROBLEM, "started": STATE_ON, "stopped": STATE_OFF}
|
||||||
|
|
||||||
|
|
||||||
class UpCloudDataUpdateCoordinator(
|
|
||||||
DataUpdateCoordinator[dict[str, upcloud_api.Server]]
|
|
||||||
): # pylint: disable=hass-enforce-coordinator-module
|
|
||||||
"""UpCloud data update coordinator."""
|
|
||||||
|
|
||||||
def __init__(
|
|
||||||
self,
|
|
||||||
hass: HomeAssistant,
|
|
||||||
*,
|
|
||||||
cloud_manager: upcloud_api.CloudManager,
|
|
||||||
update_interval: timedelta,
|
|
||||||
username: str,
|
|
||||||
) -> None:
|
|
||||||
"""Initialize coordinator."""
|
|
||||||
super().__init__(
|
|
||||||
hass, _LOGGER, name=f"{username}@UpCloud", update_interval=update_interval
|
|
||||||
)
|
|
||||||
self.cloud_manager = cloud_manager
|
|
||||||
|
|
||||||
async def async_update_config(self, config_entry: ConfigEntry) -> None:
|
|
||||||
"""Handle config update."""
|
|
||||||
self.update_interval = timedelta(
|
|
||||||
seconds=config_entry.options[CONF_SCAN_INTERVAL]
|
|
||||||
)
|
|
||||||
|
|
||||||
async def _async_update_data(self) -> dict[str, upcloud_api.Server]:
|
|
||||||
return {
|
|
||||||
x.uuid: x
|
|
||||||
for x in await self.hass.async_add_executor_job(
|
|
||||||
self.cloud_manager.get_servers
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
@dataclasses.dataclass
|
@dataclasses.dataclass
|
||||||
class UpCloudHassData:
|
class UpCloudHassData:
|
||||||
"""Home Assistant UpCloud runtime data."""
|
"""Home Assistant UpCloud runtime data."""
|
||||||
|
49
homeassistant/components/upcloud/coordinator.py
Normal file
49
homeassistant/components/upcloud/coordinator.py
Normal file
@ -0,0 +1,49 @@
|
|||||||
|
"""Coordinator for UpCloud."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from datetime import timedelta
|
||||||
|
import logging
|
||||||
|
|
||||||
|
import upcloud_api
|
||||||
|
|
||||||
|
from homeassistant.config_entries import ConfigEntry
|
||||||
|
from homeassistant.const import CONF_SCAN_INTERVAL
|
||||||
|
from homeassistant.core import HomeAssistant
|
||||||
|
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
|
||||||
|
|
||||||
|
_LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class UpCloudDataUpdateCoordinator(
|
||||||
|
DataUpdateCoordinator[dict[str, upcloud_api.Server]]
|
||||||
|
):
|
||||||
|
"""UpCloud data update coordinator."""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
hass: HomeAssistant,
|
||||||
|
*,
|
||||||
|
cloud_manager: upcloud_api.CloudManager,
|
||||||
|
update_interval: timedelta,
|
||||||
|
username: str,
|
||||||
|
) -> None:
|
||||||
|
"""Initialize coordinator."""
|
||||||
|
super().__init__(
|
||||||
|
hass, _LOGGER, name=f"{username}@UpCloud", update_interval=update_interval
|
||||||
|
)
|
||||||
|
self.cloud_manager = cloud_manager
|
||||||
|
|
||||||
|
async def async_update_config(self, config_entry: ConfigEntry) -> None:
|
||||||
|
"""Handle config update."""
|
||||||
|
self.update_interval = timedelta(
|
||||||
|
seconds=config_entry.options[CONF_SCAN_INTERVAL]
|
||||||
|
)
|
||||||
|
|
||||||
|
async def _async_update_data(self) -> dict[str, upcloud_api.Server]:
|
||||||
|
return {
|
||||||
|
x.uuid: x
|
||||||
|
for x in await self.hass.async_add_executor_job(
|
||||||
|
self.cloud_manager.get_servers
|
||||||
|
)
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user