diff --git a/CODEOWNERS b/CODEOWNERS index 51ce87e702f..43959f67c30 100644 --- a/CODEOWNERS +++ b/CODEOWNERS @@ -267,6 +267,7 @@ homeassistant/components/nsw_fuel_station/* @nickw444 homeassistant/components/nsw_rural_fire_service_feed/* @exxamalte homeassistant/components/nuheat/* @bdraco homeassistant/components/nuki/* @pvizeli +homeassistant/components/numato/* @clssn homeassistant/components/nut/* @bdraco homeassistant/components/nws/* @MatthewFlamm homeassistant/components/nzbget/* @chriscla diff --git a/homeassistant/components/numato/__init__.py b/homeassistant/components/numato/__init__.py new file mode 100644 index 00000000000..e5eeaa31846 --- /dev/null +++ b/homeassistant/components/numato/__init__.py @@ -0,0 +1,248 @@ +"""Support for controlling GPIO pins of a Numato Labs USB GPIO expander.""" +import logging + +import numato_gpio as gpio +import voluptuous as vol + +from homeassistant.const import ( + CONF_BINARY_SENSORS, + CONF_ID, + CONF_NAME, + CONF_SENSORS, + CONF_SWITCHES, + EVENT_HOMEASSISTANT_START, + EVENT_HOMEASSISTANT_STOP, +) +import homeassistant.helpers.config_validation as cv +from homeassistant.helpers.discovery import load_platform + +_LOGGER = logging.getLogger(__name__) + +DOMAIN = "numato" + +CONF_INVERT_LOGIC = "invert_logic" +CONF_DISCOVER = "discover" +CONF_DEVICES = "devices" +CONF_DEVICE_ID = "id" +CONF_PORTS = "ports" +CONF_SRC_RANGE = "source_range" +CONF_DST_RANGE = "destination_range" +CONF_DST_UNIT = "unit" +DEFAULT_INVERT_LOGIC = False +DEFAULT_SRC_RANGE = [0, 1024] +DEFAULT_DST_RANGE = [0.0, 100.0] +DEFAULT_UNIT = "%" +DEFAULT_DEV = [f"/dev/ttyACM{i}" for i in range(10)] + +PORT_RANGE = range(1, 8) # ports 0-7 are ADC capable + +DATA_PORTS_IN_USE = "ports_in_use" +DATA_API = "api" + + +def int_range(rng): + """Validate the input array to describe a range by two integers.""" + if not (isinstance(rng[0], int) and isinstance(rng[1], int)): + raise vol.Invalid(f"Only integers are allowed: {rng}") + if len(rng) != 2: + raise vol.Invalid(f"Only two numbers allowed in a range: {rng}") + if rng[0] > rng[1]: + raise vol.Invalid(f"Lower range bound must come first: {rng}") + return rng + + +def float_range(rng): + """Validate the input array to describe a range by two floats.""" + try: + coe = vol.Coerce(float) + coe(rng[0]) + coe(rng[1]) + except vol.CoerceInvalid: + raise vol.Invalid(f"Only int or float values are allowed: {rng}") + if len(rng) != 2: + raise vol.Invalid(f"Only two numbers allowed in a range: {rng}") + if rng[0] > rng[1]: + raise vol.Invalid(f"Lower range bound must come first: {rng}") + return rng + + +def adc_port_number(num): + """Validate input number to be in the range of ADC enabled ports.""" + try: + num = int(num) + except (ValueError): + raise vol.Invalid(f"Port numbers must be integers: {num}") + if num not in range(1, 8): + raise vol.Invalid(f"Only port numbers from 1 to 7 are ADC capable: {num}") + return num + + +ADC_SCHEMA = vol.Schema( + { + vol.Required(CONF_NAME): cv.string, + vol.Optional(CONF_SRC_RANGE, default=DEFAULT_SRC_RANGE): int_range, + vol.Optional(CONF_DST_RANGE, default=DEFAULT_DST_RANGE): float_range, + vol.Optional(CONF_DST_UNIT, default=DEFAULT_UNIT): cv.string, + } +) + +PORTS_SCHEMA = vol.Schema({cv.positive_int: cv.string}) + +IO_PORTS_SCHEMA = vol.Schema( + { + vol.Required(CONF_PORTS): PORTS_SCHEMA, + vol.Optional(CONF_INVERT_LOGIC, default=DEFAULT_INVERT_LOGIC): cv.boolean, + } +) + +DEVICE_SCHEMA = vol.Schema( + { + vol.Required(CONF_ID): cv.positive_int, + CONF_BINARY_SENSORS: IO_PORTS_SCHEMA, + CONF_SWITCHES: IO_PORTS_SCHEMA, + CONF_SENSORS: {CONF_PORTS: {adc_port_number: ADC_SCHEMA}}, + } +) + +CONFIG_SCHEMA = vol.Schema( + { + DOMAIN: { + CONF_DEVICES: vol.All(cv.ensure_list, [DEVICE_SCHEMA]), + vol.Optional(CONF_DISCOVER, default=DEFAULT_DEV): vol.All( + cv.ensure_list, [cv.string] + ), + }, + }, + extra=vol.ALLOW_EXTRA, +) + + +def setup(hass, config): + """Initialize the numato integration. + + Discovers available Numato devices and loads the binary_sensor, sensor and + switch platforms. + + Returns False on error during device discovery (e.g. duplicate ID), + otherwise returns True. + + No exceptions should occur, since the platforms are initialized on a best + effort basis, which means, errors are handled locally. + """ + hass.data[DOMAIN] = config[DOMAIN] + + try: + gpio.discover(config[DOMAIN][CONF_DISCOVER]) + except gpio.NumatoGpioError as err: + _LOGGER.info("Error discovering Numato devices: %s", err) + gpio.cleanup() + return False + + _LOGGER.info( + "Initializing Numato 32 port USB GPIO expanders with IDs: %s", + ", ".join(str(d) for d in gpio.devices), + ) + + hass.data[DOMAIN][DATA_API] = NumatoAPI() + + def cleanup_gpio(event): + """Stuff to do before stopping.""" + _LOGGER.debug("Clean up Numato GPIO") + gpio.cleanup() + if DATA_API in hass.data[DOMAIN]: + hass.data[DOMAIN][DATA_API].ports_registered.clear() + + def prepare_gpio(event): + """Stuff to do when home assistant starts.""" + _LOGGER.debug("Setup cleanup at stop for Numato GPIO") + hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, cleanup_gpio) + + hass.bus.listen_once(EVENT_HOMEASSISTANT_START, prepare_gpio) + + load_platform(hass, "binary_sensor", DOMAIN, {}, config) + load_platform(hass, "sensor", DOMAIN, {}, config) + load_platform(hass, "switch", DOMAIN, {}, config) + return True + + +# pylint: disable=no-self-use +class NumatoAPI: + """Home-Assistant specific API for numato device access.""" + + def __init__(self): + """Initialize API state.""" + self.ports_registered = dict() + + def check_port_free(self, device_id, port, direction): + """Check whether a port is still free set up. + + Fail with exception if it has already been registered. + """ + if (device_id, port) not in self.ports_registered: + self.ports_registered[(device_id, port)] = direction + else: + raise gpio.NumatoGpioError( + "Device {} port {} already in use as {}.".format( + device_id, + port, + "input" + if self.ports_registered[(device_id, port)] == gpio.IN + else "output", + ) + ) + + def check_device_id(self, device_id): + """Check whether a device has been discovered. + + Fail with exception. + """ + if device_id not in gpio.devices: + raise gpio.NumatoGpioError(f"Device {device_id} not available.") + + def check_port(self, device_id, port, direction): + """Raise an error if the port setup doesn't match the direction.""" + self.check_device_id(device_id) + if (device_id, port) not in self.ports_registered: + raise gpio.NumatoGpioError( + f"Port {port} is not set up for numato device {device_id}." + ) + msg = { + gpio.OUT: f"Trying to write to device {device_id} port {port} set up as input.", + gpio.IN: f"Trying to read from device {device_id} port {port} set up as output.", + } + if self.ports_registered[(device_id, port)] != direction: + raise gpio.NumatoGpioError(msg[direction]) + + def setup_output(self, device_id, port): + """Set up a GPIO as output.""" + self.check_device_id(device_id) + self.check_port_free(device_id, port, gpio.OUT) + gpio.devices[device_id].setup(port, gpio.OUT) + + def setup_input(self, device_id, port): + """Set up a GPIO as input.""" + self.check_device_id(device_id) + gpio.devices[device_id].setup(port, gpio.IN) + self.check_port_free(device_id, port, gpio.IN) + + def write_output(self, device_id, port, value): + """Write a value to a GPIO.""" + self.check_port(device_id, port, gpio.OUT) + gpio.devices[device_id].write(port, value) + + def read_input(self, device_id, port): + """Read a value from a GPIO.""" + self.check_port(device_id, port, gpio.IN) + return gpio.devices[device_id].read(port) + + def read_adc_input(self, device_id, port): + """Read an ADC value from a GPIO ADC port.""" + self.check_port(device_id, port, gpio.IN) + self.check_device_id(device_id) + return gpio.devices[device_id].adc_read(port) + + def edge_detect(self, device_id, port, event_callback): + """Add detection for RISING and FALLING events.""" + self.check_port(device_id, port, gpio.IN) + gpio.devices[device_id].add_event_detect(port, event_callback, gpio.BOTH) + gpio.devices[device_id].notify = True diff --git a/homeassistant/components/numato/binary_sensor.py b/homeassistant/components/numato/binary_sensor.py new file mode 100644 index 00000000000..ff61cb3cbb0 --- /dev/null +++ b/homeassistant/components/numato/binary_sensor.py @@ -0,0 +1,120 @@ +"""Binary sensor platform integration for Numato USB GPIO expanders.""" +from functools import partial +import logging + +from numato_gpio import NumatoGpioError + +from homeassistant.components.binary_sensor import BinarySensorDevice +from homeassistant.const import DEVICE_DEFAULT_NAME +from homeassistant.core import callback +from homeassistant.helpers.dispatcher import async_dispatcher_connect, dispatcher_send + +from . import ( + CONF_BINARY_SENSORS, + CONF_DEVICES, + CONF_ID, + CONF_INVERT_LOGIC, + CONF_PORTS, + DATA_API, + DOMAIN, +) + +_LOGGER = logging.getLogger(__name__) + +NUMATO_SIGNAL = "numato_signal_{}_{}" + + +def setup_platform(hass, config, add_entities, discovery_info=None): + """Set up the configured Numato USB GPIO binary sensor ports.""" + if discovery_info is None: + return + + def read_gpio(device_id, port, level): + """Send signal to entity to have it update state.""" + dispatcher_send(hass, NUMATO_SIGNAL.format(device_id, port), level) + + api = hass.data[DOMAIN][DATA_API] + binary_sensors = [] + devices = hass.data[DOMAIN][CONF_DEVICES] + for device in [d for d in devices if CONF_BINARY_SENSORS in d]: + device_id = device[CONF_ID] + platform = device[CONF_BINARY_SENSORS] + invert_logic = platform[CONF_INVERT_LOGIC] + ports = platform[CONF_PORTS] + for port, port_name in ports.items(): + try: + + api.setup_input(device_id, port) + api.edge_detect(device_id, port, partial(read_gpio, device_id)) + + except NumatoGpioError as err: + _LOGGER.error( + "Failed to initialize binary sensor '%s' on Numato device %s port %s: %s", + port_name, + device_id, + port, + err, + ) + continue + + binary_sensors.append( + NumatoGpioBinarySensor(port_name, device_id, port, invert_logic, api,) + ) + add_entities(binary_sensors, True) + + +class NumatoGpioBinarySensor(BinarySensorDevice): + """Represents a binary sensor (input) port of a Numato GPIO expander.""" + + def __init__(self, name, device_id, port, invert_logic, api): + """Initialize the Numato GPIO based binary sensor object.""" + self._name = name or DEVICE_DEFAULT_NAME + self._device_id = device_id + self._port = port + self._invert_logic = invert_logic + self._state = None + self._api = api + + async def async_added_to_hass(self): + """Connect state update callback.""" + self.async_on_remove( + async_dispatcher_connect( + self.hass, + NUMATO_SIGNAL.format(self._device_id, self._port), + self._async_update_state, + ) + ) + + @callback + def _async_update_state(self, level): + """Update entity state.""" + self._state = level + self.async_write_ha_state() + + @property + def should_poll(self): + """No polling needed.""" + return False + + @property + def name(self): + """Return the name of the sensor.""" + return self._name + + @property + def is_on(self): + """Return the state of the entity.""" + return self._state != self._invert_logic + + def update(self): + """Update the GPIO state.""" + try: + self._state = self._api.read_input(self._device_id, self._port) + except NumatoGpioError as err: + self._state = None + _LOGGER.error( + "Failed to update Numato device %s port %s: %s", + self._device_id, + self._port, + err, + ) diff --git a/homeassistant/components/numato/manifest.json b/homeassistant/components/numato/manifest.json new file mode 100644 index 00000000000..4e9857cd579 --- /dev/null +++ b/homeassistant/components/numato/manifest.json @@ -0,0 +1,8 @@ +{ + "domain": "numato", + "name": "Numato USB GPIO Expander", + "documentation": "https://www.home-assistant.io/integrations/numato", + "requirements": ["numato-gpio==0.7.1"], + "codeowners": ["@clssn"], + "quality_scale": "internal" +} diff --git a/homeassistant/components/numato/sensor.py b/homeassistant/components/numato/sensor.py new file mode 100644 index 00000000000..e268d32a293 --- /dev/null +++ b/homeassistant/components/numato/sensor.py @@ -0,0 +1,123 @@ +"""Sensor platform integration for ADC ports of Numato USB GPIO expanders.""" +import logging + +from numato_gpio import NumatoGpioError + +from homeassistant.const import CONF_ID, CONF_NAME, CONF_SENSORS +from homeassistant.helpers.entity import Entity + +from . import ( + CONF_DEVICES, + CONF_DST_RANGE, + CONF_DST_UNIT, + CONF_PORTS, + CONF_SRC_RANGE, + DATA_API, + DOMAIN, +) + +_LOGGER = logging.getLogger(__name__) + +ICON = "mdi:gauge" + + +def setup_platform(hass, config, add_entities, discovery_info=None): + """Set up the configured Numato USB GPIO ADC sensor ports.""" + if discovery_info is None: + return + + api = hass.data[DOMAIN][DATA_API] + sensors = [] + devices = hass.data[DOMAIN][CONF_DEVICES] + for device in [d for d in devices if CONF_SENSORS in d]: + device_id = device[CONF_ID] + ports = device[CONF_SENSORS][CONF_PORTS] + for port, adc_def in ports.items(): + try: + api.setup_input(device_id, port) + except NumatoGpioError as err: + _LOGGER.error( + "Failed to initialize sensor '%s' on Numato device %s port %s: %s", + adc_def[CONF_NAME], + device_id, + port, + err, + ) + continue + sensors.append( + NumatoGpioAdc( + adc_def[CONF_NAME], + device_id, + port, + adc_def[CONF_SRC_RANGE], + adc_def[CONF_DST_RANGE], + adc_def[CONF_DST_UNIT], + api, + ) + ) + add_entities(sensors, True) + + +class NumatoGpioAdc(Entity): + """Represents an ADC port of a Numato USB GPIO expander.""" + + def __init__(self, name, device_id, port, src_range, dst_range, dst_unit, api): + """Initialize the sensor.""" + self._name = name + self._device_id = device_id + self._port = port + self._src_range = src_range + self._dst_range = dst_range + self._state = None + self._unit_of_measurement = dst_unit + self._api = api + + @property + def name(self): + """Return the name of the sensor.""" + return self._name + + @property + def state(self): + """Return the state of the sensor.""" + return self._state + + @property + def unit_of_measurement(self): + """Return the unit the value is expressed in.""" + return self._unit_of_measurement + + @property + def icon(self): + """Return the icon to use in the frontend, if any.""" + return ICON + + def update(self): + """Get the latest data and updates the state.""" + try: + adc_val = self._api.read_adc_input(self._device_id, self._port) + adc_val = self._clamp_to_source_range(adc_val) + self._state = self._linear_scale_to_dest_range(adc_val) + except NumatoGpioError as err: + self._state = None + _LOGGER.error( + "Failed to update Numato device %s ADC-port %s: %s", + self._device_id, + self._port, + err, + ) + + def _clamp_to_source_range(self, val): + # clamp to source range + val = max(val, self._src_range[0]) + val = min(val, self._src_range[1]) + return val + + def _linear_scale_to_dest_range(self, val): + # linear scale to dest range + src_len = self._src_range[1] - self._src_range[0] + adc_val_rel = val - self._src_range[0] + ratio = float(adc_val_rel) / float(src_len) + dst_len = self._dst_range[1] - self._dst_range[0] + dest_val = self._dst_range[0] + ratio * dst_len + return dest_val diff --git a/homeassistant/components/numato/switch.py b/homeassistant/components/numato/switch.py new file mode 100644 index 00000000000..2f1be0cf311 --- /dev/null +++ b/homeassistant/components/numato/switch.py @@ -0,0 +1,108 @@ +"""Switch platform integration for Numato USB GPIO expanders.""" +import logging + +from numato_gpio import NumatoGpioError + +from homeassistant.const import ( + CONF_DEVICES, + CONF_ID, + CONF_SWITCHES, + DEVICE_DEFAULT_NAME, +) +from homeassistant.helpers.entity import ToggleEntity + +from . import CONF_INVERT_LOGIC, CONF_PORTS, DATA_API, DOMAIN + +_LOGGER = logging.getLogger(__name__) + + +def setup_platform(hass, config, add_entities, discovery_info=None): + """Set up the configured Numato USB GPIO switch ports.""" + if discovery_info is None: + return + + api = hass.data[DOMAIN][DATA_API] + switches = [] + devices = hass.data[DOMAIN][CONF_DEVICES] + for device in [d for d in devices if CONF_SWITCHES in d]: + device_id = device[CONF_ID] + platform = device[CONF_SWITCHES] + invert_logic = platform[CONF_INVERT_LOGIC] + ports = platform[CONF_PORTS] + for port, port_name in ports.items(): + try: + api.setup_output(device_id, port) + api.write_output(device_id, port, 1 if invert_logic else 0) + except NumatoGpioError as err: + _LOGGER.error( + "Failed to initialize switch '%s' on Numato device %s port %s: %s", + port_name, + device_id, + port, + err, + ) + continue + switches.append( + NumatoGpioSwitch(port_name, device_id, port, invert_logic, api,) + ) + add_entities(switches, True) + + +class NumatoGpioSwitch(ToggleEntity): + """Representation of a Numato USB GPIO switch port.""" + + def __init__(self, name, device_id, port, invert_logic, api): + """Initialize the port.""" + self._name = name or DEVICE_DEFAULT_NAME + self._device_id = device_id + self._port = port + self._invert_logic = invert_logic + self._state = False + self._api = api + + @property + def name(self): + """Return the name of the switch.""" + return self._name + + @property + def should_poll(self): + """No polling needed.""" + return False + + @property + def is_on(self): + """Return true if port is turned on.""" + return self._state + + def turn_on(self, **kwargs): + """Turn the port on.""" + try: + self._api.write_output( + self._device_id, self._port, 0 if self._invert_logic else 1 + ) + self._state = True + self.schedule_update_ha_state() + except NumatoGpioError as err: + _LOGGER.error( + "Failed to turn on Numato device %s port %s: %s", + self._device_id, + self._port, + err, + ) + + def turn_off(self, **kwargs): + """Turn the port off.""" + try: + self._api.write_output( + self._device_id, self._port, 1 if self._invert_logic else 0 + ) + self._state = False + self.schedule_update_ha_state() + except NumatoGpioError as err: + _LOGGER.error( + "Failed to turn off Numato device %s port %s: %s", + self._device_id, + self._port, + err, + ) diff --git a/requirements_all.txt b/requirements_all.txt index 3a039d18832..74feb6866fc 100644 --- a/requirements_all.txt +++ b/requirements_all.txt @@ -951,6 +951,9 @@ nsw-fuel-api-client==1.0.10 # homeassistant.components.nuheat nuheat==0.3.0 +# homeassistant.components.numato +numato-gpio==0.7.1 + # homeassistant.components.iqvia # homeassistant.components.opencv # homeassistant.components.tensorflow diff --git a/requirements_test_all.txt b/requirements_test_all.txt index 47362e55aee..07773687d88 100644 --- a/requirements_test_all.txt +++ b/requirements_test_all.txt @@ -377,6 +377,9 @@ nsw-fuel-api-client==1.0.10 # homeassistant.components.nuheat nuheat==0.3.0 +# homeassistant.components.numato +numato-gpio==0.7.1 + # homeassistant.components.iqvia # homeassistant.components.opencv # homeassistant.components.tensorflow diff --git a/tests/components/numato/__init__.py b/tests/components/numato/__init__.py new file mode 100644 index 00000000000..bcef00f9594 --- /dev/null +++ b/tests/components/numato/__init__.py @@ -0,0 +1 @@ +"""Tests for the numato integration.""" diff --git a/tests/components/numato/common.py b/tests/components/numato/common.py new file mode 100644 index 00000000000..18ece6690bc --- /dev/null +++ b/tests/components/numato/common.py @@ -0,0 +1,49 @@ +"""Definitions shared by all numato tests.""" + +from numato_gpio import NumatoGpioError + +NUMATO_CFG = { + "numato": { + "discover": ["/ttyACM0", "/ttyACM1"], + "devices": [ + { + "id": 0, + "binary_sensors": { + "invert_logic": False, + "ports": { + "2": "numato_binary_sensor_mock_port2", + "3": "numato_binary_sensor_mock_port3", + "4": "numato_binary_sensor_mock_port4", + }, + }, + "sensors": { + "ports": { + "1": { + "name": "numato_adc_mock_port1", + "source_range": [100, 1023], + "destination_range": [0, 10], + "unit": "mocks", + } + }, + }, + "switches": { + "invert_logic": False, + "ports": { + "5": "numato_switch_mock_port5", + "6": "numato_switch_mock_port6", + }, + }, + } + ], + } +} + + +def mockup_raise(*args, **kwargs): + """Mockup to replace regular functions for error injection.""" + raise NumatoGpioError("Error mockup") + + +def mockup_return(*args, **kwargs): + """Mockup to replace regular functions for error injection.""" + return False diff --git a/tests/components/numato/conftest.py b/tests/components/numato/conftest.py new file mode 100644 index 00000000000..c6fd13a099e --- /dev/null +++ b/tests/components/numato/conftest.py @@ -0,0 +1,28 @@ +"""Fixtures for numato tests.""" + +from copy import deepcopy + +import pytest + +from homeassistant.components import numato + +from . import numato_mock +from .common import NUMATO_CFG + + +@pytest.fixture +def config(): + """Provide a copy of the numato domain's test configuration. + + This helps to quickly change certain aspects of the configuration scoped + to each individual test. + """ + return deepcopy(NUMATO_CFG) + + +@pytest.fixture +def numato_fixture(monkeypatch): + """Inject the numato mockup into numato homeassistant module.""" + module_mock = numato_mock.NumatoModuleMock() + monkeypatch.setattr(numato, "gpio", module_mock) + return module_mock diff --git a/tests/components/numato/numato_mock.py b/tests/components/numato/numato_mock.py new file mode 100644 index 00000000000..1f8b24027de --- /dev/null +++ b/tests/components/numato/numato_mock.py @@ -0,0 +1,68 @@ +"""Mockup for the numato component interface.""" +from numato_gpio import NumatoGpioError + + +class NumatoModuleMock: + """Mockup for the numato_gpio module.""" + + NumatoGpioError = NumatoGpioError + + def __init__(self): + """Initialize the numato_gpio module mockup class.""" + self.devices = {} + + class NumatoDeviceMock: + """Mockup for the numato_gpio.NumatoUsbGpio class.""" + + def __init__(self, device): + """Initialize numato device mockup.""" + self.device = device + self.callbacks = {} + self.ports = set() + self.values = {} + + def setup(self, port, direction): + """Mockup for setup.""" + self.ports.add(port) + self.values[port] = None + + def write(self, port, value): + """Mockup for write.""" + self.values[port] = value + + def read(self, port): + """Mockup for read.""" + return 1 + + def adc_read(self, port): + """Mockup for adc_read.""" + return 1023 + + def add_event_detect(self, port, callback, direction): + """Mockup for add_event_detect.""" + self.callbacks[port] = callback + + def notify(self, enable): + """Mockup for notify.""" + + def mockup_inject_notification(self, port, value): + """Make the mockup execute a notification callback.""" + self.callbacks[port](port, value) + + OUT = 0 + IN = 1 + + RISING = 1 + FALLING = 2 + BOTH = 3 + + def discover(self, _=None): + """Mockup for the numato device discovery. + + Ignore the device list argument, mock discovers /dev/ttyACM0. + """ + self.devices[0] = NumatoModuleMock.NumatoDeviceMock("/dev/ttyACM0") + + def cleanup(self): + """Mockup for the numato device cleanup.""" + self.devices.clear() diff --git a/tests/components/numato/test_binary_sensor.py b/tests/components/numato/test_binary_sensor.py new file mode 100644 index 00000000000..5aa6aea2b8d --- /dev/null +++ b/tests/components/numato/test_binary_sensor.py @@ -0,0 +1,62 @@ +"""Tests for the numato binary_sensor platform.""" +from homeassistant.helpers import discovery +from homeassistant.setup import async_setup_component + +from .common import NUMATO_CFG, mockup_raise + +MOCKUP_ENTITY_IDS = { + "binary_sensor.numato_binary_sensor_mock_port2", + "binary_sensor.numato_binary_sensor_mock_port3", + "binary_sensor.numato_binary_sensor_mock_port4", +} + + +async def test_failing_setups_no_entities(hass, numato_fixture, monkeypatch): + """When port setup fails, no entity shall be created.""" + monkeypatch.setattr(numato_fixture.NumatoDeviceMock, "setup", mockup_raise) + assert await async_setup_component(hass, "numato", NUMATO_CFG) + await hass.async_block_till_done() + for entity_id in MOCKUP_ENTITY_IDS: + assert entity_id not in hass.states.async_entity_ids() + + +async def test_setup_callbacks(hass, numato_fixture, monkeypatch): + """During setup a callback shall be registered.""" + + numato_fixture.discover() + + def mock_add_event_detect(self, port, callback, direction): + assert self == numato_fixture.devices[0] + assert port == 1 + assert callback is callable + assert direction == numato_fixture.BOTH + + monkeypatch.setattr( + numato_fixture.devices[0], "add_event_detect", mock_add_event_detect + ) + assert await async_setup_component(hass, "numato", NUMATO_CFG) + + +async def test_hass_binary_sensor_notification(hass, numato_fixture): + """Test regular operations from within Home Assistant.""" + assert await async_setup_component(hass, "numato", NUMATO_CFG) + await hass.async_block_till_done() # wait until services are registered + assert ( + hass.states.get("binary_sensor.numato_binary_sensor_mock_port2").state == "on" + ) + await hass.async_add_executor_job(numato_fixture.devices[0].callbacks[2], 2, False) + await hass.async_block_till_done() + assert ( + hass.states.get("binary_sensor.numato_binary_sensor_mock_port2").state == "off" + ) + + +async def test_binary_sensor_setup_without_discovery_info(hass, config, numato_fixture): + """Test handling of empty discovery_info.""" + numato_fixture.discover() + await discovery.async_load_platform(hass, "binary_sensor", "numato", None, config) + for entity_id in MOCKUP_ENTITY_IDS: + assert entity_id not in hass.states.async_entity_ids() + await hass.async_block_till_done() # wait for numato platform to be loaded + for entity_id in MOCKUP_ENTITY_IDS: + assert entity_id in hass.states.async_entity_ids() diff --git a/tests/components/numato/test_init.py b/tests/components/numato/test_init.py new file mode 100644 index 00000000000..dd5643be12a --- /dev/null +++ b/tests/components/numato/test_init.py @@ -0,0 +1,161 @@ +"""Tests for the numato integration.""" +from numato_gpio import NumatoGpioError +import pytest + +from homeassistant.components import numato +from homeassistant.setup import async_setup_component + +from .common import NUMATO_CFG, mockup_raise, mockup_return + + +async def test_setup_no_devices(hass, numato_fixture, monkeypatch): + """Test handling of an 'empty' discovery. + + Platform setups are expected to return after handling errors locally + without raising. + """ + monkeypatch.setattr(numato_fixture, "discover", mockup_return) + assert await async_setup_component(hass, "numato", NUMATO_CFG) + assert len(numato_fixture.devices) == 0 + + +async def test_fail_setup_raising_discovery(hass, numato_fixture, caplog, monkeypatch): + """Test handling of an exception during discovery. + + Setup shall return False. + """ + monkeypatch.setattr(numato_fixture, "discover", mockup_raise) + assert not await async_setup_component(hass, "numato", NUMATO_CFG) + await hass.async_block_till_done() + + +async def test_hass_numato_api_wrong_port_directions(hass, numato_fixture): + """Test handling of wrong port directions. + + This won't happen in the current platform implementation but would raise + in case of an introduced bug in the platforms. + """ + numato_fixture.discover() + api = numato.NumatoAPI() + api.setup_output(0, 5) + api.setup_input(0, 2) + api.setup_input(0, 6) + with pytest.raises(NumatoGpioError): + api.read_adc_input(0, 5) # adc_read from output + api.read_input(0, 6) # read from output + api.write_output(0, 2, 1) # write to input + + +async def test_hass_numato_api_errors(hass, numato_fixture, monkeypatch): + """Test whether Home Assistant numato API (re-)raises errors.""" + numato_fixture.discover() + monkeypatch.setattr(numato_fixture.devices[0], "setup", mockup_raise) + monkeypatch.setattr(numato_fixture.devices[0], "adc_read", mockup_raise) + monkeypatch.setattr(numato_fixture.devices[0], "read", mockup_raise) + monkeypatch.setattr(numato_fixture.devices[0], "write", mockup_raise) + api = numato.NumatoAPI() + with pytest.raises(NumatoGpioError): + api.setup_input(0, 5) + api.read_adc_input(0, 1) + api.read_input(0, 2) + api.write_output(0, 2, 1) + + +async def test_invalid_port_number(hass, numato_fixture, config): + """Test validation of ADC port number type.""" + sensorports_cfg = config["numato"]["devices"][0]["sensors"]["ports"] + port1_config = sensorports_cfg["1"] + sensorports_cfg["one"] = port1_config + del sensorports_cfg["1"] + assert not await async_setup_component(hass, "numato", config) + await hass.async_block_till_done() + assert not numato_fixture.devices + + +async def test_too_low_adc_port_number(hass, numato_fixture, config): + """Test handling of failing component setup. + + Tries setting up an ADC on a port below (0) the allowed range. + """ + + sensorports_cfg = config["numato"]["devices"][0]["sensors"]["ports"] + sensorports_cfg.update({0: {"name": "toolow"}}) + assert not await async_setup_component(hass, "numato", config) + assert not numato_fixture.devices + + +async def test_too_high_adc_port_number(hass, numato_fixture, config): + """Test handling of failing component setup. + + Tries setting up an ADC on a port above (8) the allowed range. + """ + sensorports_cfg = config["numato"]["devices"][0]["sensors"]["ports"] + sensorports_cfg.update({8: {"name": "toohigh"}}) + assert not await async_setup_component(hass, "numato", config) + assert not numato_fixture.devices + + +async def test_invalid_adc_range_value_type(hass, numato_fixture, config): + """Test validation of ADC range config's types. + + Replaces the source range beginning by a string. + """ + sensorports_cfg = config["numato"]["devices"][0]["sensors"]["ports"] + sensorports_cfg["1"]["source_range"][0] = "zero" + assert not await async_setup_component(hass, "numato", config) + assert not numato_fixture.devices + + +async def test_invalid_adc_source_range_length(hass, numato_fixture, config): + """Test validation of ADC range config's length. + + Adds an element to the source range. + """ + sensorports_cfg = config["numato"]["devices"][0]["sensors"]["ports"] + sensorports_cfg["1"]["source_range"].append(42) + assert not await async_setup_component(hass, "numato", config) + assert not numato_fixture.devices + + +async def test_invalid_adc_source_range_order(hass, numato_fixture, config): + """Test validation of ADC range config's order. + + Sets the source range to a decreasing [2, 1]. + """ + sensorports_cfg = config["numato"]["devices"][0]["sensors"]["ports"] + sensorports_cfg["1"]["source_range"] = [2, 1] + assert not await async_setup_component(hass, "numato", config) + assert not numato_fixture.devices + + +async def test_invalid_adc_destination_range_value_type(hass, numato_fixture, config): + """Test validation of ADC range . + + Replaces the destination range beginning by a string. + """ + sensorports_cfg = config["numato"]["devices"][0]["sensors"]["ports"] + sensorports_cfg["1"]["destination_range"][0] = "zero" + assert not await async_setup_component(hass, "numato", config) + assert not numato_fixture.devices + + +async def test_invalid_adc_destination_range_length(hass, numato_fixture, config): + """Test validation of ADC range config's length. + + Adds an element to the destination range. + """ + sensorports_cfg = config["numato"]["devices"][0]["sensors"]["ports"] + sensorports_cfg["1"]["destination_range"].append(42) + assert not await async_setup_component(hass, "numato", config) + assert not numato_fixture.devices + + +async def test_invalid_adc_destination_range_order(hass, numato_fixture, config): + """Test validation of ADC range config's order. + + Sets the destination range to a decreasing [2, 1]. + """ + sensorports_cfg = config["numato"]["devices"][0]["sensors"]["ports"] + sensorports_cfg["1"]["destination_range"] = [2, 1] + assert not await async_setup_component(hass, "numato", config) + assert not numato_fixture.devices diff --git a/tests/components/numato/test_sensor.py b/tests/components/numato/test_sensor.py new file mode 100644 index 00000000000..c6d176dbc90 --- /dev/null +++ b/tests/components/numato/test_sensor.py @@ -0,0 +1,38 @@ +"""Tests for the numato sensor platform.""" +from homeassistant.const import STATE_UNKNOWN +from homeassistant.helpers import discovery +from homeassistant.setup import async_setup_component + +from .common import NUMATO_CFG, mockup_raise + +MOCKUP_ENTITY_IDS = { + "sensor.numato_adc_mock_port1", +} + + +async def test_failing_setups_no_entities(hass, numato_fixture, monkeypatch): + """When port setup fails, no entity shall be created.""" + monkeypatch.setattr(numato_fixture.NumatoDeviceMock, "setup", mockup_raise) + assert await async_setup_component(hass, "numato", NUMATO_CFG) + await hass.async_block_till_done() + for entity_id in MOCKUP_ENTITY_IDS: + assert entity_id not in hass.states.async_entity_ids() + + +async def test_failing_sensor_update(hass, numato_fixture, monkeypatch): + """Test condition when a sensor update fails.""" + monkeypatch.setattr(numato_fixture.NumatoDeviceMock, "adc_read", mockup_raise) + assert await async_setup_component(hass, "numato", NUMATO_CFG) + await hass.async_block_till_done() + assert hass.states.get("sensor.numato_adc_mock_port1").state is STATE_UNKNOWN + + +async def test_sensor_setup_without_discovery_info(hass, config, numato_fixture): + """Test handling of empty discovery_info.""" + numato_fixture.discover() + await discovery.async_load_platform(hass, "sensor", "numato", None, config) + for entity_id in MOCKUP_ENTITY_IDS: + assert entity_id not in hass.states.async_entity_ids() + await hass.async_block_till_done() # wait for numato platform to be loaded + for entity_id in MOCKUP_ENTITY_IDS: + assert entity_id in hass.states.async_entity_ids() diff --git a/tests/components/numato/test_switch.py b/tests/components/numato/test_switch.py new file mode 100644 index 00000000000..91cda5c2a37 --- /dev/null +++ b/tests/components/numato/test_switch.py @@ -0,0 +1,114 @@ +"""Tests for the numato switch platform.""" +from homeassistant.components import switch +from homeassistant.const import ATTR_ENTITY_ID, SERVICE_TURN_OFF, SERVICE_TURN_ON +from homeassistant.helpers import discovery +from homeassistant.setup import async_setup_component + +from .common import NUMATO_CFG, mockup_raise + +MOCKUP_ENTITY_IDS = { + "switch.numato_switch_mock_port5", + "switch.numato_switch_mock_port6", +} + + +async def test_failing_setups_no_entities(hass, numato_fixture, monkeypatch): + """When port setup fails, no entity shall be created.""" + monkeypatch.setattr(numato_fixture.NumatoDeviceMock, "setup", mockup_raise) + assert await async_setup_component(hass, "numato", NUMATO_CFG) + await hass.async_block_till_done() + for entity_id in MOCKUP_ENTITY_IDS: + assert entity_id not in hass.states.async_entity_ids() + + +async def test_regular_hass_operations(hass, numato_fixture): + """Test regular operations from within Home Assistant.""" + assert await async_setup_component(hass, "numato", NUMATO_CFG) + await hass.async_block_till_done() # wait until services are registered + await hass.services.async_call( + switch.DOMAIN, + SERVICE_TURN_ON, + {ATTR_ENTITY_ID: "switch.numato_switch_mock_port5"}, + blocking=True, + ) + assert hass.states.get("switch.numato_switch_mock_port5").state == "on" + assert numato_fixture.devices[0].values[5] == 1 + await hass.services.async_call( + switch.DOMAIN, + SERVICE_TURN_ON, + {ATTR_ENTITY_ID: "switch.numato_switch_mock_port6"}, + blocking=True, + ) + assert hass.states.get("switch.numato_switch_mock_port6").state == "on" + assert numato_fixture.devices[0].values[6] == 1 + await hass.services.async_call( + switch.DOMAIN, + SERVICE_TURN_OFF, + {ATTR_ENTITY_ID: "switch.numato_switch_mock_port5"}, + blocking=True, + ) + assert hass.states.get("switch.numato_switch_mock_port5").state == "off" + assert numato_fixture.devices[0].values[5] == 0 + await hass.services.async_call( + switch.DOMAIN, + SERVICE_TURN_OFF, + {ATTR_ENTITY_ID: "switch.numato_switch_mock_port6"}, + blocking=True, + ) + assert hass.states.get("switch.numato_switch_mock_port6").state == "off" + assert numato_fixture.devices[0].values[6] == 0 + + +async def test_failing_hass_operations(hass, numato_fixture, monkeypatch): + """Test failing operations called from within Home Assistant. + + Switches remain in their initial 'off' state when the device can't + be written to. + """ + assert await async_setup_component(hass, "numato", NUMATO_CFG) + + await hass.async_block_till_done() # wait until services are registered + monkeypatch.setattr(numato_fixture.devices[0], "write", mockup_raise) + await hass.services.async_call( + switch.DOMAIN, + SERVICE_TURN_ON, + {ATTR_ENTITY_ID: "switch.numato_switch_mock_port5"}, + blocking=True, + ) + assert hass.states.get("switch.numato_switch_mock_port5").state == "off" + assert not numato_fixture.devices[0].values[5] + await hass.services.async_call( + switch.DOMAIN, + SERVICE_TURN_ON, + {ATTR_ENTITY_ID: "switch.numato_switch_mock_port6"}, + blocking=True, + ) + assert hass.states.get("switch.numato_switch_mock_port6").state == "off" + assert not numato_fixture.devices[0].values[6] + await hass.services.async_call( + switch.DOMAIN, + SERVICE_TURN_OFF, + {ATTR_ENTITY_ID: "switch.numato_switch_mock_port5"}, + blocking=True, + ) + assert hass.states.get("switch.numato_switch_mock_port5").state == "off" + assert not numato_fixture.devices[0].values[5] + await hass.services.async_call( + switch.DOMAIN, + SERVICE_TURN_OFF, + {ATTR_ENTITY_ID: "switch.numato_switch_mock_port6"}, + blocking=True, + ) + assert hass.states.get("switch.numato_switch_mock_port6").state == "off" + assert not numato_fixture.devices[0].values[6] + + +async def test_switch_setup_without_discovery_info(hass, config, numato_fixture): + """Test handling of empty discovery_info.""" + numato_fixture.discover() + await discovery.async_load_platform(hass, "switch", "numato", None, config) + for entity_id in MOCKUP_ENTITY_IDS: + assert entity_id not in hass.states.async_entity_ids() + await hass.async_block_till_done() # wait for numato platform to be loaded + for entity_id in MOCKUP_ENTITY_IDS: + assert entity_id in hass.states.async_entity_ids()