diff --git a/.coveragerc b/.coveragerc index ef8ea722106..0da952504de 100644 --- a/.coveragerc +++ b/.coveragerc @@ -372,7 +372,6 @@ omit = homeassistant/components/ihc/* homeassistant/components/imap/sensor.py homeassistant/components/imap_email_content/sensor.py - homeassistant/components/influxdb/sensor.py homeassistant/components/insteon/* homeassistant/components/incomfort/* homeassistant/components/intesishome/* diff --git a/homeassistant/components/influxdb/__init__.py b/homeassistant/components/influxdb/__init__.py index 94a68c25504..c6e7cd19da0 100644 --- a/homeassistant/components/influxdb/__init__.py +++ b/homeassistant/components/influxdb/__init__.py @@ -2,7 +2,6 @@ import logging import math import queue -import re import threading import time from typing import Dict @@ -16,16 +15,7 @@ import urllib3.exceptions import voluptuous as vol from homeassistant.const import ( - CONF_API_VERSION, - CONF_HOST, - CONF_PASSWORD, - CONF_PATH, - CONF_PORT, - CONF_SSL, - CONF_TOKEN, CONF_URL, - CONF_USERNAME, - CONF_VERIFY_SSL, EVENT_HOMEASSISTANT_STOP, EVENT_STATE_CHANGED, STATE_UNAVAILABLE, @@ -39,39 +29,49 @@ from homeassistant.helpers.entityfilter import ( convert_include_exclude_filter, ) +from .const import ( + API_VERSION_2, + BATCH_BUFFER_SIZE, + BATCH_TIMEOUT, + CLIENT_ERROR_V1_WITH_RETRY, + CLIENT_ERROR_V2_WITH_RETRY, + COMPONENT_CONFIG_SCHEMA_CONNECTION, + CONF_API_VERSION, + CONF_BUCKET, + CONF_COMPONENT_CONFIG, + CONF_COMPONENT_CONFIG_DOMAIN, + CONF_COMPONENT_CONFIG_GLOB, + CONF_DB_NAME, + CONF_DEFAULT_MEASUREMENT, + CONF_HOST, + CONF_ORG, + CONF_OVERRIDE_MEASUREMENT, + CONF_PASSWORD, + CONF_PATH, + CONF_PORT, + CONF_RETRY_COUNT, + CONF_SSL, + CONF_TAGS, + CONF_TAGS_ATTRIBUTES, + CONF_TOKEN, + CONF_USERNAME, + CONF_VERIFY_SSL, + CONNECTION_ERROR_WITH_RETRY, + DEFAULT_API_VERSION, + DEFAULT_HOST_V2, + DEFAULT_SSL_V2, + DOMAIN, + QUEUE_BACKLOG_SECONDS, + RE_DECIMAL, + RE_DIGIT_TAIL, + RETRY_DELAY, + RETRY_INTERVAL, + TIMEOUT, + WRITE_ERROR, +) + _LOGGER = logging.getLogger(__name__) -CONF_DB_NAME = "database" -CONF_BUCKET = "bucket" -CONF_ORG = "organization" -CONF_TAGS = "tags" -CONF_DEFAULT_MEASUREMENT = "default_measurement" -CONF_OVERRIDE_MEASUREMENT = "override_measurement" -CONF_TAGS_ATTRIBUTES = "tags_attributes" -CONF_COMPONENT_CONFIG = "component_config" -CONF_COMPONENT_CONFIG_GLOB = "component_config_glob" -CONF_COMPONENT_CONFIG_DOMAIN = "component_config_domain" -CONF_RETRY_COUNT = "max_retries" - -DEFAULT_DATABASE = "home_assistant" -DEFAULT_HOST_V2 = "us-west-2-1.aws.cloud2.influxdata.com" -DEFAULT_SSL_V2 = True -DEFAULT_BUCKET = "Home Assistant" -DEFAULT_VERIFY_SSL = True -DEFAULT_API_VERSION = "1" - -DOMAIN = "influxdb" -API_VERSION_2 = "2" -TIMEOUT = 5 -RETRY_DELAY = 20 -QUEUE_BACKLOG_SECONDS = 30 -RETRY_INTERVAL = 60 # seconds - -BATCH_TIMEOUT = 1 -BATCH_BUFFER_SIZE = 100 - -DB_CONNECTION_FAILURE_MSG = () - def create_influx_url(conf: Dict) -> Dict: """Build URL used from config inputs and default when necessary.""" @@ -120,26 +120,6 @@ def validate_version_specific_config(conf: Dict) -> Dict: return conf -COMPONENT_CONFIG_SCHEMA_CONNECTION = { - # Connection config for V1 and V2 APIs. - vol.Optional(CONF_API_VERSION, default=DEFAULT_API_VERSION): vol.All( - vol.Coerce(str), vol.In([DEFAULT_API_VERSION, API_VERSION_2]), - ), - vol.Optional(CONF_HOST): cv.string, - vol.Optional(CONF_PATH): cv.string, - vol.Optional(CONF_PORT): cv.port, - vol.Optional(CONF_SSL): cv.boolean, - # Connection config for V1 API only. - vol.Inclusive(CONF_USERNAME, "authentication"): cv.string, - vol.Inclusive(CONF_PASSWORD, "authentication"): cv.string, - vol.Optional(CONF_DB_NAME, default=DEFAULT_DATABASE): cv.string, - vol.Optional(CONF_VERIFY_SSL, default=DEFAULT_VERIFY_SSL): cv.boolean, - # Connection config for V2 API only. - vol.Inclusive(CONF_TOKEN, "v2_authentication"): cv.string, - vol.Inclusive(CONF_ORG, "v2_authentication"): cv.string, - vol.Optional(CONF_BUCKET, default=DEFAULT_BUCKET): cv.string, -} - _CONFIG_SCHEMA_ENTRY = vol.Schema({vol.Optional(CONF_OVERRIDE_MEASUREMENT): cv.string}) _CONFIG_SCHEMA = INCLUDE_EXCLUDE_BASE_FILTER_SCHEMA.extend( @@ -174,9 +154,6 @@ CONFIG_SCHEMA = vol.Schema( extra=vol.ALLOW_EXTRA, ) -RE_DIGIT_TAIL = re.compile(r"^[^\.]*\d+\.?\d+[^\.]*$") -RE_DECIMAL = re.compile(r"[^\d.]+") - def get_influx_connection(client_kwargs, bucket): """Create and check the correct influx connection for the API version.""" @@ -254,26 +231,20 @@ def setup(hass, config): influx = get_influx_connection(kwargs, bucket) if use_v2_api: write_api = influx.write_api(write_options=ASYNCHRONOUS) - except (exceptions.InfluxDBClientError, requests.exceptions.ConnectionError) as exc: - _LOGGER.error( - "Database host is not accessible due to '%s', please " - "check your entries in the configuration file (host, " - "port, etc.) and verify that the database exists and is " - "READ/WRITE. Retrying again in %s seconds.", - exc, - RETRY_INTERVAL, - ) + except ( + OSError, + requests.exceptions.ConnectionError, + urllib3.exceptions.HTTPError, + ) as exc: + _LOGGER.error(CONNECTION_ERROR_WITH_RETRY, exc) event_helper.call_later(hass, RETRY_INTERVAL, lambda _: setup(hass, config)) return True - except (ApiException, urllib3.exceptions.HTTPError) as exc: - _LOGGER.error( - "Bucket is not accessible due to '%s', please " - "check your entries in the configuration file (url, org, " - "bucket, etc.) and verify that the org and bucket exist and the " - "provided token has WRITE access. Retrying again in %s seconds.", - exc, - RETRY_INTERVAL, - ) + except exceptions.InfluxDBClientError as exc: + _LOGGER.error(CLIENT_ERROR_V1_WITH_RETRY, exc) + event_helper.call_later(hass, RETRY_INTERVAL, lambda _: setup(hass, config)) + return True + except ApiException as exc: + _LOGGER.error(CLIENT_ERROR_V2_WITH_RETRY, exc) event_helper.call_later(hass, RETRY_INTERVAL, lambda _: setup(hass, config)) return True @@ -468,7 +439,7 @@ class InfluxThread(threading.Thread): time.sleep(RETRY_DELAY) else: if not self.write_errors: - _LOGGER.error("Write error: %s", err) + _LOGGER.error(WRITE_ERROR, json, err) self.write_errors += len(json) def run(self): diff --git a/homeassistant/components/influxdb/const.py b/homeassistant/components/influxdb/const.py new file mode 100644 index 00000000000..b59ead3a849 --- /dev/null +++ b/homeassistant/components/influxdb/const.py @@ -0,0 +1,118 @@ +"""Constants for InfluxDB integration.""" +from datetime import timedelta +import re + +import voluptuous as vol + +from homeassistant.const import ( + CONF_API_VERSION, + CONF_HOST, + CONF_PASSWORD, + CONF_PATH, + CONF_PORT, + CONF_SSL, + CONF_TOKEN, + CONF_USERNAME, + CONF_VERIFY_SSL, +) +import homeassistant.helpers.config_validation as cv + +CONF_DB_NAME = "database" +CONF_BUCKET = "bucket" +CONF_ORG = "organization" +CONF_TAGS = "tags" +CONF_DEFAULT_MEASUREMENT = "default_measurement" +CONF_OVERRIDE_MEASUREMENT = "override_measurement" +CONF_TAGS_ATTRIBUTES = "tags_attributes" +CONF_COMPONENT_CONFIG = "component_config" +CONF_COMPONENT_CONFIG_GLOB = "component_config_glob" +CONF_COMPONENT_CONFIG_DOMAIN = "component_config_domain" +CONF_RETRY_COUNT = "max_retries" + +CONF_LANGUAGE = "language" +CONF_QUERIES = "queries" +CONF_QUERIES_FLUX = "queries_flux" +CONF_GROUP_FUNCTION = "group_function" +CONF_FIELD = "field" +CONF_MEASUREMENT_NAME = "measurement" +CONF_WHERE = "where" + +CONF_RANGE_START = "range_start" +CONF_RANGE_STOP = "range_stop" +CONF_FUNCTION = "function" +CONF_QUERY = "query" +CONF_IMPORTS = "imports" + +DEFAULT_DATABASE = "home_assistant" +DEFAULT_HOST_V2 = "us-west-2-1.aws.cloud2.influxdata.com" +DEFAULT_SSL_V2 = True +DEFAULT_BUCKET = "Home Assistant" +DEFAULT_VERIFY_SSL = True +DEFAULT_API_VERSION = "1" +DEFAULT_GROUP_FUNCTION = "mean" +DEFAULT_FIELD = "value" +DEFAULT_RANGE_START = "-15m" +DEFAULT_RANGE_STOP = "now()" + +DOMAIN = "influxdb" +API_VERSION_2 = "2" +TIMEOUT = 5 +RETRY_DELAY = 20 +QUEUE_BACKLOG_SECONDS = 30 +RETRY_INTERVAL = 60 # seconds +BATCH_TIMEOUT = 1 +BATCH_BUFFER_SIZE = 100 +LANGUAGE_INFLUXQL = "influxQL" +LANGUAGE_FLUX = "flux" +TEST_QUERY_V1 = "SHOW SERIES LIMIT 1;" +TEST_QUERY_V2 = "buckets() |> limit(n:1)" + +MIN_TIME_BETWEEN_UPDATES = timedelta(seconds=60) + +RE_DIGIT_TAIL = re.compile(r"^[^\.]*\d+\.?\d+[^\.]*$") +RE_DECIMAL = re.compile(r"[^\d.]+") + +CONNECTION_ERROR = ( + "Cannot connect to InfluxDB due to '%s'. " + "Please check that the provided connection details (host, port, etc.) are correct " + "and that your InfluxDB server is running and accessible." +) +CLIENT_ERROR_V2 = ( + "InfluxDB bucket is not accessible due to '%s'. " + "Please check that the bucket, org and token are correct and " + "that the token has the correct permissions set." +) +CLIENT_ERROR_V1 = ( + "InfluxDB database is not accessible due to '%s'. " + "Please check that the database, username and password are correct and " + "that the specified user has the correct permissions set." +) +WRITE_ERROR = "Could not write '%s' to influx due to '%s'." +QUERY_ERROR = ( + "Could not execute query '%s' due to '%s'. Check the syntax of your query." +) +RETRY_MESSAGE = f"Retrying again in {RETRY_INTERVAL} seconds." +CONNECTION_ERROR_WITH_RETRY = f"{CONNECTION_ERROR} {RETRY_MESSAGE}" +CLIENT_ERROR_V1_WITH_RETRY = f"{CLIENT_ERROR_V1} {RETRY_MESSAGE}" +CLIENT_ERROR_V2_WITH_RETRY = f"{CLIENT_ERROR_V2} {RETRY_MESSAGE}" + + +COMPONENT_CONFIG_SCHEMA_CONNECTION = { + # Connection config for V1 and V2 APIs. + vol.Optional(CONF_API_VERSION, default=DEFAULT_API_VERSION): vol.All( + vol.Coerce(str), vol.In([DEFAULT_API_VERSION, API_VERSION_2]), + ), + vol.Optional(CONF_HOST): cv.string, + vol.Optional(CONF_PATH): cv.string, + vol.Optional(CONF_PORT): cv.port, + vol.Optional(CONF_SSL): cv.boolean, + # Connection config for V1 API only. + vol.Inclusive(CONF_USERNAME, "authentication"): cv.string, + vol.Inclusive(CONF_PASSWORD, "authentication"): cv.string, + vol.Optional(CONF_DB_NAME, default=DEFAULT_DATABASE): cv.string, + vol.Optional(CONF_VERIFY_SSL, default=DEFAULT_VERIFY_SSL): cv.boolean, + # Connection config for V2 API only. + vol.Inclusive(CONF_TOKEN, "v2_authentication"): cv.string, + vol.Inclusive(CONF_ORG, "v2_authentication"): cv.string, + vol.Optional(CONF_BUCKET, default=DEFAULT_BUCKET): cv.string, +} diff --git a/homeassistant/components/influxdb/sensor.py b/homeassistant/components/influxdb/sensor.py index 0cf25c0b2f4..302bcde2373 100644 --- a/homeassistant/components/influxdb/sensor.py +++ b/homeassistant/components/influxdb/sensor.py @@ -1,5 +1,4 @@ """InfluxDB component which allows you to get data from an Influx database.""" -from datetime import timedelta import logging from typing import Dict @@ -30,40 +29,35 @@ import homeassistant.helpers.config_validation as cv from homeassistant.helpers.entity import Entity from homeassistant.util import Throttle -from . import ( +from . import create_influx_url, validate_version_specific_config +from .const import ( API_VERSION_2, COMPONENT_CONFIG_SCHEMA_CONNECTION, CONF_BUCKET, CONF_DB_NAME, + CONF_FIELD, + CONF_GROUP_FUNCTION, + CONF_IMPORTS, + CONF_MEASUREMENT_NAME, CONF_ORG, + CONF_QUERIES, + CONF_QUERIES_FLUX, + CONF_QUERY, + CONF_RANGE_START, + CONF_RANGE_STOP, + CONF_WHERE, DEFAULT_API_VERSION, - create_influx_url, - validate_version_specific_config, + DEFAULT_FIELD, + DEFAULT_GROUP_FUNCTION, + DEFAULT_RANGE_START, + DEFAULT_RANGE_STOP, + MIN_TIME_BETWEEN_UPDATES, + TEST_QUERY_V1, + TEST_QUERY_V2, ) _LOGGER = logging.getLogger(__name__) -DEFAULT_GROUP_FUNCTION = "mean" -DEFAULT_FIELD = "value" - -CONF_QUERIES = "queries" -CONF_QUERIES_FLUX = "queries_flux" -CONF_GROUP_FUNCTION = "group_function" -CONF_FIELD = "field" -CONF_MEASUREMENT_NAME = "measurement" -CONF_WHERE = "where" - -CONF_RANGE_START = "range_start" -CONF_RANGE_STOP = "range_stop" -CONF_FUNCTION = "function" -CONF_QUERY = "query" -CONF_IMPORTS = "imports" - -DEFAULT_RANGE_START = "-15m" -DEFAULT_RANGE_STOP = "now()" - -MIN_TIME_BETWEEN_UPDATES = timedelta(seconds=60) - _QUERY_SENSOR_SCHEMA = vol.Schema( { vol.Required(CONF_NAME): cv.string, @@ -217,9 +211,7 @@ class InfluxSensor(Entity): try: if query_api is not None: - query_api.query( - f'from(bucket: "{bucket}") |> range(start: -1ms) |> keep(columns: ["_time"]) |> limit(n: 1)' - ) + query_api.query(TEST_QUERY_V2) self.connected = True self.data = InfluxSensorDataV2( query_api, @@ -232,7 +224,7 @@ class InfluxSensor(Entity): ) else: - influx.query("SHOW SERIES LIMIT 1;") + influx.query(TEST_QUERY_V1) self.connected = True self.data = InfluxSensorDataV1( influx, @@ -336,7 +328,7 @@ class InfluxSensorDataV2: try: tables = self.query_api.query(self.full_query) - except ApiException as exc: + except (OSError, ApiException) as exc: _LOGGER.error( "Could not execute query '%s' due to '%s', " "Check the syntax of your query", @@ -390,7 +382,7 @@ class InfluxSensorDataV1: try: points = list(self.influx.query(self.query).get_points()) - except exceptions.InfluxDBClientError as exc: + except (OSError, exceptions.InfluxDBClientError) as exc: _LOGGER.error( "Could not execute query '%s' due to '%s', " "Check the syntax of your query", diff --git a/tests/components/influxdb/test_init.py b/tests/components/influxdb/test_init.py index 29247bec9c8..04486f8f9b3 100644 --- a/tests/components/influxdb/test_init.py +++ b/tests/components/influxdb/test_init.py @@ -5,6 +5,7 @@ import datetime import pytest import homeassistant.components.influxdb as influxdb +from homeassistant.components.influxdb.const import DEFAULT_BUCKET from homeassistant.const import ( EVENT_STATE_CHANGED, STATE_OFF, @@ -17,6 +18,8 @@ from homeassistant.setup import async_setup_component from tests.async_mock import MagicMock, Mock, call, patch +INFLUX_PATH = "homeassistant.components.influxdb" +INFLUX_CLIENT_PATH = f"{INFLUX_PATH}.InfluxDBClient" BASE_V1_CONFIG = {} BASE_V2_CONFIG = { "api_version": influxdb.API_VERSION_2, @@ -38,8 +41,7 @@ def mock_batch_timeout(hass, monkeypatch): """Mock the event bus listener and the batch timeout for tests.""" hass.bus.listen = MagicMock() monkeypatch.setattr( - "homeassistant.components.influxdb.InfluxThread.batch_timeout", - Mock(return_value=0), + f"{INFLUX_PATH}.InfluxThread.batch_timeout", Mock(return_value=0), ) @@ -47,9 +49,9 @@ def mock_batch_timeout(hass, monkeypatch): def mock_client_fixture(request): """Patch the InfluxDBClient object with mock for version under test.""" if request.param == influxdb.API_VERSION_2: - client_target = "homeassistant.components.influxdb.InfluxDBClientV2" + client_target = f"{INFLUX_CLIENT_PATH}V2" else: - client_target = "homeassistant.components.influxdb.InfluxDBClient" + client_target = INFLUX_CLIENT_PATH with patch(client_target) as client: yield client @@ -59,7 +61,7 @@ def mock_client_fixture(request): def get_mock_call_fixture(request): """Get version specific lambda to make write API call mock.""" if request.param == influxdb.API_VERSION_2: - return lambda body: call(bucket=influxdb.DEFAULT_BUCKET, record=body) + return lambda body: call(bucket=DEFAULT_BUCKET, record=body) # pylint: disable=unnecessary-lambda return lambda body: call(body) @@ -144,7 +146,16 @@ async def test_setup_minimal_config(hass, mock_client, config_ext, get_write_api "mock_client, config_ext, get_write_api", [ (influxdb.DEFAULT_API_VERSION, {"username": "user"}, _get_write_api_mock_v1), - (influxdb.DEFAULT_API_VERSION, {"token": "token"}, _get_write_api_mock_v1), + ( + influxdb.DEFAULT_API_VERSION, + {"token": "token", "organization": "organization"}, + _get_write_api_mock_v1, + ), + ( + influxdb.API_VERSION_2, + {"api_version": influxdb.API_VERSION_2}, + _get_write_api_mock_v2, + ), ( influxdb.API_VERSION_2, {"api_version": influxdb.API_VERSION_2, "organization": "organization"}, @@ -1147,3 +1158,165 @@ async def test_event_listener_backlog_full( hass.data[influxdb.DOMAIN].block_till_done() assert get_write_api(mock_client).call_count == 0 + + +@pytest.mark.parametrize( + "mock_client, config_ext, get_write_api, get_mock_call", + [ + ( + influxdb.DEFAULT_API_VERSION, + BASE_V1_CONFIG, + _get_write_api_mock_v1, + influxdb.DEFAULT_API_VERSION, + ), + ( + influxdb.API_VERSION_2, + BASE_V2_CONFIG, + _get_write_api_mock_v2, + influxdb.API_VERSION_2, + ), + ], + indirect=["mock_client", "get_mock_call"], +) +async def test_event_listener_attribute_name_conflict( + hass, mock_client, config_ext, get_write_api, get_mock_call +): + """Test the event listener when an attribute conflicts with another field.""" + handler_method = await _setup(hass, mock_client, config_ext, get_write_api) + + attrs = {"value": "value_str"} + state = MagicMock( + state=1, + domain="fake", + entity_id="fake.something", + object_id="something", + attributes=attrs, + ) + event = MagicMock(data={"new_state": state}, time_fired=12345) + body = [ + { + "measurement": "fake.something", + "tags": {"domain": "fake", "entity_id": "something"}, + "time": 12345, + "fields": {"value": 1, "value__str": "value_str"}, + } + ] + handler_method(event) + hass.data[influxdb.DOMAIN].block_till_done() + + write_api = get_write_api(mock_client) + assert write_api.call_count == 1 + assert write_api.call_args == get_mock_call(body) + + +@pytest.mark.parametrize( + "mock_client, config_ext, get_write_api, get_mock_call, test_exception", + [ + ( + influxdb.DEFAULT_API_VERSION, + BASE_V1_CONFIG, + _get_write_api_mock_v1, + influxdb.DEFAULT_API_VERSION, + ConnectionError("fail"), + ), + ( + influxdb.DEFAULT_API_VERSION, + BASE_V1_CONFIG, + _get_write_api_mock_v1, + influxdb.DEFAULT_API_VERSION, + influxdb.exceptions.InfluxDBClientError("fail"), + ), + ( + influxdb.API_VERSION_2, + BASE_V2_CONFIG, + _get_write_api_mock_v2, + influxdb.API_VERSION_2, + ConnectionError("fail"), + ), + ( + influxdb.API_VERSION_2, + BASE_V2_CONFIG, + _get_write_api_mock_v2, + influxdb.API_VERSION_2, + influxdb.ApiException(), + ), + ], + indirect=["mock_client", "get_mock_call"], +) +async def test_connection_failure_on_startup( + hass, caplog, mock_client, config_ext, get_write_api, get_mock_call, test_exception +): + """Test the event listener when it fails to connect to Influx on startup.""" + write_api = get_write_api(mock_client) + write_api.side_effect = test_exception + config = {"influxdb": config_ext} + + with patch(f"{INFLUX_PATH}.event_helper") as event_helper: + assert await async_setup_component(hass, influxdb.DOMAIN, config) + await hass.async_block_till_done() + + assert ( + len([record for record in caplog.records if record.levelname == "ERROR"]) + == 1 + ) + event_helper.call_later.assert_called_once() + hass.bus.listen.assert_not_called() + + +@pytest.mark.parametrize( + "mock_client, config_ext, get_write_api, get_mock_call, test_exception", + [ + ( + influxdb.DEFAULT_API_VERSION, + BASE_V1_CONFIG, + _get_write_api_mock_v1, + influxdb.DEFAULT_API_VERSION, + influxdb.exceptions.InfluxDBClientError("fail", code=400), + ), + ( + influxdb.API_VERSION_2, + BASE_V2_CONFIG, + _get_write_api_mock_v2, + influxdb.API_VERSION_2, + influxdb.ApiException(status=400), + ), + ], + indirect=["mock_client", "get_mock_call"], +) +async def test_invalid_inputs_error( + hass, caplog, mock_client, config_ext, get_write_api, get_mock_call, test_exception +): + """ + Test the event listener when influx returns invalid inputs on write. + + The difference in error handling in this case is that we do not sleep + and try again, if an input is invalid it is logged and dropped. + + Note that this shouldn't actually occur, if its possible for the current + code to send an invalid input then it should be adjusted to stop that. + But Influx is an external service so there may be edge cases that + haven't been encountered yet. + """ + handler_method = await _setup(hass, mock_client, config_ext, get_write_api) + + write_api = get_write_api(mock_client) + write_api.side_effect = test_exception + state = MagicMock( + state=1, + domain="fake", + entity_id="fake.something", + object_id="something", + attributes={}, + ) + event = MagicMock(data={"new_state": state}, time_fired=12345) + + with patch(f"{INFLUX_PATH}.time.sleep") as sleep: + handler_method(event) + hass.data[influxdb.DOMAIN].block_till_done() + + write_api.assert_called_once() + assert ( + len([record for record in caplog.records if record.levelname == "ERROR"]) + == 1 + ) + sleep.assert_not_called() diff --git a/tests/components/influxdb/test_sensor.py b/tests/components/influxdb/test_sensor.py new file mode 100644 index 00000000000..4ef42d0fa3a --- /dev/null +++ b/tests/components/influxdb/test_sensor.py @@ -0,0 +1,453 @@ +"""The tests for the InfluxDB sensor.""" +from dataclasses import dataclass +from typing import Dict, List, Type + +from influxdb.exceptions import InfluxDBClientError +from influxdb_client.rest import ApiException +import pytest +from voluptuous import Invalid + +from homeassistant.components.influxdb.const import ( + API_VERSION_2, + DEFAULT_API_VERSION, + DOMAIN, + TEST_QUERY_V1, + TEST_QUERY_V2, +) +from homeassistant.components.influxdb.sensor import PLATFORM_SCHEMA +import homeassistant.components.sensor as sensor +from homeassistant.const import STATE_UNKNOWN +from homeassistant.setup import async_setup_component + +from tests.async_mock import MagicMock, patch + +INFLUXDB_PATH = "homeassistant.components.influxdb" +INFLUXDB_CLIENT_PATH = f"{INFLUXDB_PATH}.sensor.InfluxDBClient" +INFLUXDB_SENSOR_PATH = f"{INFLUXDB_PATH}.sensor" + +BASE_V1_CONFIG = {} +BASE_V2_CONFIG = { + "api_version": API_VERSION_2, + "organization": "org", + "token": "token", +} + +BASE_V1_QUERY = { + "queries": [ + { + "name": "test", + "measurement": "measurement", + "where": "where", + "field": "field", + } + ], +} +BASE_V2_QUERY = {"queries_flux": [{"name": "test", "query": "query"}]} + + +@dataclass +class Record: + """Record in a Table.""" + + values: Dict + + +@dataclass +class Table: + """Table in an Influx 2 resultset.""" + + records: List[Type[Record]] + + +@pytest.fixture(name="mock_client") +def mock_client_fixture(request): + """Patch the InfluxDBClient object with mock for version under test.""" + if request.param == API_VERSION_2: + client_target = f"{INFLUXDB_CLIENT_PATH}V2" + else: + client_target = INFLUXDB_CLIENT_PATH + + with patch(client_target) as client: + yield client + + +@pytest.fixture(autouse=True) +def mock_influx_platform(): + """ + Mock the influx client and queue in the main platform. + + Successful sensor setup is really independent of the main platform. + But since its one integration there is an internal dependency. + Mocking the client library there prevents failures and mocking the queue + to return `None` on get makes the listener shutdown immediately after initialization. + """ + with patch(f"{INFLUXDB_PATH}.InfluxDBClient") as mock_v1_client, patch( + f"{INFLUXDB_PATH}.InfluxDBClientV2" + ) as mock_v2_client, patch( + f"{INFLUXDB_PATH}.queue.Queue.get", return_value=None + ) as queue_get: + yield (mock_v1_client, mock_v2_client, queue_get) + + +@pytest.fixture(autouse=True, scope="module") +def mock_client_close(): + """Mock close method of clients at module scope.""" + with patch(f"{INFLUXDB_CLIENT_PATH}.close") as close_v1, patch( + f"{INFLUXDB_CLIENT_PATH}V2.close" + ) as close_v2: + yield (close_v1, close_v2) + + +def _make_v1_resultset(*args): + """Create a mock V1 resultset.""" + for arg in args: + yield {"value": arg} + + +def _make_v2_resultset(*args): + """Create a mock V2 resultset.""" + tables = [] + + for arg in args: + values = {"_value": arg} + record = Record(values) + tables.append(Table([record])) + + return tables + + +def _set_query_mock_v1(mock_influx_client, return_value=None, side_effect=None): + """Set return value or side effect for the V1 client.""" + query_api = mock_influx_client.return_value.query + if side_effect: + query_api.side_effect = side_effect + + else: + if return_value is None: + return_value = [] + + def get_return_value(query, **kwargs): + """Return mock for test query, return value otherwise.""" + if query == TEST_QUERY_V1: + return MagicMock() + + query_output = MagicMock() + query_output.get_points.return_value = return_value + return query_output + + query_api.side_effect = get_return_value + + +def _set_query_mock_v2(mock_influx_client, return_value=None, side_effect=None): + """Set return value or side effect for the V2 client.""" + query_api = mock_influx_client.return_value.query_api.return_value.query + if side_effect: + query_api.side_effect = side_effect + else: + if return_value is None: + return_value = [] + + query_api.return_value = return_value + + +async def _setup(hass, config_ext, queries, expected_sensors): + """Create client and test expected sensors.""" + config = { + DOMAIN: {}, + sensor.DOMAIN: {"platform": DOMAIN}, + } + influx_config = config[sensor.DOMAIN] + influx_config.update(config_ext) + influx_config.update(queries) + + assert await async_setup_component(hass, sensor.DOMAIN, config) + await hass.async_block_till_done() + + sensors = [] + for expected_sensor in expected_sensors: + state = hass.states.get(expected_sensor) + assert state is not None + sensors.append(state) + + return sensors + + +@pytest.mark.parametrize( + "mock_client, config_ext, queries", + [ + (DEFAULT_API_VERSION, BASE_V1_CONFIG, BASE_V1_QUERY), + (API_VERSION_2, BASE_V2_CONFIG, BASE_V2_QUERY), + ], + indirect=["mock_client"], +) +async def test_minimal_config(hass, mock_client, config_ext, queries): + """Test the minimal config and defaults.""" + await _setup(hass, config_ext, queries, ["sensor.test"]) + + +@pytest.mark.parametrize( + "mock_client, config_ext, queries", + [ + ( + DEFAULT_API_VERSION, + { + "ssl": "true", + "host": "host", + "port": "9000", + "path": "path", + "username": "user", + "password": "pass", + "database": "db", + "verify_ssl": "true", + "queries": [ + { + "name": "test", + "unit_of_measurement": "unit", + "measurement": "measurement", + "where": "where", + "value_template": "value", + "database": "db2", + "group_function": "fn", + "field": "field", + } + ], + }, + {}, + ), + ( + API_VERSION_2, + { + "api_version": "2", + "ssl": "true", + "host": "host", + "port": "9000", + "path": "path", + "token": "token", + "organization": "org", + "bucket": "bucket", + "queries_flux": [ + { + "name": "test", + "unit_of_measurement": "unit", + "range_start": "start", + "range_stop": "end", + "group_function": "fn", + "bucket": "bucket2", + "imports": "import", + "query": "query", + } + ], + }, + {}, + ), + ], + indirect=["mock_client"], +) +async def test_full_config(hass, mock_client, config_ext, queries): + """Test the full config.""" + await _setup(hass, config_ext, queries, ["sensor.test"]) + + +@pytest.mark.parametrize("config_ext", [(BASE_V1_CONFIG), (BASE_V2_CONFIG)]) +async def test_config_failure(hass, config_ext): + """Test an invalid config.""" + config = {"platform": DOMAIN} + config.update(config_ext) + + with pytest.raises(Invalid): + PLATFORM_SCHEMA(config) + + +@pytest.mark.parametrize( + "mock_client, config_ext, queries, set_query_mock, make_resultset", + [ + ( + DEFAULT_API_VERSION, + BASE_V1_CONFIG, + BASE_V1_QUERY, + _set_query_mock_v1, + _make_v1_resultset, + ), + ( + API_VERSION_2, + BASE_V2_CONFIG, + BASE_V2_QUERY, + _set_query_mock_v2, + _make_v2_resultset, + ), + ], + indirect=["mock_client"], +) +async def test_state_matches_query_result( + hass, mock_client, config_ext, queries, set_query_mock, make_resultset +): + """Test state of sensor matches respone from query api.""" + set_query_mock(mock_client, return_value=make_resultset(42)) + + sensors = await _setup(hass, config_ext, queries, ["sensor.test"]) + + assert sensors[0].state == "42" + + +@pytest.mark.parametrize( + "mock_client, config_ext, queries, set_query_mock, make_resultset", + [ + ( + DEFAULT_API_VERSION, + BASE_V1_CONFIG, + BASE_V1_QUERY, + _set_query_mock_v1, + _make_v1_resultset, + ), + ( + API_VERSION_2, + BASE_V2_CONFIG, + BASE_V2_QUERY, + _set_query_mock_v2, + _make_v2_resultset, + ), + ], + indirect=["mock_client"], +) +async def test_state_matches_first_query_result_for_multiple_return( + hass, caplog, mock_client, config_ext, queries, set_query_mock, make_resultset +): + """Test state of sensor matches respone from query api.""" + set_query_mock(mock_client, return_value=make_resultset(42, "not used")) + + sensors = await _setup(hass, config_ext, queries, ["sensor.test"]) + assert sensors[0].state == "42" + assert ( + len([record for record in caplog.records if record.levelname == "WARNING"]) == 1 + ) + + +@pytest.mark.parametrize( + "mock_client, config_ext, queries, set_query_mock", + [ + (DEFAULT_API_VERSION, BASE_V1_CONFIG, BASE_V1_QUERY, _set_query_mock_v1,), + (API_VERSION_2, BASE_V2_CONFIG, BASE_V2_QUERY, _set_query_mock_v2), + ], + indirect=["mock_client"], +) +async def test_state_for_no_results( + hass, caplog, mock_client, config_ext, queries, set_query_mock +): + """Test state of sensor matches respone from query api.""" + set_query_mock(mock_client) + + sensors = await _setup(hass, config_ext, queries, ["sensor.test"]) + assert sensors[0].state == STATE_UNKNOWN + assert ( + len([record for record in caplog.records if record.levelname == "WARNING"]) == 1 + ) + + +@pytest.mark.parametrize( + "mock_client, config_ext, queries, set_query_mock, query_exception", + [ + ( + DEFAULT_API_VERSION, + BASE_V1_CONFIG, + BASE_V1_QUERY, + _set_query_mock_v1, + OSError("fail"), + ), + ( + DEFAULT_API_VERSION, + BASE_V1_CONFIG, + BASE_V1_QUERY, + _set_query_mock_v1, + InfluxDBClientError("fail"), + ), + ( + DEFAULT_API_VERSION, + BASE_V1_CONFIG, + BASE_V1_QUERY, + _set_query_mock_v1, + InfluxDBClientError("fail", code=400), + ), + ( + API_VERSION_2, + BASE_V2_CONFIG, + BASE_V2_QUERY, + _set_query_mock_v2, + OSError("fail"), + ), + ( + API_VERSION_2, + BASE_V2_CONFIG, + BASE_V2_QUERY, + _set_query_mock_v2, + ApiException(), + ), + ( + API_VERSION_2, + BASE_V2_CONFIG, + BASE_V2_QUERY, + _set_query_mock_v2, + ApiException(status=400), + ), + ], + indirect=["mock_client"], +) +async def test_error_querying_influx( + hass, caplog, mock_client, config_ext, queries, set_query_mock, query_exception +): + """Test behavior of sensor when influx returns error.""" + + def mock_query_error(query, **kwargs): + """Throw error for any query besides test query.""" + if query in [TEST_QUERY_V1, TEST_QUERY_V2]: + return MagicMock() + raise query_exception + + set_query_mock(mock_client, side_effect=mock_query_error) + + sensors = await _setup(hass, config_ext, queries, ["sensor.test"]) + assert sensors[0].state == STATE_UNKNOWN + assert ( + len([record for record in caplog.records if record.levelname == "ERROR"]) == 1 + ) + + +@pytest.mark.parametrize( + "mock_client, config_ext, queries, set_query_mock, make_resultset", + [ + ( + DEFAULT_API_VERSION, + BASE_V1_CONFIG, + { + "queries": [ + { + "name": "test", + "measurement": "measurement", + "where": "{{ illegal.template }}", + "field": "field", + } + ] + }, + _set_query_mock_v1, + _make_v1_resultset, + ), + ( + API_VERSION_2, + BASE_V2_CONFIG, + {"queries_flux": [{"name": "test", "query": "{{ illegal.template }}"}]}, + _set_query_mock_v2, + _make_v2_resultset, + ), + ], + indirect=["mock_client"], +) +async def test_error_rendering_template( + hass, caplog, mock_client, config_ext, queries, set_query_mock, make_resultset +): + """Test behavior of sensor with error rendering template.""" + set_query_mock(mock_client, return_value=make_resultset(42)) + + sensors = await _setup(hass, config_ext, queries, ["sensor.test"]) + assert sensors[0].state == STATE_UNKNOWN + assert ( + len([record for record in caplog.records if record.levelname == "ERROR"]) == 1 + )