Add config flow to nzbget (#38938)

* work on config flow

* Update test_init.py

* work on config flow

* Update test_config_flow.py

* Update test_config_flow.py

* Update __init__.py

* Update test_config_flow.py

* Update __init__.py

* Update test_config_flow.py

* Update test_config_flow.py

* Update test_config_flow.py

* Update test_config_flow.py

* Update test_config_flow.py

* Update test_config_flow.py

* Update __init__.py

* Update __init__.py

* Update __init__.py

* Apply suggestions from code review

Co-authored-by: J. Nick Koston <nick@koston.org>

* Update __init__.py

* Update __init__.py

* Update __init__.py

* Update config_flow.py

* Update __init__.py

* Update __init__.py

* Create coordinator.py

* Update __init__.py

* Update sensor.py

* Update __init__.py

* Update .coveragerc

* Update coordinator.py

* Update __init__.py

* Update coordinator.py

* Update __init__.py

* Update coordinator.py

* Update config_flow.py

* Update __init__.py

* Update coordinator.py

* Update __init__.py

* Update test_config_flow.py

* Update coordinator.py

* Update test_config_flow.py

* Update test_init.py

* Update homeassistant/components/nzbget/coordinator.py

* Update test_config_flow.py

Co-authored-by: J. Nick Koston <nick@koston.org>
This commit is contained in:
Chris Talkington 2020-08-29 16:47:00 -05:00 committed by GitHub
parent 54ef16f01a
commit 7469f57a7b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 852 additions and 212 deletions

View File

@ -589,7 +589,7 @@ omit =
homeassistant/components/nuki/lock.py
homeassistant/components/nut/sensor.py
homeassistant/components/nx584/alarm_control_panel.py
homeassistant/components/nzbget/__init__.py
homeassistant/components/nzbget/coordinator.py
homeassistant/components/nzbget/sensor.py
homeassistant/components/obihai/*
homeassistant/components/octoprint/*

View File

@ -1,10 +1,10 @@
"""The nzbget component."""
from datetime import timedelta
"""The NZBGet integration."""
import asyncio
import logging
import pynzbgetapi
import voluptuous as vol
from homeassistant.config_entries import SOURCE_IMPORT, ConfigEntry
from homeassistant.const import (
CONF_HOST,
CONF_NAME,
@ -14,31 +14,30 @@ from homeassistant.const import (
CONF_SSL,
CONF_USERNAME,
)
from homeassistant.exceptions import ConfigEntryNotReady
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.dispatcher import dispatcher_send
from homeassistant.helpers.event import track_time_interval
from homeassistant.helpers.entity import Entity
from homeassistant.helpers.typing import HomeAssistantType
from .const import (
ATTR_SPEED,
DATA_COORDINATOR,
DATA_UNDO_UPDATE_LISTENER,
DEFAULT_NAME,
DEFAULT_PORT,
DEFAULT_SCAN_INTERVAL,
DEFAULT_SPEED_LIMIT,
DEFAULT_SSL,
DOMAIN,
SERVICE_PAUSE,
SERVICE_RESUME,
SERVICE_SET_SPEED,
)
from .coordinator import NZBGetDataUpdateCoordinator
_LOGGER = logging.getLogger(__name__)
ATTR_SPEED = "speed"
DOMAIN = "nzbget"
DATA_NZBGET = "data_nzbget"
DATA_UPDATED = "nzbget_data_updated"
DEFAULT_NAME = "NZBGet"
DEFAULT_PORT = 6789
DEFAULT_SPEED_LIMIT = 1000 # 1 Megabyte/Sec
DEFAULT_SCAN_INTERVAL = timedelta(seconds=5)
SERVICE_PAUSE = "pause"
SERVICE_RESUME = "resume"
SERVICE_SET_SPEED = "set_speed"
SPEED_LIMIT_SCHEMA = vol.Schema(
{vol.Optional(ATTR_SPEED, default=DEFAULT_SPEED_LIMIT): cv.positive_int}
)
PLATFORMS = ["sensor"]
CONFIG_SCHEMA = vol.Schema(
{
@ -52,147 +51,155 @@ CONFIG_SCHEMA = vol.Schema(
vol.Optional(
CONF_SCAN_INTERVAL, default=DEFAULT_SCAN_INTERVAL
): cv.time_period,
vol.Optional(CONF_SSL, default=False): cv.boolean,
vol.Optional(CONF_SSL, default=DEFAULT_SSL): cv.boolean,
}
)
},
extra=vol.ALLOW_EXTRA,
)
SPEED_LIMIT_SCHEMA = vol.Schema(
{vol.Optional(ATTR_SPEED, default=DEFAULT_SPEED_LIMIT): cv.positive_int}
)
def setup(hass, config):
"""Set up the NZBGet sensors."""
host = config[DOMAIN][CONF_HOST]
port = config[DOMAIN][CONF_PORT]
ssl = "s" if config[DOMAIN][CONF_SSL] else ""
name = config[DOMAIN][CONF_NAME]
username = config[DOMAIN].get(CONF_USERNAME)
password = config[DOMAIN].get(CONF_PASSWORD)
scan_interval = config[DOMAIN][CONF_SCAN_INTERVAL]
async def async_setup(hass: HomeAssistantType, config: dict) -> bool:
"""Set up the NZBGet integration."""
hass.data.setdefault(DOMAIN, {})
try:
nzbget_api = pynzbgetapi.NZBGetAPI(host, username, password, ssl, ssl, port)
nzbget_api.version()
except pynzbgetapi.NZBGetAPIException as conn_err:
_LOGGER.error("Error setting up NZBGet API: %s", conn_err)
return False
if hass.config_entries.async_entries(DOMAIN):
return True
_LOGGER.debug("Successfully validated NZBGet API connection")
nzbget_data = hass.data[DATA_NZBGET] = NZBGetData(hass, nzbget_api)
nzbget_data.init_download_list()
nzbget_data.update()
def service_handler(service):
"""Handle service calls."""
if service.service == SERVICE_PAUSE:
nzbget_data.pause_download()
elif service.service == SERVICE_RESUME:
nzbget_data.resume_download()
elif service.service == SERVICE_SET_SPEED:
limit = service.data[ATTR_SPEED]
nzbget_data.rate(limit)
hass.services.register(
DOMAIN, SERVICE_PAUSE, service_handler, schema=vol.Schema({})
)
hass.services.register(
DOMAIN, SERVICE_RESUME, service_handler, schema=vol.Schema({})
)
hass.services.register(
DOMAIN, SERVICE_SET_SPEED, service_handler, schema=SPEED_LIMIT_SCHEMA
)
def refresh(event_time):
"""Get the latest data from NZBGet."""
nzbget_data.update()
track_time_interval(hass, refresh, scan_interval)
sensorconfig = {"client_name": name}
hass.helpers.discovery.load_platform("sensor", DOMAIN, sensorconfig, config)
if DOMAIN in config:
hass.async_create_task(
hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_IMPORT},
data=config[DOMAIN],
)
)
return True
class NZBGetData:
"""Get the latest data and update the states."""
def __init__(self, hass, api):
"""Initialize the NZBGet RPC API."""
self.hass = hass
self.status = None
self.available = True
self._api = api
self.downloads = None
self.completed_downloads = set()
def update(self):
"""Get the latest data from NZBGet instance."""
try:
self.status = self._api.status()
self.downloads = self._api.history()
self.check_completed_downloads()
self.available = True
dispatcher_send(self.hass, DATA_UPDATED)
except pynzbgetapi.NZBGetAPIException as err:
self.available = False
_LOGGER.error("Unable to refresh NZBGet data: %s", err)
def init_download_list(self):
"""Initialize download list."""
self.downloads = self._api.history()
self.completed_downloads = {
(x["Name"], x["Category"], x["Status"]) for x in self.downloads
async def async_setup_entry(hass: HomeAssistantType, entry: ConfigEntry) -> bool:
"""Set up NZBGet from a config entry."""
if not entry.options:
options = {
CONF_SCAN_INTERVAL: entry.data.get(
CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL
),
}
hass.config_entries.async_update_entry(entry, options=options)
def check_completed_downloads(self):
"""Check history for newly completed downloads."""
coordinator = NZBGetDataUpdateCoordinator(
hass,
config=entry.data,
options=entry.options,
)
actual_completed_downloads = {
(x["Name"], x["Category"], x["Status"]) for x in self.downloads
}
await coordinator.async_refresh()
tmp_completed_downloads = list(
actual_completed_downloads.difference(self.completed_downloads)
if not coordinator.last_update_success:
raise ConfigEntryNotReady
undo_listener = entry.add_update_listener(_async_update_listener)
hass.data[DOMAIN][entry.entry_id] = {
DATA_COORDINATOR: coordinator,
DATA_UNDO_UPDATE_LISTENER: undo_listener,
}
for component in PLATFORMS:
hass.async_create_task(
hass.config_entries.async_forward_entry_setup(entry, component)
)
for download in tmp_completed_downloads:
self.hass.bus.fire(
"nzbget_download_complete",
{"name": download[0], "category": download[1], "status": download[2]},
)
_async_register_services(hass, coordinator)
self.completed_downloads = actual_completed_downloads
return True
def pause_download(self):
"""Pause download queue."""
try:
self._api.pausedownload()
except pynzbgetapi.NZBGetAPIException as err:
_LOGGER.error("Unable to pause queue: %s", err)
async def async_unload_entry(hass: HomeAssistantType, entry: ConfigEntry) -> bool:
"""Unload a config entry."""
unload_ok = all(
await asyncio.gather(
*[
hass.config_entries.async_forward_entry_unload(entry, component)
for component in PLATFORMS
]
)
)
def resume_download(self):
"""Resume download queue."""
if unload_ok:
hass.data[DOMAIN][entry.entry_id][DATA_UNDO_UPDATE_LISTENER]()
hass.data[DOMAIN].pop(entry.entry_id)
try:
self._api.resumedownload()
except pynzbgetapi.NZBGetAPIException as err:
_LOGGER.error("Unable to resume download queue: %s", err)
return unload_ok
def rate(self, limit):
"""Set download speed."""
try:
if not self._api.rate(limit):
_LOGGER.error("Limit was out of range")
except pynzbgetapi.NZBGetAPIException as err:
_LOGGER.error("Unable to set download speed: %s", err)
def _async_register_services(
hass: HomeAssistantType,
coordinator: NZBGetDataUpdateCoordinator,
) -> None:
"""Register integration-level services."""
def pause(call) -> None:
"""Service call to pause downloads in NZBGet."""
coordinator.nzbget.pausedownload()
def resume(call) -> None:
"""Service call to resume downloads in NZBGet."""
coordinator.nzbget.resumedownload()
def set_speed(call) -> None:
"""Service call to rate limit speeds in NZBGet."""
coordinator.nzbget.rate(call.data[ATTR_SPEED])
hass.services.async_register(DOMAIN, SERVICE_PAUSE, pause, schema=vol.Schema({}))
hass.services.async_register(DOMAIN, SERVICE_RESUME, resume, schema=vol.Schema({}))
hass.services.async_register(
DOMAIN, SERVICE_SET_SPEED, set_speed, schema=SPEED_LIMIT_SCHEMA
)
async def _async_update_listener(hass: HomeAssistantType, entry: ConfigEntry) -> None:
"""Handle options update."""
await hass.config_entries.async_reload(entry.entry_id)
class NZBGetEntity(Entity):
"""Defines a base NZBGet entity."""
def __init__(
self, *, entry_id: str, name: str, coordinator: NZBGetDataUpdateCoordinator
) -> None:
"""Initialize the NZBGet entity."""
self._name = name
self._entry_id = entry_id
self.coordinator = coordinator
@property
def available(self) -> bool:
"""Return True if entity is available."""
return self.coordinator.last_update_success
@property
def name(self) -> str:
"""Return the name of the entity."""
return self._name
@property
def should_poll(self) -> bool:
"""Return the polling requirement of the entity."""
return False
async def async_added_to_hass(self) -> None:
"""Connect to dispatcher listening for entity data notifications."""
self.async_on_remove(
self.coordinator.async_add_listener(self.async_write_ha_state)
)
async def async_update(self) -> None:
"""Request an update from the coordinator of this entity."""
await self.coordinator.async_request_refresh()

View File

@ -0,0 +1,143 @@
"""Config flow for NZBGet."""
import logging
from typing import Any, Dict, Optional
import voluptuous as vol
from homeassistant.config_entries import CONN_CLASS_LOCAL_POLL, ConfigFlow, OptionsFlow
from homeassistant.const import (
CONF_HOST,
CONF_NAME,
CONF_PASSWORD,
CONF_PORT,
CONF_SCAN_INTERVAL,
CONF_SSL,
CONF_USERNAME,
CONF_VERIFY_SSL,
)
from homeassistant.core import callback
from homeassistant.helpers.typing import ConfigType, HomeAssistantType
from .const import (
DEFAULT_NAME,
DEFAULT_PORT,
DEFAULT_SCAN_INTERVAL,
DEFAULT_SSL,
DEFAULT_VERIFY_SSL,
)
from .const import DOMAIN # pylint: disable=unused-import
from .coordinator import NZBGetAPI, NZBGetAPIException
_LOGGER = logging.getLogger(__name__)
def validate_input(hass: HomeAssistantType, data: dict) -> Dict[str, Any]:
"""Validate the user input allows us to connect.
Data has the keys from DATA_SCHEMA with values provided by the user.
"""
nzbget_api = NZBGetAPI(
data[CONF_HOST],
data[CONF_USERNAME] if data[CONF_USERNAME] != "" else None,
data[CONF_PASSWORD] if data[CONF_PASSWORD] != "" else None,
data[CONF_SSL],
data[CONF_VERIFY_SSL],
data[CONF_PORT],
)
nzbget_api.version()
return True
class NZBGetConfigFlow(ConfigFlow, domain=DOMAIN):
"""Handle a config flow for NZBGet."""
VERSION = 1
CONNECTION_CLASS = CONN_CLASS_LOCAL_POLL
@staticmethod
@callback
def async_get_options_flow(config_entry):
"""Get the options flow for this handler."""
return NZBGetOptionsFlowHandler(config_entry)
async def async_step_import(
self, user_input: Optional[ConfigType] = None
) -> Dict[str, Any]:
"""Handle a flow initiated by configuration file."""
if CONF_SCAN_INTERVAL in user_input:
user_input[CONF_SCAN_INTERVAL] = user_input[CONF_SCAN_INTERVAL].seconds
return await self.async_step_user(user_input)
async def async_step_user(
self, user_input: Optional[ConfigType] = None
) -> Dict[str, Any]:
"""Handle a flow initiated by the user."""
if self._async_current_entries():
return self.async_abort(reason="single_instance_allowed")
if user_input is None:
return self._show_setup_form()
if CONF_VERIFY_SSL not in user_input:
user_input[CONF_VERIFY_SSL] = DEFAULT_VERIFY_SSL
try:
await self.hass.async_add_executor_job(
validate_input, self.hass, user_input
)
except NZBGetAPIException:
return self._show_setup_form({"base": "cannot_connect"})
except Exception: # pylint: disable=broad-except
_LOGGER.exception("Unexpected exception")
return self.async_abort(reason="unknown")
return self.async_create_entry(title=user_input[CONF_HOST], data=user_input)
def _show_setup_form(self, errors: Optional[Dict] = None) -> Dict[str, Any]:
"""Show the setup form to the user."""
data_schema = {
vol.Required(CONF_HOST): str,
vol.Optional(CONF_NAME, default=DEFAULT_NAME): str,
vol.Optional(CONF_USERNAME): str,
vol.Optional(CONF_PASSWORD): str,
vol.Optional(CONF_PORT, default=DEFAULT_PORT): int,
vol.Optional(CONF_SSL, default=DEFAULT_SSL): bool,
}
if self.show_advanced_options:
data_schema[
vol.Optional(CONF_VERIFY_SSL, default=DEFAULT_VERIFY_SSL)
] = bool
return self.async_show_form(
step_id="user",
data_schema=vol.Schema(data_schema),
errors=errors or {},
)
class NZBGetOptionsFlowHandler(OptionsFlow):
"""Handle NZBGet client options."""
def __init__(self, config_entry):
"""Initialize options flow."""
self.config_entry = config_entry
async def async_step_init(self, user_input: Optional[ConfigType] = None):
"""Manage NZBGet options."""
if user_input is not None:
return self.async_create_entry(title="", data=user_input)
options = {
vol.Optional(
CONF_SCAN_INTERVAL,
default=self.config_entry.options.get(
CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL
),
): int,
}
return self.async_show_form(step_id="init", data_schema=vol.Schema(options))

View File

@ -0,0 +1,22 @@
"""Constants for NZBGet."""
DOMAIN = "nzbget"
# Attributes
ATTR_SPEED = "speed"
# Data
DATA_COORDINATOR = "corrdinator"
DATA_UNDO_UPDATE_LISTENER = "undo_update_listener"
# Defaults
DEFAULT_NAME = "NZBGet"
DEFAULT_PORT = 6789
DEFAULT_SCAN_INTERVAL = 5 # time in seconds
DEFAULT_SPEED_LIMIT = 1000 # 1 Megabyte/Sec
DEFAULT_SSL = False
DEFAULT_VERIFY_SSL = False
# Services
SERVICE_PAUSE = "pause"
SERVICE_RESUME = "resume"
SERVICE_SET_SPEED = "set_speed"

View File

@ -0,0 +1,94 @@
"""Provides the NZBGet DataUpdateCoordinator."""
from datetime import timedelta
import logging
from async_timeout import timeout
from pynzbgetapi import NZBGetAPI, NZBGetAPIException
from homeassistant.const import (
CONF_HOST,
CONF_PASSWORD,
CONF_PORT,
CONF_SCAN_INTERVAL,
CONF_SSL,
CONF_USERNAME,
CONF_VERIFY_SSL,
)
from homeassistant.helpers.typing import HomeAssistantType
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import DOMAIN
_LOGGER = logging.getLogger(__name__)
class NZBGetDataUpdateCoordinator(DataUpdateCoordinator):
"""Class to manage fetching NZBGet data."""
def __init__(self, hass: HomeAssistantType, *, config: dict, options: dict):
"""Initialize global NZBGet data updater."""
self.nzbget = NZBGetAPI(
config[CONF_HOST],
config[CONF_USERNAME] if config[CONF_USERNAME] != "" else None,
config[CONF_PASSWORD] if config[CONF_PASSWORD] != "" else None,
config[CONF_SSL],
config[CONF_VERIFY_SSL],
config[CONF_PORT],
)
self._completed_downloads_init = False
self._completed_downloads = {}
update_interval = timedelta(seconds=options[CONF_SCAN_INTERVAL])
super().__init__(
hass,
_LOGGER,
name=DOMAIN,
update_interval=update_interval,
)
def _check_completed_downloads(self, history):
"""Check history for newly completed downloads."""
actual_completed_downloads = {
(x["Name"], x["Category"], x["Status"]) for x in history
}
if self._completed_downloads_init:
tmp_completed_downloads = list(
actual_completed_downloads.difference(self._completed_downloads)
)
for download in tmp_completed_downloads:
self.hass.bus.fire(
"nzbget_download_complete",
{
"name": download[0],
"category": download[1],
"status": download[2],
},
)
self._completed_downloads = actual_completed_downloads
self._completed_downloads_init = True
async def _async_update_data(self) -> dict:
"""Fetch data from NZBGet."""
def _update_data() -> dict:
"""Fetch data from NZBGet via sync functions."""
status = self.nzbget.status()
history = self.nzbget.history()
self._check_completed_downloads(history)
return {
"status": status,
"downloads": history,
}
try:
async with timeout(4):
return await self.hass.async_add_executor_job(_update_data)
except NZBGetAPIException as error:
raise UpdateFailed(f"Invalid response from API: {error}") from error

View File

@ -3,5 +3,6 @@
"name": "NZBGet",
"documentation": "https://www.home-assistant.io/integrations/nzbget",
"requirements": ["pynzbgetapi==0.2.0"],
"codeowners": ["@chriscla"]
"codeowners": ["@chriscla"],
"config_flow": true
}

View File

@ -1,21 +1,23 @@
"""Monitor the NZBGet API."""
import logging
from typing import Callable, List, Optional
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import (
CONF_NAME,
DATA_MEGABYTES,
DATA_RATE_MEGABYTES_PER_SECOND,
TIME_MINUTES,
)
from homeassistant.core import callback
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from homeassistant.helpers.entity import Entity
from homeassistant.helpers.typing import HomeAssistantType
from . import DATA_NZBGET, DATA_UPDATED
from . import NZBGetEntity
from .const import DATA_COORDINATOR, DOMAIN
from .coordinator import NZBGetDataUpdateCoordinator
_LOGGER = logging.getLogger(__name__)
DEFAULT_NAME = "NZBGet"
SENSOR_TYPES = {
"article_cache": ["ArticleCacheMB", "Article Cache", DATA_MEGABYTES],
"average_download_rate": [
@ -34,90 +36,80 @@ SENSOR_TYPES = {
}
def setup_platform(hass, config, add_entities, discovery_info=None):
"""Create NZBGet sensors."""
async def async_setup_entry(
hass: HomeAssistantType,
entry: ConfigEntry,
async_add_entities: Callable[[List[Entity], bool], None],
) -> None:
"""Set up NZBGet sensor based on a config entry."""
coordinator: NZBGetDataUpdateCoordinator = hass.data[DOMAIN][entry.entry_id][
DATA_COORDINATOR
]
sensors = []
if discovery_info is None:
return
nzbget_data = hass.data[DATA_NZBGET]
name = discovery_info["client_name"]
devices = []
for sensor_config in SENSOR_TYPES.values():
new_sensor = NZBGetSensor(
nzbget_data, sensor_config[0], name, sensor_config[1], sensor_config[2]
sensors.append(
NZBGetSensor(
coordinator,
entry.entry_id,
entry.data[CONF_NAME],
sensor_config[0],
sensor_config[1],
sensor_config[2],
)
)
devices.append(new_sensor)
add_entities(devices, True)
async_add_entities(sensors, True)
class NZBGetSensor(Entity):
class NZBGetSensor(NZBGetEntity, Entity):
"""Representation of a NZBGet sensor."""
def __init__(
self, nzbget_data, sensor_type, client_name, sensor_name, unit_of_measurement
self,
coordinator: NZBGetDataUpdateCoordinator,
entry_id: str,
entry_name: str,
sensor_type: str,
sensor_name: str,
unit_of_measurement: Optional[str] = None,
):
"""Initialize a new NZBGet sensor."""
self._name = f"{client_name} {sensor_name}"
self.type = sensor_type
self.client_name = client_name
self.nzbget_data = nzbget_data
self._state = None
self._sensor_type = sensor_type
self._unique_id = f"{entry_id}_{sensor_type}"
self._unit_of_measurement = unit_of_measurement
super().__init__(
coordinator=coordinator,
entry_id=entry_id,
name=f"{entry_name} {sensor_name}",
)
@property
def name(self):
"""Return the name of the sensor."""
return self._name
def unique_id(self) -> str:
"""Return the unique ID of the sensor."""
return self._unique_id
@property
def unit_of_measurement(self) -> str:
"""Return the unit that the state of sensor is expressed in."""
return self._unit_of_measurement
@property
def state(self):
"""Return the state of the sensor."""
return self._state
value = self.coordinator.data.status.get(self._sensor_type)
@property
def unit_of_measurement(self):
"""Return the unit of measurement of this entity, if any."""
return self._unit_of_measurement
@property
def available(self):
"""Return whether the sensor is available."""
return self.nzbget_data.available
async def async_added_to_hass(self):
"""Handle entity which will be added."""
self.async_on_remove(
async_dispatcher_connect(
self.hass, DATA_UPDATED, self._schedule_immediate_update
)
)
@callback
def _schedule_immediate_update(self):
self.async_schedule_update_ha_state(True)
def update(self):
"""Update state of sensor."""
if self.nzbget_data.status is None:
_LOGGER.debug(
"Update of %s requested, but no status is available", self._name
)
return
value = self.nzbget_data.status.get(self.type)
if value is None:
_LOGGER.warning("Unable to locate value for %s", self.type)
return
_LOGGER.warning("Unable to locate value for %s", self._sensor_type)
return None
if "DownloadRate" in self.type and value > 0:
if "DownloadRate" in self._sensor_type and value > 0:
# Convert download rate from Bytes/s to MBytes/s
self._state = round(value / 2 ** 20, 2)
elif "UpTimeSec" in self.type and value > 0:
return round(value / 2 ** 20, 2)
if "UpTimeSec" in self._sensor_type and value > 0:
# Convert uptime from seconds to minutes
self._state = round(value / 60, 2)
else:
self._state = value
return round(value / 60, 2)
return value

View File

@ -0,0 +1,36 @@
{
"config": {
"flow_title": "NZBGet: {name}",
"step": {
"user": {
"title": "Connect to NZBGet",
"data": {
"name": "Name",
"host": "[%key:common::config_flow::data::host%]",
"username": "[%key:common::config_flow::data::username%]",
"password": "[%key:common::config_flow::data::password%]",
"port": "[%key:common::config_flow::data::port%]",
"ssl": "NZBGet uses a SSL certificate",
"verify_ssl": "NZBGet uses a proper certificate"
}
}
},
"error": {
"cannot_connect": "[%key:common::config_flow::error::cannot_connect%]",
"invalid_auth": "[%key:common::config_flow::error::invalid_auth%]"
},
"abort": {
"single_instance_allowed": "[%key:common::config_flow::abort::single_instance_allowed%]",
"unknown": "[%key:common::config_flow::error::unknown%]"
}
},
"options": {
"step": {
"init": {
"data": {
"scan_interval": "Update frequency (seconds)"
}
}
}
}
}

View File

@ -124,6 +124,7 @@ FLOWS = [
"nuheat",
"nut",
"nws",
"nzbget",
"onvif",
"opentherm_gw",
"openuv",

View File

@ -736,6 +736,9 @@ pynws==1.2.1
# homeassistant.components.nx584
pynx584==0.5
# homeassistant.components.nzbget
pynzbgetapi==0.2.0
# homeassistant.components.openuv
pyopenuv==1.0.9

View File

@ -0,0 +1,119 @@
"""Tests for the NZBGet integration."""
from datetime import timedelta
from homeassistant.components.nzbget.const import DOMAIN
from homeassistant.const import (
CONF_HOST,
CONF_NAME,
CONF_PASSWORD,
CONF_PORT,
CONF_SCAN_INTERVAL,
CONF_SSL,
CONF_USERNAME,
CONF_VERIFY_SSL,
)
from tests.async_mock import patch
from tests.common import MockConfigEntry
ENTRY_CONFIG = {
CONF_HOST: "10.10.10.30",
CONF_NAME: "NZBGetTest",
CONF_PASSWORD: "",
CONF_PORT: 6789,
CONF_SSL: False,
CONF_USERNAME: "",
CONF_VERIFY_SSL: False,
}
USER_INPUT = {
CONF_HOST: "10.10.10.30",
CONF_NAME: "NZBGet",
CONF_PASSWORD: "",
CONF_PORT: 6789,
CONF_SSL: False,
CONF_USERNAME: "",
}
YAML_CONFIG = {
CONF_HOST: "10.10.10.30",
CONF_NAME: "GetNZBsTest",
CONF_PASSWORD: "",
CONF_PORT: 6789,
CONF_SCAN_INTERVAL: timedelta(seconds=5),
CONF_SSL: False,
CONF_USERNAME: "",
}
MOCK_VERSION = "21.0"
MOCK_STATUS = {
"ArticleCacheMB": "64",
"AverageDownloadRate": "512",
"DownloadPaused": "4",
"DownloadRate": "1000",
"DownloadedSizeMB": "256",
"FreeDiskSpaceMB": "1024",
"PostJobCount": "2",
"PostPaused": "4",
"RemainingSizeMB": "512",
"UpTimeSec": "600",
}
MOCK_HISTORY = [
{"Name": "Downloaded Item XYZ", "Category": "", "Status": "SUCCESS"},
{"Name": "Failed Item ABC", "Category": "", "Status": "FAILURE"},
]
async def init_integration(
hass,
*,
status: dict = MOCK_STATUS,
history: dict = MOCK_HISTORY,
version: str = MOCK_VERSION,
) -> MockConfigEntry:
"""Set up the NZBGet integration in Home Assistant."""
entry = MockConfigEntry(domain=DOMAIN, data=ENTRY_CONFIG)
entry.add_to_hass(hass)
with _patch_version(version), _patch_status(status), _patch_history(history):
await hass.config_entries.async_setup(entry.entry_id)
await hass.async_block_till_done()
return entry
def _patch_async_setup(return_value=True):
return patch(
"homeassistant.components.nzbget.async_setup",
return_value=return_value,
)
def _patch_async_setup_entry(return_value=True):
return patch(
"homeassistant.components.nzbget.async_setup_entry",
return_value=return_value,
)
def _patch_history(return_value=MOCK_HISTORY):
return patch(
"homeassistant.components.nzbget.coordinator.NZBGetAPI.history",
return_value=return_value,
)
def _patch_status(return_value=MOCK_STATUS):
return patch(
"homeassistant.components.nzbget.coordinator.NZBGetAPI.status",
return_value=return_value,
)
def _patch_version(return_value=MOCK_VERSION):
return patch(
"homeassistant.components.nzbget.coordinator.NZBGetAPI.version",
return_value=return_value,
)

View File

@ -0,0 +1,156 @@
"""Test the NZBGet config flow."""
from pynzbgetapi import NZBGetAPIException
from homeassistant.components.nzbget.const import DOMAIN
from homeassistant.config_entries import SOURCE_USER
from homeassistant.const import CONF_SCAN_INTERVAL, CONF_VERIFY_SSL
from homeassistant.data_entry_flow import (
RESULT_TYPE_ABORT,
RESULT_TYPE_CREATE_ENTRY,
RESULT_TYPE_FORM,
)
from homeassistant.setup import async_setup_component
from . import (
ENTRY_CONFIG,
USER_INPUT,
_patch_async_setup,
_patch_async_setup_entry,
_patch_history,
_patch_status,
_patch_version,
)
from tests.async_mock import patch
from tests.common import MockConfigEntry
async def test_user_form(hass):
"""Test we get the user initiated form."""
await async_setup_component(hass, "persistent_notification", {})
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
assert result["type"] == RESULT_TYPE_FORM
assert result["errors"] == {}
with _patch_version(), _patch_status(), _patch_history(), _patch_async_setup() as mock_setup, _patch_async_setup_entry() as mock_setup_entry:
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
USER_INPUT,
)
await hass.async_block_till_done()
assert result["type"] == RESULT_TYPE_CREATE_ENTRY
assert result["title"] == "10.10.10.30"
assert result["data"] == {**USER_INPUT, CONF_VERIFY_SSL: False}
assert len(mock_setup.mock_calls) == 1
assert len(mock_setup_entry.mock_calls) == 1
async def test_user_form_show_advanced_options(hass):
"""Test we get the user initiated form with advanced options shown."""
await async_setup_component(hass, "persistent_notification", {})
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER, "show_advanced_options": True}
)
assert result["type"] == RESULT_TYPE_FORM
assert result["errors"] == {}
user_input_advanced = {
**USER_INPUT,
CONF_VERIFY_SSL: True,
}
with _patch_version(), _patch_status(), _patch_history(), _patch_async_setup() as mock_setup, _patch_async_setup_entry() as mock_setup_entry:
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
user_input_advanced,
)
await hass.async_block_till_done()
assert result["type"] == RESULT_TYPE_CREATE_ENTRY
assert result["title"] == "10.10.10.30"
assert result["data"] == {**USER_INPUT, CONF_VERIFY_SSL: True}
assert len(mock_setup.mock_calls) == 1
assert len(mock_setup_entry.mock_calls) == 1
async def test_user_form_cannot_connect(hass):
"""Test we handle cannot connect error."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
with patch(
"homeassistant.components.nzbget.coordinator.NZBGetAPI.version",
side_effect=NZBGetAPIException(),
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
USER_INPUT,
)
assert result["type"] == RESULT_TYPE_FORM
assert result["errors"] == {"base": "cannot_connect"}
async def test_user_form_unexpected_exception(hass):
"""Test we handle unexpected exception."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
with patch(
"homeassistant.components.nzbget.coordinator.NZBGetAPI.version",
side_effect=Exception(),
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
USER_INPUT,
)
assert result["type"] == RESULT_TYPE_ABORT
assert result["reason"] == "unknown"
async def test_user_form_single_instance_allowed(hass):
"""Test that configuring more than one instance is rejected."""
entry = MockConfigEntry(domain=DOMAIN, data=ENTRY_CONFIG)
entry.add_to_hass(hass)
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_USER},
data=USER_INPUT,
)
assert result["type"] == RESULT_TYPE_ABORT
assert result["reason"] == "single_instance_allowed"
async def test_options_flow(hass):
"""Test updating options."""
entry = MockConfigEntry(
domain=DOMAIN,
data=ENTRY_CONFIG,
options={CONF_SCAN_INTERVAL: 5},
)
entry.add_to_hass(hass)
assert entry.options[CONF_SCAN_INTERVAL] == 5
result = await hass.config_entries.options.async_init(entry.entry_id)
assert result["type"] == RESULT_TYPE_FORM
assert result["step_id"] == "init"
result = await hass.config_entries.options.async_configure(
result["flow_id"],
user_input={CONF_SCAN_INTERVAL: 15},
)
assert result["type"] == RESULT_TYPE_CREATE_ENTRY
assert result["data"][CONF_SCAN_INTERVAL] == 15

View File

@ -0,0 +1,66 @@
"""Test the NZBGet config flow."""
from pynzbgetapi import NZBGetAPIException
from homeassistant.components.nzbget.const import DOMAIN
from homeassistant.config_entries import (
ENTRY_STATE_LOADED,
ENTRY_STATE_NOT_LOADED,
ENTRY_STATE_SETUP_RETRY,
)
from homeassistant.const import CONF_HOST, CONF_NAME, CONF_PORT
from homeassistant.setup import async_setup_component
from . import (
ENTRY_CONFIG,
YAML_CONFIG,
_patch_async_setup_entry,
_patch_history,
_patch_status,
_patch_version,
init_integration,
)
from tests.async_mock import patch
from tests.common import MockConfigEntry
async def test_import_from_yaml(hass) -> None:
"""Test import from YAML."""
with _patch_version(), _patch_status(), _patch_history(), _patch_async_setup_entry():
assert await async_setup_component(hass, DOMAIN, {DOMAIN: YAML_CONFIG})
await hass.async_block_till_done()
entries = hass.config_entries.async_entries(DOMAIN)
assert len(entries) == 1
assert entries[0].data[CONF_NAME] == "GetNZBsTest"
assert entries[0].data[CONF_HOST] == "10.10.10.30"
assert entries[0].data[CONF_PORT] == 6789
async def test_unload_entry(hass):
"""Test successful unload of entry."""
entry = await init_integration(hass)
assert len(hass.config_entries.async_entries(DOMAIN)) == 1
assert entry.state == ENTRY_STATE_LOADED
assert await hass.config_entries.async_unload(entry.entry_id)
await hass.async_block_till_done()
assert entry.state == ENTRY_STATE_NOT_LOADED
assert not hass.data.get(DOMAIN)
async def test_async_setup_raises_entry_not_ready(hass):
"""Test that it throws ConfigEntryNotReady when exception occurs during setup."""
config_entry = MockConfigEntry(domain=DOMAIN, data=ENTRY_CONFIG)
config_entry.add_to_hass(hass)
with _patch_version(), patch(
"homeassistant.components.nzbget.coordinator.NZBGetAPI.status",
side_effect=NZBGetAPIException(),
):
await hass.config_entries.async_setup(config_entry.entry_id)
assert config_entry.state == ENTRY_STATE_SETUP_RETRY