(Re)Add support for multiple Pi-Holes (#27569)

* Update configuration schema to support multiple Pi-holes

* Construct sensors for each configured Pi-hole

* Ensure each Pi-hole has a unique name

* Update services to handle multiple Pi-holes

* Update tests for multiple configurations

* Refactor tests to support service testing

* Fix else-raise per pyliunt

* Per code review, add all entities in a single call

* Per code review, add the default name as default.

* Per code review, add cv.ensure_list to prevent breaking change

* Per code review, move name validation to schema

* Remove default name

* Per code review, validate api_key in schema definition

* Per code review, rename variables

* Per code review, use list comprehension

* Ensure unique slug names in config validation

* Per code review, refactor to CoroutineMock

* Fix adding sensor entities

* Per code review, refactor mock function creation

* Per code review, refactor mock function return values
This commit is contained in:
John Luetke 2019-12-12 10:43:49 -08:00 committed by Martin Hjelmare
parent 327b5c3c94
commit 7c42f4b45b
5 changed files with 268 additions and 84 deletions

View File

@ -20,7 +20,7 @@ from homeassistant.util import Throttle
from .const import (
CONF_LOCATION,
DEFAULT_HOST,
CONF_SLUG,
DEFAULT_LOCATION,
DEFAULT_NAME,
DEFAULT_SSL,
@ -29,82 +29,194 @@ from .const import (
MIN_TIME_BETWEEN_UPDATES,
SERVICE_DISABLE,
SERVICE_DISABLE_ATTR_DURATION,
SERVICE_DISABLE_ATTR_NAME,
SERVICE_ENABLE,
SERVICE_ENABLE_ATTR_NAME,
)
def ensure_unique_names_and_slugs(config):
"""Ensure that each configuration dict contains a unique `name` value."""
names = {}
slugs = {}
for conf in config:
if conf[CONF_NAME] not in names and conf[CONF_SLUG] not in slugs:
names[conf[CONF_NAME]] = conf[CONF_HOST]
slugs[conf[CONF_SLUG]] = conf[CONF_HOST]
else:
raise vol.Invalid(
"Duplicate name '{}' (or slug '{}') for '{}' (already in use by '{}'). Each configured Pi-hole must have a unique name.".format(
conf[CONF_NAME],
conf[CONF_SLUG],
conf[CONF_HOST],
names.get(conf[CONF_NAME], slugs[conf[CONF_SLUG]]),
)
)
return config
def coerce_slug(config):
"""Coerce the name of the Pi-Hole into a slug."""
config[CONF_SLUG] = cv.slugify(config[CONF_NAME])
return config
LOGGER = logging.getLogger(__name__)
PI_HOLE_SCHEMA = vol.Schema(
vol.All(
{
vol.Required(CONF_HOST): cv.string,
vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
vol.Optional(CONF_API_KEY): cv.string,
vol.Optional(CONF_SSL, default=DEFAULT_SSL): cv.boolean,
vol.Optional(CONF_LOCATION, default=DEFAULT_LOCATION): cv.string,
vol.Optional(CONF_VERIFY_SSL, default=DEFAULT_VERIFY_SSL): cv.boolean,
},
coerce_slug,
)
)
CONFIG_SCHEMA = vol.Schema(
{
DOMAIN: vol.Schema(
{
vol.Optional(CONF_HOST, default=DEFAULT_HOST): cv.string,
vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
vol.Optional(CONF_API_KEY): cv.string,
vol.Optional(CONF_SSL, default=DEFAULT_SSL): cv.boolean,
vol.Optional(CONF_LOCATION, default=DEFAULT_LOCATION): cv.string,
vol.Optional(CONF_VERIFY_SSL, default=DEFAULT_VERIFY_SSL): cv.boolean,
}
vol.All(cv.ensure_list, [PI_HOLE_SCHEMA], ensure_unique_names_and_slugs)
)
},
extra=vol.ALLOW_EXTRA,
)
SERVICE_DISABLE_SCHEMA = vol.Schema(
{
vol.Required(SERVICE_DISABLE_ATTR_DURATION): vol.All(
cv.time_period_str, cv.positive_timedelta
)
}
)
async def async_setup(hass, config):
"""Set up the pi_hole integration."""
conf = config[DOMAIN]
name = conf[CONF_NAME]
host = conf[CONF_HOST]
use_tls = conf[CONF_SSL]
verify_tls = conf[CONF_VERIFY_SSL]
location = conf[CONF_LOCATION]
api_key = conf.get(CONF_API_KEY)
def get_data():
"""Retrive component data."""
return hass.data[DOMAIN]
LOGGER.debug("Setting up %s integration with host %s", DOMAIN, host)
def ensure_api_token(call_data):
"""Ensure the Pi-Hole to be enabled/disabled has a api_token configured."""
data = get_data()
if SERVICE_DISABLE_ATTR_NAME not in call_data:
for slug in data:
call_data[SERVICE_DISABLE_ATTR_NAME] = data[slug].name
ensure_api_token(call_data)
session = async_get_clientsession(hass, verify_tls)
pi_hole = PiHoleData(
Hole(
host, hass.loop, session, location=location, tls=use_tls, api_token=api_key
),
name,
call_data[SERVICE_DISABLE_ATTR_NAME] = None
else:
slug = cv.slugify(call_data[SERVICE_DISABLE_ATTR_NAME])
if (data[slug]).api.api_token is None:
raise vol.Invalid(
"Pi-hole '{}' must have an api_key provided in configuration to be enabled.".format(
pi_hole.name
)
)
return call_data
service_disable_schema = vol.Schema( # pylint: disable=invalid-name
vol.All(
{
vol.Required(SERVICE_DISABLE_ATTR_DURATION): vol.All(
cv.time_period_str, cv.positive_timedelta
),
vol.Optional(SERVICE_DISABLE_ATTR_NAME): vol.In(
[conf[CONF_NAME] for conf in config[DOMAIN]], msg="Unknown Pi-Hole",
),
},
ensure_api_token,
)
)
await pi_hole.async_update()
service_enable_schema = vol.Schema(
{
vol.Optional(SERVICE_ENABLE_ATTR_NAME): vol.In(
[conf[CONF_NAME] for conf in config[DOMAIN]], msg="Unknown Pi-Hole"
)
}
)
hass.data[DOMAIN] = pi_hole
hass.data[DOMAIN] = {}
async def handle_disable(call):
if api_key is None:
raise vol.Invalid("Pi-hole api_key must be provided in configuration")
for conf in config[DOMAIN]:
name = conf[CONF_NAME]
slug = conf[CONF_SLUG]
host = conf[CONF_HOST]
use_tls = conf[CONF_SSL]
verify_tls = conf[CONF_VERIFY_SSL]
location = conf[CONF_LOCATION]
api_key = conf.get(CONF_API_KEY)
LOGGER.debug("Setting up %s integration with host %s", DOMAIN, host)
session = async_get_clientsession(hass, verify_tls)
pi_hole = PiHoleData(
Hole(
host,
hass.loop,
session,
location=location,
tls=use_tls,
api_token=api_key,
),
name,
)
await pi_hole.async_update()
hass.data[DOMAIN][slug] = pi_hole
async def disable_service_handler(call):
"""Handle the service call to disable a single Pi-Hole or all configured Pi-Holes."""
duration = call.data[SERVICE_DISABLE_ATTR_DURATION].total_seconds()
name = call.data.get(SERVICE_DISABLE_ATTR_NAME)
LOGGER.debug("Disabling %s %s for %d seconds", DOMAIN, host, duration)
await pi_hole.api.disable(duration)
async def do_disable(name):
"""Disable the named Pi-Hole."""
slug = cv.slugify(name)
pi_hole = hass.data[DOMAIN][slug]
async def handle_enable(call):
if api_key is None:
raise vol.Invalid("Pi-hole api_key must be provided in configuration")
LOGGER.debug(
"Disabling Pi-hole '%s' (%s) for %d seconds",
name,
pi_hole.api.host,
duration,
)
await pi_hole.api.disable(duration)
LOGGER.debug("Enabling %s %s", DOMAIN, host)
await pi_hole.api.enable()
if name is not None:
await do_disable(name)
else:
for pi_hole in hass.data[DOMAIN].values():
await do_disable(pi_hole.name)
async def enable_service_handler(call):
"""Handle the service call to enable a single Pi-Hole or all configured Pi-Holes."""
name = call.data.get(SERVICE_ENABLE_ATTR_NAME)
async def do_enable(name):
"""Enable the named Pi-Hole."""
slug = cv.slugify(name)
pi_hole = hass.data[DOMAIN][slug]
LOGGER.debug("Enabling Pi-hole '%s' (%s)", name, pi_hole.api.host)
await pi_hole.api.enable()
if name is not None:
await do_enable(name)
else:
for pi_hole in hass.data[DOMAIN].values():
await do_enable(pi_hole.name)
hass.services.async_register(
DOMAIN, SERVICE_DISABLE, handle_disable, schema=SERVICE_DISABLE_SCHEMA
DOMAIN, SERVICE_DISABLE, disable_service_handler, schema=service_disable_schema
)
hass.services.async_register(DOMAIN, SERVICE_ENABLE, handle_enable)
hass.services.async_register(
DOMAIN, SERVICE_ENABLE, enable_service_handler, schema=service_enable_schema
)
hass.async_create_task(async_load_platform(hass, SENSOR_DOMAIN, DOMAIN, {}, config))

View File

@ -4,8 +4,8 @@ from datetime import timedelta
DOMAIN = "pi_hole"
CONF_LOCATION = "location"
CONF_SLUG = "slug"
DEFAULT_HOST = "pi.hole"
DEFAULT_LOCATION = "admin"
DEFAULT_METHOD = "GET"
DEFAULT_NAME = "Pi-Hole"
@ -13,8 +13,10 @@ DEFAULT_SSL = False
DEFAULT_VERIFY_SSL = True
SERVICE_DISABLE = "disable"
SERVICE_ENABLE = "enable"
SERVICE_DISABLE_ATTR_DURATION = "duration"
SERVICE_DISABLE_ATTR_NAME = "name"
SERVICE_ENABLE = "enable"
SERVICE_ENABLE_ATTR_NAME = SERVICE_DISABLE_ATTR_NAME
ATTR_BLOCKED_DOMAINS = "domains_blocked"

View File

@ -18,10 +18,12 @@ async def async_setup_platform(hass, config, async_add_entities, discovery_info=
if discovery_info is None:
return
pi_hole = hass.data[PIHOLE_DOMAIN]
sensors = []
sensors = [PiHoleSensor(pi_hole, sensor_name) for sensor_name in SENSOR_LIST]
for pi_hole in hass.data[PIHOLE_DOMAIN].values():
for sensor in [
PiHoleSensor(pi_hole, sensor_name) for sensor_name in SENSOR_LIST
]:
sensors.append(sensor)
async_add_entities(sensors, True)

View File

@ -1,8 +1,15 @@
disable:
description: Disable Pi-hole for an amount of time
description: Disable configured Pi-hole(s) for an amount of time
fields:
duration:
description: Time that the Pi-hole should be disabled for
example: "00:00:15"
name:
description: "[Optional] When multiple Pi-holes are configured, the name of the one to disable. If omitted, all configured Pi-holes will be disabled."
example: "Pi-Hole"
enable:
description: Enable Pi-hole
description: Enable configured Pi-hole(s)
fields:
name:
description: "[Optional] When multiple Pi-holes are configured, the name of the one to enable. If omitted, all configured Pi-holes will be enabled."
example: "Pi-Hole"

View File

@ -3,39 +3,34 @@
from unittest.mock import patch
from asynctest import CoroutineMock
from hole import Hole
from homeassistant.components import pi_hole
from tests.common import async_setup_component
def mock_pihole_data_call(Hole):
"""Need to override so as to allow mocked data."""
Hole.__init__ = (
lambda self, host, loop, session, location, tls, verify_tls=True, api_token=None: None
)
Hole.data = {
"ads_blocked_today": 0,
"ads_percentage_today": 0,
"clients_ever_seen": 0,
"dns_queries_today": 0,
"domains_being_blocked": 0,
"queries_cached": 0,
"queries_forwarded": 0,
"status": 0,
"unique_clients": 0,
"unique_domains": 0,
}
pass
ZERO_DATA = {
"ads_blocked_today": 0,
"ads_percentage_today": 0,
"clients_ever_seen": 0,
"dns_queries_today": 0,
"domains_being_blocked": 0,
"queries_cached": 0,
"queries_forwarded": 0,
"status": 0,
"unique_clients": 0,
"unique_domains": 0,
}
async def test_setup_no_config(hass):
"""Tests component setup with no config."""
with patch.object(
Hole, "get_data", new=CoroutineMock(side_effect=mock_pihole_data_call(Hole))
):
assert await async_setup_component(hass, pi_hole.DOMAIN, {pi_hole.DOMAIN: {}})
async def test_setup_minimal_config(hass):
"""Tests component setup with minimal config."""
with patch("homeassistant.components.pi_hole.Hole") as _hole:
_hole.return_value.get_data = CoroutineMock(return_value=None)
_hole.return_value.data = ZERO_DATA
assert await async_setup_component(
hass, pi_hole.DOMAIN, {pi_hole.DOMAIN: [{"host": "pi.hole"}]}
)
await hass.async_block_till_done()
@ -84,13 +79,16 @@ async def test_setup_no_config(hass):
assert hass.states.get("sensor.pi_hole_seen_clients").state == "0"
async def test_setup_custom_config(hass):
"""Tests component setup with custom config."""
with patch.object(
Hole, "get_data", new=CoroutineMock(side_effect=mock_pihole_data_call(Hole))
):
async def test_setup_name_config(hass):
"""Tests component setup with a custom name."""
with patch("homeassistant.components.pi_hole.Hole") as _hole:
_hole.return_value.get_data = CoroutineMock(return_value=None)
_hole.return_value.data = ZERO_DATA
assert await async_setup_component(
hass, pi_hole.DOMAIN, {pi_hole.DOMAIN: {"name": "Custom"}}
hass,
pi_hole.DOMAIN,
{pi_hole.DOMAIN: [{"host": "pi.hole", "name": "Custom"}]},
)
await hass.async_block_till_done()
@ -99,3 +97,66 @@ async def test_setup_custom_config(hass):
hass.states.get("sensor.custom_ads_blocked_today").name
== "Custom Ads Blocked Today"
)
async def test_disable_service_call(hass):
"""Test disable service call with no Pi-hole named."""
with patch("homeassistant.components.pi_hole.Hole") as _hole:
mock_disable = CoroutineMock(return_value=None)
_hole.return_value.disable = mock_disable
_hole.return_value.get_data = CoroutineMock(return_value=None)
_hole.return_value.data = ZERO_DATA
assert await async_setup_component(
hass,
pi_hole.DOMAIN,
{
pi_hole.DOMAIN: [
{"host": "pi.hole", "api_key": "1"},
{"host": "pi.hole", "name": "Custom", "api_key": "2"},
]
},
)
await hass.async_block_till_done()
await hass.services.async_call(
pi_hole.DOMAIN,
pi_hole.SERVICE_DISABLE,
{pi_hole.SERVICE_DISABLE_ATTR_DURATION: "00:00:01"},
blocking=True,
)
await hass.async_block_till_done()
assert mock_disable.call_count == 2
async def test_enable_service_call(hass):
"""Test enable service call with no Pi-hole named."""
with patch("homeassistant.components.pi_hole.Hole") as _hole:
mock_enable = CoroutineMock(return_value=None)
_hole.return_value.enable = mock_enable
_hole.return_value.get_data = CoroutineMock(return_value=None)
_hole.return_value.data = ZERO_DATA
assert await async_setup_component(
hass,
pi_hole.DOMAIN,
{
pi_hole.DOMAIN: [
{"host": "pi.hole", "api_key": "1"},
{"host": "pi.hole", "name": "Custom", "api_key": "2"},
]
},
)
await hass.async_block_till_done()
await hass.services.async_call(
pi_hole.DOMAIN, pi_hole.SERVICE_ENABLE, {}, blocking=True
)
await hass.async_block_till_done()
assert mock_enable.call_count == 2