From bb771d802d2919f068aed13a7ae80544d4a09c52 Mon Sep 17 00:00:00 2001 From: Paulus Schoutsen Date: Sun, 16 Mar 2014 15:00:59 -0700 Subject: [PATCH] Light color/brightness now exposed and controllable --- .../components/device_sun_light_trigger.py | 11 +- homeassistant/components/light.py | 306 +++++++++++++----- homeassistant/util.py | 54 +++- 3 files changed, 285 insertions(+), 86 deletions(-) diff --git a/homeassistant/components/device_sun_light_trigger.py b/homeassistant/components/device_sun_light_trigger.py index 6b949851207..961f63f5968 100644 --- a/homeassistant/components/device_sun_light_trigger.py +++ b/homeassistant/components/device_sun_light_trigger.py @@ -15,6 +15,8 @@ from . import light, sun, device_tracker, group LIGHT_TRANSITION_TIME = timedelta(minutes=15) +LIGHT_BRIGHTNESS = 164 +LIGHT_XY_COLOR = [0.5119, 0.4147] # pylint: disable=too-many-branches @@ -64,7 +66,10 @@ def setup(bus, statemachine, light_group=None): if (device_tracker.is_on(statemachine) and not light.is_on(statemachine, light_id)): - light.turn_on(bus, light_id, LIGHT_TRANSITION_TIME.seconds) + light.turn_on(bus, light_id, + transition=LIGHT_TRANSITION_TIME.seconds, + brightness=LIGHT_BRIGHTNESS, + xy_color=LIGHT_XY_COLOR) def turn_on(light_id): """ Lambda can keep track of function parameters but not local @@ -115,7 +120,9 @@ def setup(bus, statemachine, light_group=None): # Turn on lights directly instead of calling group.turn_on # So we skip fetching the entity ids again. for light_id in light_ids: - light.turn_on(bus, light_id) + light.turn_on(bus, light_id, + brightness=LIGHT_BRIGHTNESS, + xy_color=LIGHT_XY_COLOR) # Are we in the time span were we would turn on the lights # if someone would be home? diff --git a/homeassistant/components/light.py b/homeassistant/components/light.py index 2f968794a2f..e1fcb9d9585 100644 --- a/homeassistant/components/light.py +++ b/homeassistant/components/light.py @@ -8,6 +8,7 @@ Provides functionality to interact with lights. import logging import socket from datetime import datetime, timedelta +from collections import namedtuple import homeassistant as ha import homeassistant.util as util @@ -26,6 +27,16 @@ ENTITY_ID_FORMAT = DOMAIN + ".{}" MIN_TIME_BETWEEN_SCANS = timedelta(seconds=10) +# integer that represents transition time in seconds to make change +ATTR_TRANSITION = "transition" + +# lists holding color values +ATTR_RGB_COLOR = "rgb_color" +ATTR_XY_COLOR = "xy_color" + +# int with value 0 .. 255 representing brightness of the light +ATTR_BRIGHTNESS = "brightness" + def is_on(statemachine, entity_id=None): """ Returns if the lights are on based on the statemachine. """ @@ -34,32 +45,44 @@ def is_on(statemachine, entity_id=None): return statemachine.is_state(entity_id, STATE_ON) -def turn_on(bus, entity_id=None, transition_seconds=None): +# pylint: disable=too-many-arguments +def turn_on(bus, entity_id=None, transition=None, brightness=None, + rgb_color=None, xy_color=None): """ Turns all or specified light on. """ data = {} if entity_id: data[ATTR_ENTITY_ID] = entity_id - if transition_seconds: - data["transition_seconds"] = transition_seconds + if transition is not None: + data[ATTR_TRANSITION] = transition + + if brightness is not None: + data[ATTR_BRIGHTNESS] = brightness + + if rgb_color is not None: + data[ATTR_RGB_COLOR] = rgb_color + + if xy_color is not None: + data[ATTR_XY_COLOR] = xy_color bus.call_service(DOMAIN, SERVICE_TURN_ON, data) -def turn_off(bus, entity_id=None, transition_seconds=None): +def turn_off(bus, entity_id=None, transition=None): """ Turns all or specified light off. """ data = {} if entity_id: data[ATTR_ENTITY_ID] = entity_id - if transition_seconds: - data["transition_seconds"] = transition_seconds + if transition is not None: + data[ATTR_TRANSITION] = transition bus.call_service(DOMAIN, SERVICE_TURN_OFF, data) +# pylint: disable=too-many-branches def setup(bus, statemachine, light_control): """ Exposes light control via statemachine and services. """ @@ -68,47 +91,68 @@ def setup(bus, statemachine, light_control): ent_to_light = {} light_to_ent = {} - def update_light_state(time): # pylint: disable=unused-argument - """ Track the state of the lights. """ + def _update_light_state(light_id, light_state): + """ Update statemachine based on the LightState passed in. """ try: - should_update = datetime.now() - update_light_state.last_updated \ - > MIN_TIME_BETWEEN_SCANS + entity_id = light_to_ent[light_id] + except KeyError: + # We have not seen this light before, set it up - except AttributeError: # if last_updated does not exist - should_update = True + # Get name and create entity id + name = light_control.get_name(light_id) or "Unknown Light" + + logger.info(u"Found new light {}".format(name)) + + entity_id = ENTITY_ID_FORMAT.format(util.slugify(name)) + + # Ensure unique entity id + tries = 1 + while entity_id in ent_to_light: + tries += 1 + + entity_id = ENTITY_ID_FORMAT.format( + util.slugify("{} {}".format(name, tries))) + + ent_to_light[entity_id] = light_id + light_to_ent[light_id] = entity_id + + state_attr = {} + + if light_state.on: + state = STATE_ON + + if light_state.brightness: + state_attr[ATTR_BRIGHTNESS] = light_state.brightness + + if light_state.color: + state_attr[ATTR_XY_COLOR] = light_state.color + + else: + state = STATE_OFF + + statemachine.set_state(entity_id, state, state_attr) + + def update_light_state(light_id): + """ Update the state of specified light. """ + _update_light_state(light_id, light_control.get_state(light_id)) + + # pylint: disable=unused-argument + def update_lights_state(time, force_reload=False): + """ Update the state of all the lights. """ + + # First time this method gets called, force_reload should be True + if (force_reload or + datetime.now() - update_lights_state.last_updated > + MIN_TIME_BETWEEN_SCANS): - if should_update: logger.info("Updating light status") - update_light_state.last_updated = datetime.now() - names = None + update_lights_state.last_updated = datetime.now() - states = light_control.get_states() - - for light_id, is_light_on in states.items(): - try: - entity_id = light_to_ent[light_id] - except KeyError: - # We have not seen this light before, set it up - - # Load light names if not loaded this update call - if names is None: - names = light_control.get_names() - - name = names.get( - light_id, "Unknown Light {}".format(len(ent_to_light))) - - logger.info("Found new light {}".format(name)) - - entity_id = ENTITY_ID_FORMAT.format(util.slugify(name)) - - ent_to_light[entity_id] = light_id - light_to_ent[light_id] = entity_id - - statemachine.set_state(entity_id, - STATE_ON if is_light_on else STATE_OFF) + for light_id, light_state in light_control.get_states().items(): + _update_light_state(light_id, light_state) # Update light state and discover lights for tracking the group - update_light_state(None) + update_lights_state(None, True) # Track all lights in a group group.setup(bus, statemachine, @@ -116,20 +160,65 @@ def setup(bus, statemachine, light_control): def handle_light_service(service): """ Hande a turn light on or off service call. """ - entity_id = service.data.get(ATTR_ENTITY_ID, None) - transition_seconds = service.data.get("transition_seconds", None) + # Get and validate data + dat = service.data - if service.service == SERVICE_TURN_ON: - light_control.turn_light_on(ent_to_light.get(entity_id), - transition_seconds) + if ATTR_ENTITY_ID in dat: + light_id = ent_to_light.get(dat[ATTR_ENTITY_ID]) else: - light_control.turn_light_off(ent_to_light.get(entity_id), - transition_seconds) + light_id = None - update_light_state(None) + transition = util.dict_get_convert(dat, ATTR_TRANSITION, int, None) + + if service.service == SERVICE_TURN_OFF: + light_control.turn_light_off(light_id, transition) + + else: + # Processing extra data for turn light on request + bright = util.dict_get_convert(dat, ATTR_BRIGHTNESS, int, 164) + + color = None + xy_color = dat.get(ATTR_XY_COLOR) + rgb_color = dat.get(ATTR_RGB_COLOR) + + if xy_color: + try: + # xy_color should be a list containing 2 floats + xy_color = [float(val) for val in xy_color] + + if len(xy_color) == 2: + color = xy_color + + except (TypeError, ValueError): + # TypeError if xy_color was not iterable + # ValueError if value could not be converted to float + pass + + if not color and rgb_color: + try: + # rgb_color should be a list containing 3 ints + rgb_color = [int(val) for val in rgb_color] + + if len(rgb_color) == 3: + color = util.color_RGB_to_xy(rgb_color[0], + rgb_color[1], + rgb_color[2]) + + except (TypeError, ValueError): + # TypeError if color has no len + # ValueError if not all values convertable to int + color = None + + light_control.turn_light_on(light_id, transition, bright, color) + + # Update state of lights touched + if light_id: + update_light_state(light_id) + else: + update_lights_state(None, True) # Update light state every 30 seconds - ha.track_time_change(bus, update_light_state, second=[0, 30]) + ha.track_time_change(bus, update_lights_state, second=[0, 30]) # Listen for light on and light off service calls bus.register_service(DOMAIN, SERVICE_TURN_ON, @@ -141,6 +230,19 @@ def setup(bus, statemachine, light_control): return True +LightState = namedtuple("LightState", ['on', 'brightness', 'color']) + + +def _hue_to_light_state(info): + """ Helper method to convert a Hue state to a LightState. """ + try: + return LightState(info['state']['reachable'] and info['state']['on'], + info['state']['bri'], info['state']['xy']) + except KeyError: + # KeyError if one of the keys didn't exist + return None + + class HueLightControl(object): """ Class to interface with the Hue light system. """ @@ -168,59 +270,101 @@ class HueLightControl(object): return - if len(self._bridge.get_light()) == 0: + # Dict mapping light_id to name + self._lights = {} + self._update_lights() + + if len(self._lights) == 0: logger.error("HueLightControl:Could not find any lights. ") self.success_init = False else: self.success_init = True - def get_names(self): - """ Return a dict with id mapped to name. """ + def _update_lights(self): + """ Helper method to update the known names from Hue. """ try: - return {int(item[0]): item[1]['name'] for item - in self._bridge.get_light().items()} + self._lights = {int(item[0]): item[1]['name'] for item + in self._bridge.get_light().items()} except (socket.error, KeyError): # socket.error because sometimes we cannot reach Hue # KeyError if we got unexpected data - return {} + # We don't do anything, keep old values + pass + + def get_name(self, light_id): + """ Return name for specified light_id or None if no name known. """ + if not light_id in self._lights: + self._update_lights() + + return self._lights.get(light_id) + + def get_state(self, light_id): + """ Return a LightState representing light light_id. """ + try: + info = self._bridge.get_light(light_id) + + return _hue_to_light_state(info) + + except socket.error: + # socket.error when we cannot reach Hue + return None def get_states(self): - """ Return a dict with id mapped to boolean is_on. """ + """ Return a dict with id mapped to LightState objects. """ + states = {} try: - # Light is on if reachable and on - return {int(itm[0]): - itm[1]['state']['reachable'] and itm[1]['state']['on'] - for itm in self._bridge.get_api()['lights'].items()} + api = self._bridge.get_api() - except (socket.error, KeyError): - # socket.error because sometimes we cannot reach Hue - # KeyError if we got unexpected data - return {} + except socket.error: + # socket.error when we cannot reach Hue + return states - def turn_light_on(self, light_id=None, transition_seconds=None): + api_states = api.get('lights') + + if not isinstance(api_states, dict): + return states + + for light_id, info in api_states.items(): + state = _hue_to_light_state(info) + + if state: + states[int(light_id)] = state + + return states + + def turn_light_on(self, light_id, transition, brightness, xy_color): """ Turn the specified or all lights on. """ - self._turn_light(True, light_id, transition_seconds) - - def turn_light_off(self, light_id=None, transition_seconds=None): - """ Turn the specified or all lights off. """ - self._turn_light(False, light_id, transition_seconds) - - def _turn_light(self, turn, light_id, transition_seconds): - """ Helper method to turn lights on or off. """ - if turn: - command = {'on': True, 'xy': [0.5119, 0.4147], 'bri': 164} - else: - command = {'on': False} - if light_id is None: - light_id = [light.light_id for light in self._bridge.lights] + light_id = self._lights.keys() - if transition_seconds is not None: + command = {'on': True} + + if transition is not None: # Transition time is in 1/10th seconds and cannot exceed # 900 seconds. - command['transitiontime'] = min(9000, transition_seconds * 10) + command['transitiontime'] = min(9000, transition * 10) + + if brightness is not None: + command['bri'] = brightness + + if xy_color: + command['xy'] = xy_color + + self._bridge.set_light(light_id, command) + + def turn_light_off(self, light_id, transition): + """ Turn the specified or all lights off. """ + if light_id is None: + light_id = self._lights.keys() + + command = {'on': False} + + if transition is not None: + # Transition time is in 1/10th seconds and cannot exceed + # 900 seconds. + command['transitiontime'] = min(9000, transition * 10) self._bridge.set_light(light_id, command) diff --git a/homeassistant/util.py b/homeassistant/util.py index 12c0f8ed67a..0d1007ac10c 100644 --- a/homeassistant/util.py +++ b/homeassistant/util.py @@ -3,7 +3,6 @@ import threading import Queue import datetime import re -import os RE_SANITIZE_FILENAME = re.compile(r'(~|\.\.|/|\\)') RE_SLUGIFY = re.compile(r'[^A-Za-z0-9_]+') @@ -63,14 +62,63 @@ def repr_helper(inp): return u", ".join( repr_helper(key)+u"="+repr_helper(item) for key, item in inp.items()) - elif isinstance(inp, list): - return u'[' + u', '.join(inp) + u']' elif isinstance(inp, datetime.datetime): return datetime_to_str(inp) else: return unicode(inp) +# Taken from: http://www.cse.unr.edu/~quiroz/inc/colortransforms.py +# License: Code is given as is. Use at your own risk and discretion. +# pylint: disable=invalid-name +def color_RGB_to_xy(R, G, B): + ''' Convert from RGB color to XY color. ''' + var_R = (R / 255.) + var_G = (G / 255.) + var_B = (B / 255.) + + if var_R > 0.04045: + var_R = ((var_R + 0.055) / 1.055) ** 2.4 + else: + var_R /= 12.92 + + if var_G > 0.04045: + var_G = ((var_G + 0.055) / 1.055) ** 2.4 + else: + var_G /= 12.92 + + if var_B > 0.04045: + var_B = ((var_B + 0.055) / 1.055) ** 2.4 + else: + var_B /= 12.92 + + var_R *= 100 + var_G *= 100 + var_B *= 100 + + # Observer. = 2 deg, Illuminant = D65 + X = var_R * 0.4124 + var_G * 0.3576 + var_B * 0.1805 + Y = var_R * 0.2126 + var_G * 0.7152 + var_B * 0.0722 + Z = var_R * 0.0193 + var_G * 0.1192 + var_B * 0.9505 + + # Convert XYZ to xy, see CIE 1931 color space on wikipedia + return X / (X + Y + Z), Y / (X + Y + Z) + + +def dict_get_convert(dic, key, value_type, default=None): + """ Get a value from a dic and ensure it is value_type. """ + return convert(dic[key], value_type, default) if key in dic else default + + +def convert(value, to_type, default=None): + """ Converts value to to_type, returns default if fails. """ + try: + return to_type(value) + except ValueError: + # If value could not be converted + return default + + # Reason why I decided to roll my own ThreadPool instead of using # multiprocessing.dummy.pool or even better, use multiprocessing.pool and # not be hurt by the GIL in the cpython interpreter: