Implement config flow for SQL integration (#68700)

This commit is contained in:
G Johansson 2022-04-24 20:50:32 +02:00 committed by GitHub
parent 776565c23f
commit 472ffd3bc6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 962 additions and 208 deletions

View File

@ -950,8 +950,8 @@ build.json @home-assistant/supervisor
/homeassistant/components/splunk/ @Bre77
/homeassistant/components/spotify/ @frenck
/tests/components/spotify/ @frenck
/homeassistant/components/sql/ @dgomes
/tests/components/sql/ @dgomes
/homeassistant/components/sql/ @dgomes @gjohansson-ST
/tests/components/sql/ @dgomes @gjohansson-ST
/homeassistant/components/squeezebox/ @rajlaud
/tests/components/squeezebox/ @rajlaud
/homeassistant/components/srp_energy/ @briglx
@ -1198,4 +1198,4 @@ build.json @home-assistant/supervisor
/homeassistant/components/demo/weather.py @fabaff
# Remove codeowners from files
/homeassistant/components/*/translations/
/homeassistant/components/*/translations/

View File

@ -1 +1,21 @@
"""The sql component."""
from __future__ import annotations
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from .const import PLATFORMS
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up SQL from a config entry."""
hass.config_entries.async_setup_platforms(entry, PLATFORMS)
return True
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Unload SQL config entry."""
return await hass.config_entries.async_unload_platforms(entry, PLATFORMS)

View File

@ -0,0 +1,215 @@
"""Adds config flow for SQL integration."""
from __future__ import annotations
import logging
from typing import Any
import sqlalchemy
from sqlalchemy.engine import Result
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import scoped_session, sessionmaker
import voluptuous as vol
from homeassistant import config_entries
from homeassistant.components.recorder import CONF_DB_URL, DEFAULT_DB_FILE, DEFAULT_URL
from homeassistant.const import CONF_NAME, CONF_UNIT_OF_MEASUREMENT, CONF_VALUE_TEMPLATE
from homeassistant.core import HomeAssistant, callback
from homeassistant.data_entry_flow import FlowResult
from homeassistant.helpers import selector
from .const import CONF_COLUMN_NAME, CONF_QUERY, DOMAIN
_LOGGER = logging.getLogger(__name__)
DATA_SCHEMA = vol.Schema(
{
vol.Optional(CONF_DB_URL): selector.TextSelector(selector.TextSelectorConfig()),
vol.Required(CONF_COLUMN_NAME): selector.TextSelector(
selector.TextSelectorConfig()
),
vol.Required(CONF_QUERY): selector.TextSelector(selector.TextSelectorConfig()),
vol.Optional(CONF_UNIT_OF_MEASUREMENT): selector.TextSelector(
selector.TextSelectorConfig()
),
vol.Optional(CONF_VALUE_TEMPLATE): selector.TemplateSelector(
selector.TemplateSelectorConfig()
),
}
)
def validate_sql_select(value: str) -> str | None:
"""Validate that value is a SQL SELECT query."""
if not value.lstrip().lower().startswith("select"):
raise ValueError("Incorrect Query")
return value
def validate_query(db_url: str, query: str, column: str) -> bool:
"""Validate SQL query."""
try:
engine = sqlalchemy.create_engine(db_url)
sessmaker = scoped_session(sessionmaker(bind=engine))
except SQLAlchemyError as error:
raise error
sess: scoped_session = sessmaker()
try:
result: Result = sess.execute(query)
for res in result.mappings():
data = res[column]
_LOGGER.debug("Return value from query: %s", data)
except SQLAlchemyError as error:
if sess:
sess.close()
raise ValueError(error) from error
if sess:
sess.close()
return True
class SQLConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
"""Handle a config flow for SQL integration."""
VERSION = 1
entry: config_entries.ConfigEntry
hass: HomeAssistant
@staticmethod
@callback
def async_get_options_flow(
config_entry: config_entries.ConfigEntry,
) -> SQLOptionsFlowHandler:
"""Get the options flow for this handler."""
return SQLOptionsFlowHandler(config_entry)
async def async_step_import(self, config: dict[str, Any] | None) -> FlowResult:
"""Import a configuration from config.yaml."""
self._async_abort_entries_match(config)
return await self.async_step_user(user_input=config)
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""Handle the user step."""
errors = {}
db_url_default = DEFAULT_URL.format(
hass_config_path=self.hass.config.path(DEFAULT_DB_FILE)
)
if user_input is not None:
db_url = user_input.get(CONF_DB_URL, db_url_default)
query = user_input[CONF_QUERY]
column = user_input[CONF_COLUMN_NAME]
uom = user_input.get(CONF_UNIT_OF_MEASUREMENT)
value_template = user_input.get(CONF_VALUE_TEMPLATE)
name = f"Select {column} SQL query"
try:
validate_sql_select(query)
await self.hass.async_add_executor_job(
validate_query, db_url, query, column
)
except SQLAlchemyError:
errors["db_url"] = "db_url_invalid"
except ValueError:
errors["query"] = "query_invalid"
if not errors:
return self.async_create_entry(
title=name,
data={},
options={
CONF_DB_URL: db_url,
CONF_QUERY: query,
CONF_COLUMN_NAME: column,
CONF_UNIT_OF_MEASUREMENT: uom,
CONF_VALUE_TEMPLATE: value_template,
CONF_NAME: name,
},
)
return self.async_show_form(
step_id="user",
data_schema=DATA_SCHEMA,
errors=errors,
)
class SQLOptionsFlowHandler(config_entries.OptionsFlow):
"""Handle SQL options."""
def __init__(self, entry: config_entries.ConfigEntry) -> None:
"""Initialize SQL options flow."""
self.entry = entry
async def async_step_init(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""Manage SQL options."""
errors = {}
if user_input is not None:
db_url = user_input[CONF_DB_URL]
query = user_input[CONF_QUERY]
column = user_input[CONF_COLUMN_NAME]
try:
validate_sql_select(query)
await self.hass.async_add_executor_job(
validate_query, db_url, query, column
)
except SQLAlchemyError:
errors["db_url"] = "db_url_invalid"
except ValueError:
errors["query"] = "query_invalid"
else:
return self.async_create_entry(title="", data=user_input)
return self.async_show_form(
step_id="init",
data_schema=vol.Schema(
{
vol.Required(
CONF_DB_URL,
description={
"suggested_value": self.entry.options[CONF_DB_URL]
},
): selector.selector({"text": {}}),
vol.Required(
CONF_QUERY,
description={"suggested_value": self.entry.options[CONF_QUERY]},
): selector.selector({"text": {}}),
vol.Required(
CONF_COLUMN_NAME,
description={
"suggested_value": self.entry.options[CONF_COLUMN_NAME]
},
): selector.selector({"text": {}}),
vol.Optional(
CONF_UNIT_OF_MEASUREMENT,
description={
"suggested_value": self.entry.options.get(
CONF_UNIT_OF_MEASUREMENT
)
},
): selector.selector({"text": {}}),
vol.Optional(
CONF_VALUE_TEMPLATE,
description={
"suggested_value": self.entry.options.get(
CONF_VALUE_TEMPLATE
)
},
): selector.selector({"text": {}}),
}
),
errors=errors,
)

View File

@ -0,0 +1,12 @@
"""Adds constants for SQL integration."""
import re
from homeassistant.const import Platform
DOMAIN = "sql"
PLATFORMS = [Platform.SENSOR]
CONF_COLUMN_NAME = "column"
CONF_QUERIES = "queries"
CONF_QUERY = "query"
DB_URL_RE = re.compile("//.*:.*@")

View File

@ -3,6 +3,7 @@
"name": "SQL",
"documentation": "https://www.home-assistant.io/integrations/sql",
"requirements": ["sqlalchemy==1.4.35"],
"codeowners": ["@dgomes"],
"codeowners": ["@dgomes", "@gjohansson-ST"],
"config_flow": true,
"iot_class": "local_polling"
}

