mirror of
https://github.com/home-assistant/core.git
synced 2025-07-25 14:17:45 +00:00
add cec platform
This commit is contained in:
parent
cb6f50b7ff
commit
d87e969671
@ -94,6 +94,7 @@ omit =
|
|||||||
homeassistant/components/camera/generic.py
|
homeassistant/components/camera/generic.py
|
||||||
homeassistant/components/camera/mjpeg.py
|
homeassistant/components/camera/mjpeg.py
|
||||||
homeassistant/components/camera/rpi_camera.py
|
homeassistant/components/camera/rpi_camera.py
|
||||||
|
homeassistant/components/cec.py
|
||||||
homeassistant/components/device_tracker/actiontec.py
|
homeassistant/components/device_tracker/actiontec.py
|
||||||
homeassistant/components/device_tracker/aruba.py
|
homeassistant/components/device_tracker/aruba.py
|
||||||
homeassistant/components/device_tracker/asuswrt.py
|
homeassistant/components/device_tracker/asuswrt.py
|
||||||
|
120
homeassistant/components/cec.py
Normal file
120
homeassistant/components/cec.py
Normal file
@ -0,0 +1,120 @@
|
|||||||
|
"""
|
||||||
|
CEC platform.
|
||||||
|
|
||||||
|
Requires libcec + Python bindings.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import logging
|
||||||
|
import voluptuous as vol
|
||||||
|
from homeassistant.const import EVENT_HOMEASSISTANT_START
|
||||||
|
import homeassistant.helpers.config_validation as cv
|
||||||
|
|
||||||
|
|
||||||
|
_LOGGER = logging.getLogger(__name__)
|
||||||
|
_CEC = None
|
||||||
|
DOMAIN = 'cec'
|
||||||
|
SERVICE_SELECT_DEVICE = 'select_device'
|
||||||
|
SERVICE_POWER_ON = 'power_on'
|
||||||
|
SERVICE_STANDBY = 'standby'
|
||||||
|
CONF_DEVICES = 'devices'
|
||||||
|
ATTR_DEVICE = 'device'
|
||||||
|
MAX_DEPTH = 4
|
||||||
|
|
||||||
|
|
||||||
|
# pylint: disable=unnecessary-lambda
|
||||||
|
DEVICE_SCHEMA = vol.Schema({
|
||||||
|
vol.All(cv.positive_int): vol.Any(lambda devices: DEVICE_SCHEMA(devices),
|
||||||
|
cv.string)
|
||||||
|
})
|
||||||
|
|
||||||
|
|
||||||
|
PLATFORM_SCHEMA = vol.Schema({
|
||||||
|
vol.Required(CONF_DEVICES): DEVICE_SCHEMA
|
||||||
|
})
|
||||||
|
|
||||||
|
|
||||||
|
def parse_mapping(mapping, parents=None):
|
||||||
|
"""Parse configuration device mapping."""
|
||||||
|
if parents is None:
|
||||||
|
parents = []
|
||||||
|
for addr, val in mapping.items():
|
||||||
|
cur = parents + [str(addr)]
|
||||||
|
if isinstance(val, dict):
|
||||||
|
yield from parse_mapping(val, cur)
|
||||||
|
elif isinstance(val, str):
|
||||||
|
yield (val, cur)
|
||||||
|
|
||||||
|
|
||||||
|
def pad_physical_address(addr):
|
||||||
|
"""Right-pad a physical address"""
|
||||||
|
return addr + ['0'] * (MAX_DEPTH - len(addr))
|
||||||
|
|
||||||
|
|
||||||
|
def setup(hass, config):
|
||||||
|
"""Setup CEC capability."""
|
||||||
|
global _CEC
|
||||||
|
|
||||||
|
# cec is only available if libcec is properly installed
|
||||||
|
# and the Python bindings are accessible.
|
||||||
|
try:
|
||||||
|
import cec
|
||||||
|
except ImportError:
|
||||||
|
_LOGGER.error("libcec must be installed")
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Parse configuration into a dict of device name
|
||||||
|
# to physical address represented as a list of
|
||||||
|
# four elements.
|
||||||
|
flat = {}
|
||||||
|
for pair in parse_mapping(config[DOMAIN][0].get(CONF_DEVICES, {})):
|
||||||
|
flat[pair[0]] = pad_physical_address(pair[1])
|
||||||
|
|
||||||
|
# Configure libcec.
|
||||||
|
cfg = cec.libcec_configuration()
|
||||||
|
cfg.strDeviceName = 'HASS'
|
||||||
|
cfg.bActivateSource = 0
|
||||||
|
cfg.bMonitorOnly = 1
|
||||||
|
cfg.clientVersion = cec.LIBCEC_VERSION_CURRENT
|
||||||
|
|
||||||
|
# Set up CEC adapter.
|
||||||
|
_CEC = cec.ICECAdapter.Create(cfg)
|
||||||
|
|
||||||
|
def _power_on(call):
|
||||||
|
"""Power on all devices."""
|
||||||
|
_CEC.PowerOnDevices()
|
||||||
|
|
||||||
|
def _standby(call):
|
||||||
|
"""Standby all devices."""
|
||||||
|
_CEC.StandbyDevices()
|
||||||
|
|
||||||
|
def _select_device(call):
|
||||||
|
"""Select the active device."""
|
||||||
|
path = flat.get(call.data[ATTR_DEVICE])
|
||||||
|
if not path:
|
||||||
|
_LOGGER.error("Device not found: %s", call.data[ATTR_DEVICE])
|
||||||
|
cmds = []
|
||||||
|
for i in range(1, MAX_DEPTH - 1):
|
||||||
|
addr = pad_physical_address(path[:i])
|
||||||
|
cmds.append('1f:82:{}{}:{}{}'.format(*addr))
|
||||||
|
cmds.append('1f:86:{}{}:{}{}'.format(*addr))
|
||||||
|
for cmd in cmds:
|
||||||
|
_CEC.Transmit(_CEC.CommandFromString(cmd))
|
||||||
|
_LOGGER.info("Selected %s", call.data[ATTR_DEVICE])
|
||||||
|
|
||||||
|
def _start_cec(event):
|
||||||
|
"""Open CEC adapter."""
|
||||||
|
adapters = _CEC.DetectAdapters()
|
||||||
|
if len(adapters) == 0:
|
||||||
|
_LOGGER.error("No CEC adapter found")
|
||||||
|
return
|
||||||
|
|
||||||
|
if _CEC.Open(adapters[0].strComName):
|
||||||
|
hass.services.register(DOMAIN, SERVICE_POWER_ON, _power_on)
|
||||||
|
hass.services.register(DOMAIN, SERVICE_STANDBY, _standby)
|
||||||
|
hass.services.register(DOMAIN, SERVICE_SELECT_DEVICE,
|
||||||
|
_select_device)
|
||||||
|
else:
|
||||||
|
_LOGGER.error("Failed to open adapter")
|
||||||
|
|
||||||
|
hass.bus.listen_once(EVENT_HOMEASSISTANT_START, _start_cec)
|
||||||
|
return True
|
Loading…
x
Reference in New Issue
Block a user