"""Support for TPLink lights.""" from datetime import timedelta import logging import time from typing import Any, Dict, NamedTuple, Tuple, cast from pyHS100 import SmartBulb, SmartDeviceException from homeassistant.components.light import ( ATTR_BRIGHTNESS, ATTR_COLOR_TEMP, ATTR_HS_COLOR, SUPPORT_BRIGHTNESS, SUPPORT_COLOR, SUPPORT_COLOR_TEMP, Light, ) import homeassistant.helpers.device_registry as dr from homeassistant.helpers.typing import HomeAssistantType from homeassistant.util.color import ( color_temperature_kelvin_to_mired as kelvin_to_mired, color_temperature_mired_to_kelvin as mired_to_kelvin, ) from . import CONF_LIGHT, DOMAIN as TPLINK_DOMAIN from .common import async_add_entities_retry PARALLEL_UPDATES = 0 SCAN_INTERVAL = timedelta(seconds=5) _LOGGER = logging.getLogger(__name__) ATTR_CURRENT_POWER_W = "current_power_w" ATTR_DAILY_ENERGY_KWH = "daily_energy_kwh" ATTR_MONTHLY_ENERGY_KWH = "monthly_energy_kwh" async def async_setup_platform(hass, config, add_entities, discovery_info=None): """Set up the platform. Deprecated. """ _LOGGER.warning( "Loading as a platform is no longer supported, " "convert to use the tplink component." ) async def async_setup_entry(hass: HomeAssistantType, config_entry, async_add_entities): """Set up switches.""" await async_add_entities_retry( hass, async_add_entities, hass.data[TPLINK_DOMAIN][CONF_LIGHT], add_entity ) return True def add_entity(device: SmartBulb, async_add_entities): """Check if device is online and add the entity.""" # Attempt to get the sysinfo. If it fails, it will raise an # exception that is caught by async_add_entities_retry which # will try again later. device.get_sysinfo() async_add_entities([TPLinkSmartBulb(device)], update_before_add=True) def brightness_to_percentage(byt): """Convert brightness from absolute 0..255 to percentage.""" return int((byt * 100.0) / 255.0) def brightness_from_percentage(percent): """Convert percentage to absolute value 0..255.""" return (percent * 255.0) / 100.0 LightState = NamedTuple( "LightState", ( ("state", bool), ("brightness", int), ("color_temp", float), ("hs", Tuple[int, int]), ("emeter_params", dict), ), ) LightFeatures = NamedTuple( "LightFeatures", ( ("sysinfo", Dict[str, Any]), ("mac", str), ("alias", str), ("model", str), ("supported_features", int), ("min_mireds", float), ("max_mireds", float), ), ) class TPLinkSmartBulb(Light): """Representation of a TPLink Smart Bulb.""" def __init__(self, smartbulb: SmartBulb) -> None: """Initialize the bulb.""" self.smartbulb = smartbulb self._light_features = cast(LightFeatures, None) self._light_state = cast(LightState, None) self._is_available = True self._is_setting_light_state = False @property def unique_id(self): """Return a unique ID.""" return self._light_features.mac @property def name(self): """Return the name of the Smart Bulb.""" return self._light_features.alias @property def device_info(self): """Return information about the device.""" return { "name": self._light_features.alias, "model": self._light_features.model, "manufacturer": "TP-Link", "connections": {(dr.CONNECTION_NETWORK_MAC, self._light_features.mac)}, "sw_version": self._light_features.sysinfo["sw_ver"], } @property def available(self) -> bool: """Return if bulb is available.""" return self._is_available @property def device_state_attributes(self): """Return the state attributes of the device.""" return self._light_state.emeter_params async def async_turn_on(self, **kwargs): """Turn the light on.""" brightness = ( int(kwargs[ATTR_BRIGHTNESS]) if ATTR_BRIGHTNESS in kwargs else self._light_state.brightness if self._light_state.brightness is not None else 255 ) color_tmp = ( int(kwargs[ATTR_COLOR_TEMP]) if ATTR_COLOR_TEMP in kwargs else self._light_state.color_temp ) await self.async_set_light_state_retry( self._light_state, LightState( state=True, brightness=brightness, color_temp=color_tmp, hs=tuple(kwargs.get(ATTR_HS_COLOR, self._light_state.hs or ())), emeter_params=self._light_state.emeter_params, ), ) async def async_turn_off(self, **kwargs): """Turn the light off.""" await self.async_set_light_state_retry( self._light_state, LightState( state=False, brightness=self._light_state.brightness, color_temp=self._light_state.color_temp, hs=self._light_state.hs, emeter_params=self._light_state.emeter_params, ), ) @property def min_mireds(self): """Return minimum supported color temperature.""" return self._light_features.min_mireds @property def max_mireds(self): """Return maximum supported color temperature.""" return self._light_features.max_mireds @property def color_temp(self): """Return the color temperature of this light in mireds for HA.""" return self._light_state.color_temp @property def brightness(self): """Return the brightness of this light between 0..255.""" return self._light_state.brightness @property def hs_color(self): """Return the color.""" return self._light_state.hs @property def is_on(self): """Return True if device is on.""" return self._light_state.state def update(self): """Update the TP-Link Bulb's state.""" # State is currently being set, ignore. if self._is_setting_light_state: return # Initial run, perform call blocking. if not self._light_features: self.do_update_retry(False) # Subsequent runs should not block. else: self.hass.add_job(self.do_update_retry, True) def do_update_retry(self, update_state: bool) -> None: """Update state data with retry.""" "" try: # Update light features only once. self._light_features = ( self._light_features or self.get_light_features_retry() ) self._light_state = self.get_light_state_retry(self._light_features) self._is_available = True except (SmartDeviceException, OSError) as ex: if self._is_available: _LOGGER.warning( "Could not read data for %s: %s", self.smartbulb.host, ex ) self._is_available = False # The local variables were updates asyncronousally, # we need the entity registry to poll this object's properties for # updated information. Calling schedule_update_ha_state will only # cause a loop. if update_state: self.schedule_update_ha_state() @property def supported_features(self): """Flag supported features.""" return self._light_features.supported_features def get_light_features_retry(self) -> LightFeatures: """Retry the retrieval of the supported features.""" try: return self.get_light_features() except (SmartDeviceException, OSError): pass _LOGGER.debug("Retrying getting light features") return self.get_light_features() def get_light_features(self): """Determine all supported features in one go.""" sysinfo = self.smartbulb.sys_info supported_features = 0 mac = self.smartbulb.mac alias = self.smartbulb.alias model = self.smartbulb.model min_mireds = None max_mireds = None if self.smartbulb.is_dimmable: supported_features += SUPPORT_BRIGHTNESS if getattr(self.smartbulb, "is_variable_color_temp", False): supported_features += SUPPORT_COLOR_TEMP min_mireds = kelvin_to_mired(self.smartbulb.valid_temperature_range[1]) max_mireds = kelvin_to_mired(self.smartbulb.valid_temperature_range[0]) if getattr(self.smartbulb, "is_color", False): supported_features += SUPPORT_COLOR return LightFeatures( sysinfo=sysinfo, mac=mac, alias=alias, model=model, supported_features=supported_features, min_mireds=min_mireds, max_mireds=max_mireds, ) def get_light_state_retry(self, light_features: LightFeatures) -> LightState: """Retry the retrieval of getting light states.""" try: return self.get_light_state(light_features) except (SmartDeviceException, OSError): pass _LOGGER.debug("Retrying getting light state") return self.get_light_state(light_features) def get_light_state(self, light_features: LightFeatures) -> LightState: """Get the light state.""" emeter_params = {} brightness = None color_temp = None hue_saturation = None state = self.smartbulb.state == SmartBulb.BULB_STATE_ON if light_features.supported_features & SUPPORT_BRIGHTNESS: brightness = brightness_from_percentage(self.smartbulb.brightness) if light_features.supported_features & SUPPORT_COLOR_TEMP: if self.smartbulb.color_temp is not None and self.smartbulb.color_temp != 0: color_temp = kelvin_to_mired(self.smartbulb.color_temp) if light_features.supported_features & SUPPORT_COLOR: hue, sat, _ = self.smartbulb.hsv hue_saturation = (hue, sat) if self.smartbulb.has_emeter: emeter_params[ATTR_CURRENT_POWER_W] = "{:.1f}".format( self.smartbulb.current_consumption() ) daily_statistics = self.smartbulb.get_emeter_daily() monthly_statistics = self.smartbulb.get_emeter_monthly() try: emeter_params[ATTR_DAILY_ENERGY_KWH] = "{:.3f}".format( daily_statistics[int(time.strftime("%d"))] ) emeter_params[ATTR_MONTHLY_ENERGY_KWH] = "{:.3f}".format( monthly_statistics[int(time.strftime("%m"))] ) except KeyError: # device returned no daily/monthly history pass return LightState( state=state, brightness=brightness, color_temp=color_temp, hs=hue_saturation, emeter_params=emeter_params, ) async def async_set_light_state_retry( self, old_light_state: LightState, new_light_state: LightState ) -> None: """Set the light state with retry.""" # Optimistically setting the light state. self._light_state = new_light_state # Tell the device to set the states. self._is_setting_light_state = True try: await self.hass.async_add_executor_job( self.set_light_state, old_light_state, new_light_state ) self._is_available = True self._is_setting_light_state = False return except (SmartDeviceException, OSError): pass try: _LOGGER.debug("Retrying setting light state") await self.hass.async_add_executor_job( self.set_light_state, old_light_state, new_light_state ) self._is_available = True except (SmartDeviceException, OSError) as ex: self._is_available = False _LOGGER.warning("Could not set data for %s: %s", self.smartbulb.host, ex) self._is_setting_light_state = False def set_light_state( self, old_light_state: LightState, new_light_state: LightState ) -> None: """Set the light state.""" # Calling the API with the new state information. if new_light_state.state != old_light_state.state: if new_light_state.state: self.smartbulb.state = SmartBulb.BULB_STATE_ON else: self.smartbulb.state = SmartBulb.BULB_STATE_OFF return if new_light_state.color_temp != old_light_state.color_temp: self.smartbulb.color_temp = mired_to_kelvin(new_light_state.color_temp) brightness_pct = brightness_to_percentage(new_light_state.brightness) if new_light_state.hs != old_light_state.hs and len(new_light_state.hs) > 1: hue, sat = new_light_state.hs hsv = (int(hue), int(sat), brightness_pct) self.smartbulb.hsv = hsv elif new_light_state.brightness != old_light_state.brightness: self.smartbulb.brightness = brightness_pct