View File

@ -4,9 +4,9 @@ from __future__ import annotations
from datetime import date
import decimal
import logging
import re
import sqlalchemy
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import scoped_session, sessionmaker
import voluptuous as vol
@ -15,39 +15,32 @@ from homeassistant.components.sensor import (
PLATFORM_SCHEMA as PARENT_PLATFORM_SCHEMA,
SensorEntity,
)
from homeassistant.config_entries import SOURCE_IMPORT, ConfigEntry
from homeassistant.const import CONF_NAME, CONF_UNIT_OF_MEASUREMENT, CONF_VALUE_TEMPLATE
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import TemplateError
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.device_registry import DeviceEntryType
from homeassistant.helpers.entity import DeviceInfo
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.template import Template
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
from .const import CONF_COLUMN_NAME, CONF_QUERIES, CONF_QUERY, DB_URL_RE, DOMAIN
_LOGGER = logging.getLogger(__name__)
CONF_COLUMN_NAME = "column"
CONF_QUERIES = "queries"
CONF_QUERY = "query"
DB_URL_RE = re.compile("//.*:.*@")
def redact_credentials(data: str) -> str:
"""Redact credentials from string data."""
return DB_URL_RE.sub("//****:****@", data)
def validate_sql_select(value: str) -> str:
"""Validate that value is a SQL SELECT query."""
if not value.lstrip().lower().startswith("select"):
raise vol.Invalid("Only SELECT queries allowed")
return value
_QUERY_SCHEME = vol.Schema(
{
vol.Required(CONF_COLUMN_NAME): cv.string,
vol.Required(CONF_NAME): cv.string,
vol.Required(CONF_QUERY): vol.All(cv.string, validate_sql_select),
vol.Required(CONF_QUERY): cv.string,
vol.Optional(CONF_UNIT_OF_MEASUREMENT): cv.string,
vol.Optional(CONF_VALUE_TEMPLATE): cv.template,
}
@ -58,67 +51,101 @@ PLATFORM_SCHEMA = PARENT_PLATFORM_SCHEMA.extend(
)
def setup_platform(
async def async_setup_platform(
hass: HomeAssistant,
config: ConfigType,
add_entities: AddEntitiesCallback,
async_add_entities: AddEntitiesCallback,
discovery_info: DiscoveryInfoType | None = None,
) -> None:
"""Set up the SQL sensor platform."""
if not (db_url := config.get(CONF_DB_URL)):
db_url = DEFAULT_URL.format(hass_config_path=hass.config.path(DEFAULT_DB_FILE))
_LOGGER.warning(
# SQL config flow added in 2022.4 and should be removed in 2022.6
"Configuration of the SQL sensor platform in YAML is deprecated and "
"will be removed in Home Assistant 2022.6; Your existing configuration "
"has been imported into the UI automatically and can be safely removed "
"from your configuration.yaml file"
)
sess: scoped_session | None = None
try:
engine = sqlalchemy.create_engine(db_url)
sessmaker = scoped_session(sessionmaker(bind=engine))
# Run a dummy query just to test the db_url
sess = sessmaker()
sess.execute("SELECT 1;")
except sqlalchemy.exc.SQLAlchemyError as err:
_LOGGER.error(
"Couldn't connect using %s DB_URL: %s",
redact_credentials(db_url),
redact_credentials(str(err)),
)
return
finally:
if sess:
sess.close()
queries = []
default_db_url = DEFAULT_URL.format(
hass_config_path=hass.config.path(DEFAULT_DB_FILE)
)
for query in config[CONF_QUERIES]:
name: str = query[CONF_NAME]
query_str: str = query[CONF_QUERY]
unit: str | None = query.get(CONF_UNIT_OF_MEASUREMENT)
value_template: Template | None = query.get(CONF_VALUE_TEMPLATE)
column_name: str = query[CONF_COLUMN_NAME]
new_config = {
CONF_DB_URL: config.get(CONF_DB_URL, default_db_url),
CONF_NAME: query.get(CONF_NAME),
CONF_QUERY: query.get(CONF_QUERY),
CONF_UNIT_OF_MEASUREMENT: query.get(CONF_UNIT_OF_MEASUREMENT),
CONF_VALUE_TEMPLATE: query.get(CONF_VALUE_TEMPLATE),
CONF_COLUMN_NAME: query.get(CONF_COLUMN_NAME),
}
hass.async_create_task(
hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_IMPORT},
data=new_config,
)
)
async def async_setup_entry(
hass: HomeAssistant, entry: ConfigEntry, async_add_entities: AddEntitiesCallback
) -> None:
"""Set up the SQL sensor entry."""
db_url: str = entry.options[CONF_DB_URL]
name: str = entry.options[CONF_NAME]
query_str: str = entry.options[CONF_QUERY]
unit: str | None = entry.options.get(CONF_UNIT_OF_MEASUREMENT)
template: str | None = entry.options.get(CONF_VALUE_TEMPLATE)
column_name: str = entry.options[CONF_COLUMN_NAME]
value_template: Template | None = None
if template is not None:
try:
value_template = Template(template)
value_template.ensure_valid()
except TemplateError:
value_template = None
if value_template is not None:
value_template.hass = hass
# MSSQL uses TOP and not LIMIT
if not ("LIMIT" in query_str.upper() or "SELECT TOP" in query_str.upper()):
query_str = (
query_str.replace("SELECT", "SELECT TOP 1")
if "mssql" in db_url
else query_str.replace(";", " LIMIT 1;")
)
try:
engine = sqlalchemy.create_engine(db_url)
sessmaker = scoped_session(sessionmaker(bind=engine))
except SQLAlchemyError as err:
_LOGGER.error("Can not open database %s", {redact_credentials(str(err))})
return
sensor = SQLSensor(
name, sessmaker, query_str, column_name, unit, value_template
# MSSQL uses TOP and not LIMIT
if not ("LIMIT" in query_str.upper() or "SELECT TOP" in query_str.upper()):
query_str = (
query_str.replace("SELECT", "SELECT TOP 1")
if "mssql" in db_url
else query_str.replace(";", " LIMIT 1;")
)
queries.append(sensor)
add_entities(queries, True)
async_add_entities(
[
SQLSensor(
name,
sessmaker,
query_str,
column_name,
unit,
value_template,
entry.entry_id,
)
],
True,
)
class SQLSensor(SensorEntity):
"""Representation of an SQL sensor."""
_attr_icon = "mdi:database-search"
def __init__(
self,
name: str,
@ -127,6 +154,7 @@ class SQLSensor(SensorEntity):
column: str,
unit: str | None,
value_template: Template | None,
entry_id: str,
) -> None:
"""Initialize the SQL sensor."""
self._attr_name = name
@ -136,6 +164,13 @@ class SQLSensor(SensorEntity):
self._column_name = column
self.sessionmaker = sessmaker
self._attr_extra_state_attributes = {}
self._attr_unique_id = entry_id
self._attr_device_info = DeviceInfo(
entry_type=DeviceEntryType.SERVICE,
identifiers={(DOMAIN, entry_id)},
manufacturer="SQL",
name=name,
)
def update(self) -> None:
"""Retrieve sensor data from the query."""
@ -145,7 +180,7 @@ class SQLSensor(SensorEntity):
sess: scoped_session = self.sessionmaker()
try:
result = sess.execute(self._query)
except sqlalchemy.exc.SQLAlchemyError as err:
except SQLAlchemyError as err:
_LOGGER.error(
"Error executing query %s: %s",
self._query,

View File

@ -0,0 +1,55 @@
{
"config": {
"abort": {
"already_configured": "[%key:common::config_flow::abort::already_configured_account%]"
},
"error": {
"db_url_invalid": "Database URL invalid",
"query_invalid": "SQL Query invalid",
"value_template_invalid": "Value Template invalid"
},
"step": {
"user": {
"data": {
"db_url": "Database URL",
"query": "Select Query",
"column": "Column",
"unit_of_measurement": "Unit of Measure",
"value_template": "Value Template"
},
"data_description": {
"db_url": "Database URL, leave empty to use default HA database",
"query": "Query to run, needs to start with 'SELECT'",
"column": "Column for returned query to present as state",
"unit_of_measurement": "Unit of Measure (optional)",
"value_template": "Value Template (optional)"
}
}
}
},
"options": {
"step": {
"init": {
"data": {
"db_url": "[%key:component::sql::config::step::user::data::db_url%]",
"query": "[%key:component::sql::config::step::user::data::query%]",
"column": "[%key:component::sql::config::step::user::data::column%]",
"unit_of_measurement": "[%key:component::sql::config::step::user::data::unit_of_measurement%]",
"value_template": "[%key:component::sql::config::step::user::data::value_template%]"
},
"data_description": {
"db_url": "[%key:component::sql::config::step::user::data_description::db_url%]",
"query": "[%key:component::sql::config::step::user::data_description::query%]",
"column": "[%key:component::sql::config::step::user::data_description::column%]",
"unit_of_measurement": "[%key:component::sql::config::step::user::data_description::unit_of_measurement%]",
"value_template": "[%key:component::sql::config::step::user::data_description::value_template%]"
}
}
},
"error": {
"db_url_invalid": "[%key:component::sql::config::error::db_url_invalid%]",
"query_invalid": "[%key:component::sql::config::error::query_invalid%]",
"value_template_invalid": "[%key:component::sql::config::error::value_template_invalid%]"
}
}
}

View File

@ -0,0 +1,55 @@
{
"config": {
"abort": {
"already_configured": "Account is already configured"
},
"error": {
"db_url_invalid": "Database URL invalid",
"query_invalid": "SQL Query invalid",
"value_template_invalid": "Value Template invalid"
},
"step": {
"user": {
"data": {
"column": "Column",
"db_url": "Database URL",
"query": "Select Query",
"unit_of_measurement": "Unit of Measure",
"value_template": "Value Template"
},
"data_description": {
"db_url": "Database URL, leave empty to use default HA database",
"query": "Query to run, needs to start with 'SELECT'",
"column": "Column for returned query to present as state",
"unit_of_measurement": "Unit of Measure (optional)",
"value_template": "Value Template (optional)"
}
}
}
},
"options": {
"error": {
"db_url_invalid": "Database URL invalid",
"query_invalid": "SQL Query invalid",
"value_template_invalid": "Value Template invalid"
},
"step": {
"init": {
"data": {
"column": "Column",
"db_url": "Database URL",
"query": "Select Query",
"unit_of_measurement": "Unit of Measure",
"value_template": "Value Template"
},
"data_description": {
"db_url": "Database URL, leave empty to use default HA database",
"query": "Query to run, needs to start with 'SELECT'",
"column": "Column for returned query to present as state",
"unit_of_measurement": "Unit of Measure (optional)",
"value_template": "Value Template (optional)"
}
}
}
}
}

View File

@ -322,6 +322,7 @@ FLOWS = {
"speedtestdotnet",
"spider",
"spotify",
"sql",
"squeezebox",
"srp_energy",
"starline",

View File

@ -1 +1,58 @@
"""Tests for the sql component."""
from __future__ import annotations
from typing import Any
from homeassistant.components.recorder import CONF_DB_URL
from homeassistant.components.sql.const import CONF_COLUMN_NAME, CONF_QUERY, DOMAIN
from homeassistant.config_entries import SOURCE_USER
from homeassistant.const import CONF_UNIT_OF_MEASUREMENT
from homeassistant.core import HomeAssistant
from tests.common import MockConfigEntry
ENTRY_CONFIG = {
CONF_DB_URL: "sqlite://",
CONF_QUERY: "SELECT 5 as value",
CONF_COLUMN_NAME: "value",
CONF_UNIT_OF_MEASUREMENT: "MiB",
}
ENTRY_CONFIG_INVALID_QUERY = {
CONF_DB_URL: "sqlite://",
CONF_QUERY: "UPDATE 5 as value",
CONF_COLUMN_NAME: "size",
CONF_UNIT_OF_MEASUREMENT: "MiB",
}
ENTRY_CONFIG_NO_RESULTS = {
CONF_DB_URL: "sqlite://",
CONF_QUERY: "SELECT kalle as value from no_table;",
CONF_COLUMN_NAME: "value",
CONF_UNIT_OF_MEASUREMENT: "MiB",
}
async def init_integration(
hass: HomeAssistant,
config: dict[str, Any] = None,
entry_id: str = "1",
source: str = SOURCE_USER,
) -> MockConfigEntry:
"""Set up the SQL integration in Home Assistant."""
if not config:
config = ENTRY_CONFIG
config_entry = MockConfigEntry(
domain=DOMAIN,
source=source,
data={},
options=config,
entry_id=entry_id,
)
config_entry.add_to_hass(hass)
await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
return config_entry

View File

@ -0,0 +1,314 @@
"""Test the SQL config flow."""
from __future__ import annotations
from unittest.mock import patch
from sqlalchemy.exc import SQLAlchemyError
from homeassistant import config_entries
from homeassistant.components.sql.const import DOMAIN
from homeassistant.core import HomeAssistant
from homeassistant.data_entry_flow import (
RESULT_TYPE_ABORT,
RESULT_TYPE_CREATE_ENTRY,
RESULT_TYPE_FORM,
)
from . import ENTRY_CONFIG, ENTRY_CONFIG_INVALID_QUERY, ENTRY_CONFIG_NO_RESULTS
from tests.common import MockConfigEntry
async def test_form(hass: HomeAssistant) -> None:
"""Test we get the form."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result["type"] == RESULT_TYPE_FORM
assert result["errors"] == {}
with patch(
"homeassistant.components.sql.async_setup_entry",
return_value=True,
) as mock_setup_entry:
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"],
ENTRY_CONFIG,
)
await hass.async_block_till_done()
print(ENTRY_CONFIG)
assert result2["type"] == RESULT_TYPE_CREATE_ENTRY
assert result2["title"] == "Select value SQL query"
assert result2["options"] == {
"db_url": "sqlite://",
"query": "SELECT 5 as value",
"column": "value",
"unit_of_measurement": "MiB",
"value_template": None,
"name": "Select value SQL query",
}
assert len(mock_setup_entry.mock_calls) == 1
async def test_import_flow_success(hass: HomeAssistant) -> None:
"""Test a successful import of yaml."""
with patch(
"homeassistant.components.sql.async_setup_entry",
return_value=True,
) as mock_setup_entry:
result2 = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_IMPORT},
data=ENTRY_CONFIG,
)
await hass.async_block_till_done()
assert result2["type"] == RESULT_TYPE_CREATE_ENTRY
assert result2["title"] == "Select value SQL query"
assert result2["options"] == {
"db_url": "sqlite://",
"query": "SELECT 5 as value",
"column": "value",
"unit_of_measurement": "MiB",
"value_template": None,
"name": "Select value SQL query",
}
assert len(mock_setup_entry.mock_calls) == 1
async def test_import_flow_already_exist(hass: HomeAssistant) -> None:
"""Test import of yaml already exist."""
MockConfigEntry(
domain=DOMAIN,
data=ENTRY_CONFIG,
).add_to_hass(hass)
with patch(
"homeassistant.components.sql.async_setup_entry",
return_value=True,
):
result3 = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_IMPORT},
data=ENTRY_CONFIG,
)
await hass.async_block_till_done()
assert result3["type"] == RESULT_TYPE_ABORT
assert result3["reason"] == "already_configured"
async def test_flow_fails_db_url(hass: HomeAssistant) -> None:
"""Test config flow fails incorrect db url."""
result4 = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result4["type"] == RESULT_TYPE_FORM
assert result4["step_id"] == config_entries.SOURCE_USER
with patch(
"homeassistant.components.sql.config_flow.sqlalchemy.create_engine",
side_effect=SQLAlchemyError("error_message"),
):
result4 = await hass.config_entries.flow.async_configure(
result4["flow_id"],
user_input=ENTRY_CONFIG,
)
assert result4["errors"] == {"db_url": "db_url_invalid"}
async def test_flow_fails_invalid_query(hass: HomeAssistant) -> None:
"""Test config flow fails incorrect db url."""
result4 = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result4["type"] == RESULT_TYPE_FORM
assert result4["step_id"] == config_entries.SOURCE_USER
result5 = await hass.config_entries.flow.async_configure(
result4["flow_id"],
user_input=ENTRY_CONFIG_INVALID_QUERY,
)
assert result5["type"] == RESULT_TYPE_FORM
assert result5["errors"] == {
"query": "query_invalid",
}
result5 = await hass.config_entries.flow.async_configure(
result4["flow_id"],
user_input=ENTRY_CONFIG_NO_RESULTS,
)
assert result5["type"] == RESULT_TYPE_FORM
assert result5["errors"] == {
"query": "query_invalid",
}
result5 = await hass.config_entries.flow.async_configure(
result4["flow_id"],
user_input=ENTRY_CONFIG,
)
assert result5["type"] == RESULT_TYPE_CREATE_ENTRY
assert result5["title"] == "Select value SQL query"
assert result5["options"] == {
"db_url": "sqlite://",
"query": "SELECT 5 as value",
"column": "value",
"unit_of_measurement": "MiB",
"value_template": None,
"name": "Select value SQL query",
}
async def test_options_flow(hass: HomeAssistant) -> None:
"""Test options config flow."""
entry = MockConfigEntry(
domain=DOMAIN,
data={},
options={
"db_url": "sqlite://",
"query": "SELECT 5 as value",
"column": "value",
"unit_of_measurement": "MiB",
"value_template": None,
"name": "Select value SQL query",
},
)
entry.add_to_hass(hass)
with patch(
"homeassistant.components.sql.async_setup_entry",
return_value=True,
):
assert await hass.config_entries.async_setup(entry.entry_id)
await hass.async_block_till_done()
result = await hass.config_entries.options.async_init(entry.entry_id)
assert result["type"] == RESULT_TYPE_FORM
assert result["step_id"] == "init"
result = await hass.config_entries.options.async_configure(
result["flow_id"],
user_input={
"db_url": "sqlite://",
"query": "SELECT 5 as size",
"column": "size",
"unit_of_measurement": "MiB",
},
)
assert result["type"] == RESULT_TYPE_CREATE_ENTRY
assert result["data"] == {
"db_url": "sqlite://",
"query": "SELECT 5 as size",
"column": "size",
"unit_of_measurement": "MiB",
}
async def test_options_flow_fails_db_url(hass: HomeAssistant) -> None:
"""Test options flow fails incorrect db url."""
entry = MockConfigEntry(
domain=DOMAIN,
data={},
options={
"db_url": "sqlite://",
"query": "SELECT 5 as value",
"column": "value",
"unit_of_measurement": "MiB",
"value_template": None,
"name": "Select value SQL query",
},
)
entry.add_to_hass(hass)
with patch(
"homeassistant.components.sql.async_setup_entry",
return_value=True,
):
assert await hass.config_entries.async_setup(entry.entry_id)
await hass.async_block_till_done()
result = await hass.config_entries.options.async_init(entry.entry_id)
with patch(
"homeassistant.components.sql.config_flow.sqlalchemy.create_engine",
side_effect=SQLAlchemyError("error_message"),
):
result2 = await hass.config_entries.options.async_configure(
result["flow_id"],
user_input={
"db_url": "sqlite://",
"query": "SELECT 5 as size",
"column": "size",
"unit_of_measurement": "MiB",
},
)
assert result2["errors"] == {"db_url": "db_url_invalid"}
async def test_options_flow_fails_invalid_query(
hass: HomeAssistant,
) -> None:
"""Test options flow fails incorrect query and template."""
entry = MockConfigEntry(
domain=DOMAIN,
data={},
options={
"db_url": "sqlite://",
"query": "SELECT 5 as value",
"column": "value",
"unit_of_measurement": "MiB",
"value_template": None,
"name": "Select size SQL query",
},
)
entry.add_to_hass(hass)
with patch(
"homeassistant.components.sql.async_setup_entry",
return_value=True,
):
assert await hass.config_entries.async_setup(entry.entry_id)
await hass.async_block_till_done()
result = await hass.config_entries.options.async_init(entry.entry_id)
result2 = await hass.config_entries.options.async_configure(
result["flow_id"],
user_input=ENTRY_CONFIG_INVALID_QUERY,
)
assert result2["type"] == RESULT_TYPE_FORM
assert result2["errors"] == {
"query": "query_invalid",
}
result4 = await hass.config_entries.options.async_configure(
result["flow_id"],
user_input={
"db_url": "sqlite://",
"query": "SELECT 5 as size",
"column": "size",
"unit_of_measurement": "MiB",
},
)
assert result4["type"] == RESULT_TYPE_CREATE_ENTRY
assert result4["data"] == {
"db_url": "sqlite://",
"query": "SELECT 5 as size",
"column": "size",
"unit_of_measurement": "MiB",
}

View File

@ -0,0 +1,21 @@
"""Test for SQL component Init."""
from homeassistant import config_entries
from homeassistant.core import HomeAssistant
from . import init_integration
async def test_setup_entry(hass: HomeAssistant) -> None:
"""Test setup entry."""
config_entry = await init_integration(hass)
assert config_entry.state == config_entries.ConfigEntryState.LOADED
async def test_unload_entry(hass: HomeAssistant) -> None:
"""Test unload an entry."""
config_entry = await init_integration(hass)
assert config_entry.state == config_entries.ConfigEntryState.LOADED
assert await hass.config_entries.async_unload(config_entry.entry_id)
await hass.async_block_till_done()
assert config_entry.state is config_entries.ConfigEntryState.NOT_LOADED

View File

@ -1,27 +1,37 @@
"""The test for the sql sensor platform."""
import os
from unittest.mock import patch
import pytest
import voluptuous as vol
from sqlalchemy.exc import SQLAlchemyError
from homeassistant.components.sql.sensor import validate_sql_select
from homeassistant.const import STATE_UNKNOWN
from homeassistant.components.sql.const import DOMAIN
from homeassistant.config_entries import SOURCE_USER
from homeassistant.const import CONF_NAME, STATE_UNKNOWN
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_component import async_update_entity
from homeassistant.setup import async_setup_component
from tests.common import get_test_config_dir
from . import init_integration
@pytest.fixture(autouse=True)
def remove_file():
"""Remove db."""
yield
file = os.path.join(get_test_config_dir(), "home-assistant_v2.db")
if os.path.isfile(file):
os.remove(file)
from tests.common import MockConfigEntry
async def test_query(hass: HomeAssistant) -> None:
"""Test the SQL sensor."""
config = {
"db_url": "sqlite://",
"query": "SELECT 5 as value",
"column": "value",
"name": "Select value SQL query",
}
await init_integration(hass, config)
state = hass.states.get("sensor.select_value_sql_query")
assert state.state == "5"
assert state.attributes["value"] == 5
async def test_import_query(hass: HomeAssistant) -> None:
"""Test the SQL sensor."""
config = {
"sensor": {
@ -40,77 +50,52 @@ async def test_query(hass: HomeAssistant) -> None:
assert await async_setup_component(hass, "sensor", config)
await hass.async_block_till_done()
state = hass.states.get("sensor.count_tables")
assert state.state == "5"
assert state.attributes["value"] == 5
async def test_query_no_db(hass: HomeAssistant) -> None:
"""Test the SQL sensor."""
config = {
"sensor": {
"platform": "sql",
"queries": [
{
"name": "count_tables",
"query": "SELECT 5 as value",
"column": "value",
}
],
}
}
assert await async_setup_component(hass, "sensor", config)
await hass.async_block_till_done()
state = hass.states.get("sensor.count_tables")
assert state.state == "5"
assert hass.config_entries.async_entries(DOMAIN)
options = hass.config_entries.async_entries(DOMAIN)[0].options
assert options[CONF_NAME] == "Select value SQL query"
async def test_query_value_template(hass: HomeAssistant) -> None:
"""Test the SQL sensor."""
config = {
"sensor": {
"platform": "sql",
"db_url": "sqlite://",
"queries": [
{
"name": "count_tables",
"query": "SELECT 5.01 as value",
"column": "value",
"value_template": "{{ value | int }}",
}
],
}
"db_url": "sqlite://",
"query": "SELECT 5.01 as value",
"column": "value",
"name": "count_tables",
"value_template": "{{ value | int }}",
}
assert await async_setup_component(hass, "sensor", config)
await hass.async_block_till_done()
await init_integration(hass, config)
state = hass.states.get("sensor.count_tables")
assert state.state == "5"
async def test_query_value_template_invalid(hass: HomeAssistant) -> None:
"""Test the SQL sensor."""
config = {
"db_url": "sqlite://",
"query": "SELECT 5.01 as value",
"column": "value",
"name": "count_tables",
"value_template": "{{ value | dontwork }}",
}
await init_integration(hass, config)
state = hass.states.get("sensor.count_tables")
assert state.state == "5.01"
async def test_query_limit(hass: HomeAssistant) -> None:
"""Test the SQL sensor with a query containing 'LIMIT' in lowercase."""
config = {
"sensor": {
"platform": "sql",
"db_url": "sqlite://",
"queries": [
{
"name": "count_tables",
"query": "SELECT 5 as value limit 1",
"column": "value",
}
],
}
"db_url": "sqlite://",
"query": "SELECT 5 as value limit 1",
"column": "value",
"name": "Select value SQL query",
}
await init_integration(hass, config)
assert await async_setup_component(hass, "sensor", config)
await hass.async_block_till_done()
state = hass.states.get("sensor.count_tables")
state = hass.states.get("sensor.select_value_sql_query")
assert state.state == "5"
assert state.attributes["value"] == 5
@ -120,21 +105,12 @@ async def test_query_no_value(
) -> None:
"""Test the SQL sensor with a query that returns no value."""
config = {
"sensor": {
"platform": "sql",
"db_url": "sqlite://",
"queries": [
{
"name": "count_tables",
"query": "SELECT 5 as value where 1=2",
"column": "value",
}
],
}
"db_url": "sqlite://",
"query": "SELECT 5 as value where 1=2",
"column": "value",
"name": "count_tables",
}
assert await async_setup_component(hass, "sensor", config)
await hass.async_block_till_done()
await init_integration(hass, config)
state = hass.states.get("sensor.count_tables")
assert state.state == STATE_UNKNOWN
@ -143,56 +119,6 @@ async def test_query_no_value(
assert text in caplog.text
async def test_invalid_query(hass: HomeAssistant) -> None:
"""Test the SQL sensor for invalid queries."""
with pytest.raises(vol.Invalid):
validate_sql_select("DROP TABLE *")
config = {
"sensor": {
"platform": "sql",
"db_url": "sqlite://",
"queries": [
{
"name": "count_tables",
"query": "SELECT * value FROM sqlite_master;",
"column": "value",
}
],
}
}
assert await async_setup_component(hass, "sensor", config)
await hass.async_block_till_done()
state = hass.states.get("sensor.count_tables")
assert state.state == STATE_UNKNOWN
async def test_value_float_and_date(hass: HomeAssistant) -> None:
"""Test the SQL sensor with a query has float as value."""
config = {
"sensor": {
"platform": "sql",
"db_url": "sqlite://",
"queries": [
{
"name": "float_value",
"query": "SELECT 5 as value, cast(5.01 as decimal(10,2)) as value2",
"column": "value",
},
],
}
}
assert await async_setup_component(hass, "sensor", config)
await hass.async_block_till_done()
state = hass.states.get("sensor.float_value")
assert state.state == "5"
assert isinstance(state.attributes["value2"], float)
@pytest.mark.parametrize(
"url,expected_patterns,not_expected_patterns",
[
@ -208,32 +134,74 @@ async def test_value_float_and_date(hass: HomeAssistant) -> None:
),
],
)
async def test_invalid_url(
async def test_invalid_url_setup(
hass: HomeAssistant,
caplog: pytest.LogCaptureFixture,
url: str,
expected_patterns: str,
not_expected_patterns: str,
):
"""Test credentials in url is not logged."""
"""Test invalid db url with redacted credentials."""
config = {
"sensor": {
"platform": "sql",
"db_url": url,
"queries": [
{
"name": "count_tables",
"query": "SELECT 5 as value",
"column": "value",
}
],
}
"db_url": url,
"query": "SELECT 5 as value",
"column": "value",
"name": "count_tables",
}
entry = MockConfigEntry(
domain=DOMAIN,
source=SOURCE_USER,
data={},
options=config,
entry_id="1",
)
assert await async_setup_component(hass, "sensor", config)
await hass.async_block_till_done()
entry.add_to_hass(hass)
with patch(
"homeassistant.components.sql.sensor.sqlalchemy.create_engine",
side_effect=SQLAlchemyError(url),
):
await hass.config_entries.async_setup(entry.entry_id)
await hass.async_block_till_done()
for pattern in not_expected_patterns:
assert pattern not in caplog.text
for pattern in expected_patterns:
assert pattern in caplog.text
async def test_invalid_url_on_update(
hass: HomeAssistant,
caplog: pytest.LogCaptureFixture,
):
"""Test invalid db url with redacted credentials on retry."""
config = {
"db_url": "sqlite://",
"query": "SELECT 5 as value",
"column": "value",
"name": "count_tables",
}
entry = MockConfigEntry(
domain=DOMAIN,
source=SOURCE_USER,
data={},
options=config,
entry_id="1",
)
entry.add_to_hass(hass)
await hass.config_entries.async_setup(entry.entry_id)
await hass.async_block_till_done()
with patch(
"homeassistant.components.sql.sensor.sqlalchemy.engine.cursor.CursorResult",
side_effect=SQLAlchemyError(
"sqlite://homeassistant:hunter2@homeassistant.local"
),
):
await async_update_entity(hass, "sensor.count_tables")
assert "sqlite://homeassistant:hunter2@homeassistant.local" not in caplog.text
assert "sqlite://****:****@homeassistant.local" in caplog.text