light/hyperion: use RGB, clean code

This commit is contained in:
MakeMeASandwich 2015-10-20 17:30:23 +02:00
parent c5a7e3abd1
commit 2e9ee28637

View File

@ -20,10 +20,7 @@ import socket
import json import json
from homeassistant.const import CONF_HOST from homeassistant.const import CONF_HOST
from homeassistant.components.light import (Light, ATTR_XY_COLOR, from homeassistant.components.light import (Light, ATTR_RGB_COLOR)
ATTR_BRIGHTNESS)
from homeassistant.util.color import (color_RGB_to_xy,
color_xy_brightness_to_RGB)
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
REQUIREMENTS = [] REQUIREMENTS = []
@ -33,7 +30,11 @@ def setup_platform(hass, config, add_devices_callback, discovery_info=None):
""" Sets up a Hyperion server remote """ """ Sets up a Hyperion server remote """
host = config.get(CONF_HOST, None) host = config.get(CONF_HOST, None)
port = config.get("port", 19444) port = config.get("port", 19444)
add_devices_callback([Hyperion(host, port)]) device = Hyperion(host, port)
if device.setup():
add_devices_callback([device])
else:
return False
class Hyperion(Light): class Hyperion(Light):
@ -42,76 +43,74 @@ class Hyperion(Light):
def __init__(self, host, port): def __init__(self, host, port):
self._host = host self._host = host
self._port = port self._port = port
self._name = "unknown" self._name = host
self._is_available = False self._is_available = True
self._xy_color = color_RGB_to_xy(0, 0, 0) self._rgb_color = [255, 255, 255]
self._brightness = 255
@property @property
def name(self): def name(self):
""" Get the hostname of the server. """ """ Return the hostname of the server. """
return self._name return self._name
@property @property
def color_xy(self): def rgb_color(self):
""" XY color value. """ """ Last RGB color value set. """
return self._xy_color return self._rgb_color
@property
def brightness(self):
""" Brightness. """
return self._brightness
@property @property
def is_on(self): def is_on(self):
""" True if device is on. """ """ True if the device is online. """
self.check_remote()
return self._is_available return self._is_available
def turn_on(self, **kwargs): def turn_on(self, **kwargs):
""" Turn the lights on. """ """ Turn the lights on. """
if self._is_available: if self._is_available:
if ATTR_XY_COLOR in kwargs: if ATTR_RGB_COLOR in kwargs:
self._xy_color = kwargs[ATTR_XY_COLOR] self._rgb_color = kwargs[ATTR_RGB_COLOR]
if ATTR_BRIGHTNESS in kwargs:
self._brightness = kwargs[ATTR_BRIGHTNESS] self.json_request({"command": "color", "priority": 128,
self.update_remote() "color": self._rgb_color})
def turn_off(self, **kwargs): def turn_off(self, **kwargs):
""" Disconnect the remote. """ """ Disconnect the remote. """
self.json_request({"command": "clearall"}) self.json_request({"command": "clearall"})
def check_remote(self): def update(self):
""" Ping the remote and gets the hostname. """ """ Ping the remote. """
# just see if the remote port is open
self._is_available = self.json_request()
def setup(self):
""" Get the hostname of the remote. """
response = self.json_request({"command": "serverinfo"}) response = self.json_request({"command": "serverinfo"})
if response: if response:
self._name = response["info"]["hostname"] self._name = response["info"]["hostname"]
return True
def update_remote(self): return False
""" Set the remote's lights. """
rgb = color_xy_brightness_to_RGB(self._xy_color[0], self._xy_color[1],
self._brightness)
rgb = [int(c) for c in rgb]
self.json_request(
{"command": "color", "priority": 128, "color": rgb})
def json_request(self, request, wait_for_response=False): def json_request(self, request=None, wait_for_response=False):
""" Communicate with the json server. """ """ Communicate with the json server. """
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(5)
try: try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(5)
sock.connect((self._host, self._port)) sock.connect((self._host, self._port))
except OSError: except OSError:
self._is_available = False return False
return None
if not request:
# no communication needed, simple presence detection returns True
return True
sock.send(bytearray(json.dumps(request) + "\n", "utf-8")) sock.send(bytearray(json.dumps(request) + "\n", "utf-8"))
try: try:
buf = sock.recv(4096) buf = sock.recv(4096)
except socket.timeout: except socket.timeout:
return None # something is wrong, assume it's offline
return False
# read until a newline or timeout
buffering = True buffering = True
while buffering: while buffering:
if "\n" in str(buf, "utf-8"): if "\n" in str(buf, "utf-8"):
@ -129,7 +128,4 @@ class Hyperion(Light):
buf += more buf += more
sock.close() sock.close()
return json.loads(response)
j = json.loads(response)
self._is_available = True
return j