mirror of
				https://github.com/home-assistant/core.git
				synced 2025-11-04 00:19:31 +00:00 
			
		
		
		
	Co-authored-by: Josef Zweck <josef@zweck.dev> Co-authored-by: Franck Nijhof <git@frenck.dev>
		
			
				
	
	
		
			56 lines
		
	
	
		
			1.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			56 lines
		
	
	
		
			1.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
"""DataUpdateCoordinator for the airtouch integration."""
 | 
						|
 | 
						|
import logging
 | 
						|
 | 
						|
from airtouch4pyapi import AirTouch
 | 
						|
from airtouch4pyapi.airtouch import AirTouchStatus
 | 
						|
 | 
						|
from homeassistant.components.climate import SCAN_INTERVAL
 | 
						|
from homeassistant.config_entries import ConfigEntry
 | 
						|
from homeassistant.core import HomeAssistant
 | 
						|
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
 | 
						|
 | 
						|
from .const import DOMAIN
 | 
						|
 | 
						|
_LOGGER = logging.getLogger(__name__)
 | 
						|
 | 
						|
type AirTouch4ConfigEntry = ConfigEntry[AirtouchDataUpdateCoordinator]
 | 
						|
 | 
						|
 | 
						|
class AirtouchDataUpdateCoordinator(DataUpdateCoordinator):
 | 
						|
    """Class to manage fetching Airtouch data."""
 | 
						|
 | 
						|
    def __init__(
 | 
						|
        self, hass: HomeAssistant, entry: AirTouch4ConfigEntry, airtouch: AirTouch
 | 
						|
    ) -> None:
 | 
						|
        """Initialize global Airtouch data updater."""
 | 
						|
        self.airtouch = airtouch
 | 
						|
 | 
						|
        super().__init__(
 | 
						|
            hass,
 | 
						|
            _LOGGER,
 | 
						|
            config_entry=entry,
 | 
						|
            name=DOMAIN,
 | 
						|
            update_interval=SCAN_INTERVAL,
 | 
						|
        )
 | 
						|
 | 
						|
    async def _async_update_data(self):
 | 
						|
        """Fetch data from Airtouch."""
 | 
						|
        await self.airtouch.UpdateInfo()
 | 
						|
        if self.airtouch.Status != AirTouchStatus.OK:
 | 
						|
            raise UpdateFailed("Airtouch connection issue")
 | 
						|
        return {
 | 
						|
            "acs": [
 | 
						|
                {"ac_number": ac.AcNumber, "is_on": ac.IsOn}
 | 
						|
                for ac in self.airtouch.GetAcs()
 | 
						|
            ],
 | 
						|
            "groups": [
 | 
						|
                {
 | 
						|
                    "group_number": group.GroupNumber,
 | 
						|
                    "group_name": group.GroupName,
 | 
						|
                    "is_on": group.IsOn,
 | 
						|
                }
 | 
						|
                for group in self.airtouch.GetGroups()
 | 
						|
            ],
 | 
						|
        }
 |