Rework octoprint (#58040)

Co-authored-by: Martin Hjelmare <marhje52@gmail.com>
This commit is contained in:
Ryan Fleming 2021-10-22 09:25:12 -04:00 committed by GitHub
parent 1a9ac6b657
commit c84fee7c6e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 1485 additions and 392 deletions

View File

@ -735,7 +735,7 @@ omit =
homeassistant/components/nx584/alarm_control_panel.py
homeassistant/components/nzbget/coordinator.py
homeassistant/components/obihai/*
homeassistant/components/octoprint/*
homeassistant/components/octoprint/__init__.py
homeassistant/components/oem/climate.py
homeassistant/components/oasa_telematics/sensor.py
homeassistant/components/ohmconnect/sensor.py

View File

@ -366,6 +366,7 @@ homeassistant/components/nut/* @bdraco @ollo69
homeassistant/components/nws/* @MatthewFlamm
homeassistant/components/nzbget/* @chriscla
homeassistant/components/obihai/* @dshokouhi
homeassistant/components/octoprint/* @rfleming71
homeassistant/components/ohmconnect/* @robbiet480
homeassistant/components/ombi/* @larssont
homeassistant/components/omnilogic/* @oliver84 @djtimca @gentoosu

View File

@ -1,11 +1,11 @@
"""Support for monitoring OctoPrint 3D printers."""
from datetime import timedelta
import logging
import time
from aiohttp.hdrs import CONTENT_TYPE
import requests
from pyoctoprintapi import ApiError, OctoprintClient, PrinterOffline
import voluptuous as vol
from homeassistant.config_entries import SOURCE_IMPORT, ConfigEntry
from homeassistant.const import (
CONF_API_KEY,
CONF_BINARY_SENSORS,
@ -16,23 +16,18 @@ from homeassistant.const import (
CONF_PORT,
CONF_SENSORS,
CONF_SSL,
CONTENT_TYPE_JSON,
PERCENTAGE,
TEMP_CELSIUS,
TIME_SECONDS,
)
from homeassistant.core import HomeAssistant
from homeassistant.helpers.aiohttp_client import async_get_clientsession
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.discovery import load_platform
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from homeassistant.util import slugify as util_slugify
import homeassistant.util.dt as dt_util
from .const import DOMAIN
_LOGGER = logging.getLogger(__name__)
CONF_BED = "bed"
CONF_NUMBER_OF_TOOLS = "number_of_tools"
DEFAULT_NAME = "OctoPrint"
DOMAIN = "octoprint"
def has_all_unique_names(value):
"""Validate that printers have an unique name."""
@ -51,11 +46,15 @@ def ensure_valid_path(value):
return value
BINARY_SENSOR_TYPES = {
# API Endpoint, Group, Key, unit
"Printing": ["printer", "state", "printing", None],
"Printing Error": ["printer", "state", "error", None],
}
PLATFORMS = ["binary_sensor", "sensor"]
DEFAULT_NAME = "Octoprint"
CONF_NUMBER_OF_TOOLS = "number_of_tools"
CONF_BED = "bed"
BINARY_SENSOR_TYPES = [
"Printing",
"Printing Error",
]
BINARY_SENSOR_SCHEMA = vol.Schema(
{
@ -66,26 +65,13 @@ BINARY_SENSOR_SCHEMA = vol.Schema(
}
)
SENSOR_TYPES = {
# API Endpoint, Group, Key, unit, icon
"Temperatures": ["printer", "temperature", "*", TEMP_CELSIUS],
"Current State": ["printer", "state", "text", None, "mdi:printer-3d"],
"Job Percentage": [
"job",
"progress",
"completion",
PERCENTAGE,
"mdi:file-percent",
],
"Time Remaining": [
"job",
"progress",
"printTimeLeft",
TIME_SECONDS,
"mdi:clock-end",
],
"Time Elapsed": ["job", "progress", "printTime", TIME_SECONDS, "mdi:clock-start"],
}
SENSOR_TYPES = [
"Temperatures",
"Current State",
"Job Percentage",
"Time Remaining",
"Time Elapsed",
]
SENSOR_SCHEMA = vol.Schema(
{
@ -97,207 +83,145 @@ SENSOR_SCHEMA = vol.Schema(
)
CONFIG_SCHEMA = vol.Schema(
{
DOMAIN: vol.All(
cv.ensure_list,
[
vol.Schema(
{
vol.Required(CONF_API_KEY): cv.string,
vol.Required(CONF_HOST): cv.string,
vol.Optional(CONF_SSL, default=False): cv.boolean,
vol.Optional(CONF_PORT, default=80): cv.port,
vol.Optional(CONF_PATH, default="/"): ensure_valid_path,
vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
vol.Optional(CONF_NUMBER_OF_TOOLS, default=0): cv.positive_int,
vol.Optional(CONF_BED, default=False): cv.boolean,
vol.Optional(CONF_SENSORS, default={}): SENSOR_SCHEMA,
vol.Optional(
CONF_BINARY_SENSORS, default={}
): BINARY_SENSOR_SCHEMA,
}
)
],
has_all_unique_names,
)
},
vol.All(
cv.deprecated(DOMAIN),
{
DOMAIN: vol.All(
cv.ensure_list,
[
vol.Schema(
{
vol.Required(CONF_API_KEY): cv.string,
vol.Required(CONF_HOST): cv.string,
vol.Optional(CONF_SSL, default=False): cv.boolean,
vol.Optional(CONF_PORT, default=80): cv.port,
vol.Optional(CONF_PATH, default="/"): ensure_valid_path,
# Following values are not longer used in the configuration of the integration
# and are here for historical purposes
vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
vol.Optional(
CONF_NUMBER_OF_TOOLS, default=0
): cv.positive_int,
vol.Optional(CONF_BED, default=False): cv.boolean,
vol.Optional(CONF_SENSORS, default={}): SENSOR_SCHEMA,
vol.Optional(
CONF_BINARY_SENSORS, default={}
): BINARY_SENSOR_SCHEMA,
}
)
],
has_all_unique_names,
)
},
),
extra=vol.ALLOW_EXTRA,
)
def setup(hass, config):
async def async_setup(hass: HomeAssistant, config: dict):
"""Set up the OctoPrint component."""
printers = hass.data[DOMAIN] = {}
success = False
if DOMAIN not in config:
# Skip the setup if there is no configuration present
return True
for printer in config[DOMAIN]:
name = printer[CONF_NAME]
protocol = "https" if printer[CONF_SSL] else "http"
base_url = (
f"{protocol}://{printer[CONF_HOST]}:{printer[CONF_PORT]}"
f"{printer[CONF_PATH]}api/"
)
api_key = printer[CONF_API_KEY]
number_of_tools = printer[CONF_NUMBER_OF_TOOLS]
bed = printer[CONF_BED]
try:
octoprint_api = OctoPrintAPI(base_url, api_key, bed, number_of_tools)
printers[base_url] = octoprint_api
octoprint_api.get("printer")
octoprint_api.get("job")
except requests.exceptions.RequestException as conn_err:
_LOGGER.error("Error setting up OctoPrint API: %r", conn_err)
continue
domain_config = config[DOMAIN]
sensors = printer[CONF_SENSORS][CONF_MONITORED_CONDITIONS]
load_platform(
for conf in domain_config:
hass.async_create_task(
hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_IMPORT},
data={
CONF_API_KEY: conf[CONF_API_KEY],
CONF_HOST: conf[CONF_HOST],
CONF_PATH: conf[CONF_PATH],
CONF_PORT: conf[CONF_PORT],
CONF_SSL: conf[CONF_SSL],
},
)
)
return True
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry):
"""Set up OctoPrint from a config entry."""
if DOMAIN not in hass.data:
hass.data[DOMAIN] = {}
websession = async_get_clientsession(hass)
client = OctoprintClient(
entry.data[CONF_HOST],
websession,
entry.data[CONF_PORT],
entry.data[CONF_SSL],
entry.data[CONF_PATH],
)
client.set_api_key(entry.data[CONF_API_KEY])
coordinator = OctoprintDataUpdateCoordinator(hass, client, entry.entry_id, 30)
await coordinator.async_config_entry_first_refresh()
hass.data[DOMAIN][entry.entry_id] = {"coordinator": coordinator, "client": client}
hass.config_entries.async_setup_platforms(entry, PLATFORMS)
return True
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry):
"""Unload a config entry."""
unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
if unload_ok:
hass.data[DOMAIN].pop(entry.entry_id)
return unload_ok
class OctoprintDataUpdateCoordinator(DataUpdateCoordinator):
"""Class to manage fetching Octoprint data."""
def __init__(
self,
hass: HomeAssistant,
octoprint: OctoprintClient,
config_entry_id: str,
interval: int,
) -> None:
"""Initialize."""
super().__init__(
hass,
"sensor",
DOMAIN,
{"name": name, "base_url": base_url, "sensors": sensors},
config,
_LOGGER,
name=f"octoprint-{config_entry_id}",
update_interval=timedelta(seconds=interval),
)
b_sensors = printer[CONF_BINARY_SENSORS][CONF_MONITORED_CONDITIONS]
load_platform(
hass,
"binary_sensor",
DOMAIN,
{"name": name, "base_url": base_url, "sensors": b_sensors},
config,
)
success = True
self._octoprint = octoprint
self._printer_offline = False
self.data = {"printer": None, "job": None, "last_read_time": None}
return success
class OctoPrintAPI:
"""Simple JSON wrapper for OctoPrint's API."""
def __init__(self, api_url, key, bed, number_of_tools):
"""Initialize OctoPrint API and set headers needed later."""
self.api_url = api_url
self.headers = {CONTENT_TYPE: CONTENT_TYPE_JSON, "X-Api-Key": key}
self.printer_last_reading = [{}, None]
self.job_last_reading = [{}, None]
self.job_available = False
self.printer_available = False
self.printer_error_logged = False
self.available = False
self.available_error_logged = False
self.job_error_logged = False
self.bed = bed
self.number_of_tools = number_of_tools
def get_tools(self):
"""Get the list of tools that temperature is monitored on."""
tools = []
if self.number_of_tools > 0:
for tool_number in range(0, self.number_of_tools):
tools.append(f"tool{tool_number!s}")
if self.bed:
tools.append("bed")
if not self.bed and self.number_of_tools == 0:
temps = self.printer_last_reading[0].get("temperature")
if temps is not None:
tools = temps.keys()
return tools
def get(self, endpoint):
"""Send a get request, and return the response as a dict."""
# Only query the API at most every 30 seconds
now = time.time()
if endpoint == "job":
last_time = self.job_last_reading[1]
if last_time is not None and now - last_time < 30.0:
return self.job_last_reading[0]
elif endpoint == "printer":
last_time = self.printer_last_reading[1]
if last_time is not None and now - last_time < 30.0:
return self.printer_last_reading[0]
url = self.api_url + endpoint
async def _async_update_data(self):
"""Update data via API."""
printer = None
try:
response = requests.get(url, headers=self.headers, timeout=9)
response.raise_for_status()
if endpoint == "job":
self.job_last_reading[0] = response.json()
self.job_last_reading[1] = time.time()
self.job_available = True
elif endpoint == "printer":
self.printer_last_reading[0] = response.json()
self.printer_last_reading[1] = time.time()
self.printer_available = True
job = await self._octoprint.get_job_info()
except ApiError as err:
raise UpdateFailed(err) from err
self.available = self.printer_available and self.job_available
if self.available:
self.job_error_logged = False
self.printer_error_logged = False
self.available_error_logged = False
# If octoprint is on, but the printer is disconnected
# printer will return a 409, so continue using the last
# reading if there is one
try:
printer = await self._octoprint.get_printer_info()
except PrinterOffline:
if not self._printer_offline:
_LOGGER.error("Unable to retrieve printer information: Printer offline")
self._printer_offline = True
except ApiError as err:
raise UpdateFailed(err) from err
else:
self._printer_offline = False
return response.json()
except requests.ConnectionError as exc_con:
log_string = f"Failed to connect to Octoprint server. Error: {exc_con}"
if not self.available_error_logged:
_LOGGER.error(log_string)
self.job_available = False
self.printer_available = False
self.available_error_logged = True
return None
except requests.HTTPError as ex_http:
status_code = ex_http.response.status_code
log_string = f"Failed to update OctoPrint status. Error: {ex_http}"
# Only log the first failure
if endpoint == "job":
log_string = f"Endpoint: job {log_string}"
if not self.job_error_logged:
_LOGGER.error(log_string)
self.job_error_logged = True
self.job_available = False
elif endpoint == "printer":
if (
status_code == 409
): # octoprint returns HTTP 409 when printer is not connected (and many other states)
self.printer_available = False
else:
log_string = f"Endpoint: printer {log_string}"
if not self.printer_error_logged:
_LOGGER.error(log_string)
self.printer_error_logged = True
self.printer_available = False
self.available = False
return None
def update(self, sensor_type, end_point, group, tool=None):
"""Return the value for sensor_type from the provided endpoint."""
if (response := self.get(end_point)) is not None:
return get_value_from_json(response, sensor_type, group, tool)
return response
def get_value_from_json(json_dict, sensor_type, group, tool):
"""Return the value for sensor_type from the JSON."""
if group not in json_dict:
return None
if sensor_type in json_dict[group]:
if sensor_type == "target" and json_dict[sensor_type] is None:
return 0
return json_dict[group][sensor_type]
if tool is not None and sensor_type in json_dict[group][tool]:
return json_dict[group][tool][sensor_type]
return None
return {"job": job, "printer": printer, "last_read_time": dt_util.utcnow()}

View File

@ -1,61 +1,74 @@
"""Support for monitoring OctoPrint binary sensors."""
from __future__ import annotations
from abc import abstractmethod
import logging
import requests
from pyoctoprintapi import OctoprintPrinterInfo
from homeassistant.components.binary_sensor import BinarySensorEntity
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.update_coordinator import (
CoordinatorEntity,
DataUpdateCoordinator,
)
from . import BINARY_SENSOR_TYPES, DOMAIN as COMPONENT_DOMAIN
from .const import DOMAIN as COMPONENT_DOMAIN
_LOGGER = logging.getLogger(__name__)
def setup_platform(hass, config, add_entities, discovery_info=None):
async def async_setup_entry(
hass: HomeAssistant,
config_entry: ConfigEntry,
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up the available OctoPrint binary sensors."""
if discovery_info is None:
return
coordinator: DataUpdateCoordinator = hass.data[COMPONENT_DOMAIN][
config_entry.entry_id
]["coordinator"]
device_id = config_entry.unique_id
name = discovery_info["name"]
base_url = discovery_info["base_url"]
monitored_conditions = discovery_info["sensors"]
octoprint_api = hass.data[COMPONENT_DOMAIN][base_url]
assert device_id is not None
devices = []
for octo_type in monitored_conditions:
new_sensor = OctoPrintBinarySensor(
octoprint_api,
octo_type,
BINARY_SENSOR_TYPES[octo_type][2],
name,
BINARY_SENSOR_TYPES[octo_type][3],
BINARY_SENSOR_TYPES[octo_type][0],
BINARY_SENSOR_TYPES[octo_type][1],
"flags",
)
devices.append(new_sensor)
add_entities(devices, True)
entities: list[BinarySensorEntity] = [
OctoPrintPrintingBinarySensor(coordinator, device_id),
OctoPrintPrintingErrorBinarySensor(coordinator, device_id),
]
async_add_entities(entities)
class OctoPrintBinarySensor(BinarySensorEntity):
class OctoPrintBinarySensorBase(CoordinatorEntity, BinarySensorEntity):
"""Representation an OctoPrint binary sensor."""
def __init__(
self, api, condition, sensor_type, sensor_name, unit, endpoint, group, tool=None
):
self,
coordinator: DataUpdateCoordinator,
sensor_type: str,
device_id: str,
) -> None:
"""Initialize a new OctoPrint sensor."""
self.sensor_name = sensor_name
if tool is None:
self._name = f"{sensor_name} {condition}"
else:
self._name = f"{sensor_name} {condition}"
super().__init__(coordinator)
self._name = f"Octoprint {sensor_type}"
self.sensor_type = sensor_type
self.api = api
self._state = False
self._unit_of_measurement = unit
self.api_endpoint = endpoint
self.api_group = group
self.api_tool = tool
_LOGGER.debug("Created OctoPrint binary sensor %r", self)
self._device_id = device_id
@property
def device_info(self):
"""Device info."""
return {
"identifiers": {(COMPONENT_DOMAIN, self._device_id)},
"manufacturer": "Octoprint",
"name": "Octoprint",
}
@property
def unique_id(self):
"""Return a unique id."""
return f"{self.sensor_type}-{self._device_id}"
@property
def name(self):
@ -65,19 +78,39 @@ class OctoPrintBinarySensor(BinarySensorEntity):
@property
def is_on(self):
"""Return true if binary sensor is on."""
return bool(self._state)
printer = self.coordinator.data["printer"]
if not printer:
return None
return bool(self._get_flag_state(printer))
@property
def device_class(self):
"""Return the class of this sensor, from DEVICE_CLASSES."""
return None
def available(self) -> bool:
"""Return if entity is available."""
return self.coordinator.last_update_success and self.coordinator.data["printer"]
def update(self):
"""Update state of sensor."""
try:
self._state = self.api.update(
self.sensor_type, self.api_endpoint, self.api_group, self.api_tool
)
except requests.exceptions.ConnectionError:
# Error calling the api, already logged in api.update()
return
@abstractmethod
def _get_flag_state(self, printer_info: OctoprintPrinterInfo) -> bool | None:
"""Return the value of the sensor flag."""
class OctoPrintPrintingBinarySensor(OctoPrintBinarySensorBase):
"""Representation an OctoPrint binary sensor."""
def __init__(self, coordinator: DataUpdateCoordinator, device_id: str) -> None:
"""Initialize a new OctoPrint sensor."""
super().__init__(coordinator, "Printing", device_id)
def _get_flag_state(self, printer_info: OctoprintPrinterInfo) -> bool | None:
return bool(printer_info.state.flags.printing)
class OctoPrintPrintingErrorBinarySensor(OctoPrintBinarySensorBase):
"""Representation an OctoPrint binary sensor."""
def __init__(self, coordinator: DataUpdateCoordinator, device_id: str) -> None:
"""Initialize a new OctoPrint sensor."""
super().__init__(coordinator, "Printing Error", device_id)
def _get_flag_state(self, printer_info: OctoprintPrinterInfo) -> bool | None:
return bool(printer_info.state.flags.error)

View File

@ -0,0 +1,202 @@
"""Config flow for OctoPrint integration."""
import logging
from urllib.parse import urlsplit
from pyoctoprintapi import ApiError, OctoprintClient, OctoprintException
import voluptuous as vol
from homeassistant import config_entries, data_entry_flow, exceptions
from homeassistant.const import (
CONF_API_KEY,
CONF_HOST,
CONF_PATH,
CONF_PORT,
CONF_SSL,
CONF_USERNAME,
)
from homeassistant.helpers.aiohttp_client import async_get_clientsession
import homeassistant.helpers.config_validation as cv
from .const import DOMAIN
_LOGGER = logging.getLogger(__name__)
def _schema_with_defaults(username="", host=None, port=80, path="/", ssl=False):
return vol.Schema(
{
vol.Required(CONF_USERNAME, default=username): cv.string,
vol.Required(CONF_HOST, default=host): cv.string,
vol.Optional(CONF_PORT, default=port): cv.port,
vol.Optional(CONF_PATH, default=path): cv.string,
vol.Optional(CONF_SSL, default=ssl): cv.boolean,
},
extra=vol.ALLOW_EXTRA,
)
class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
"""Handle a config flow for OctoPrint."""
VERSION = 1
CONNECTION_CLASS = config_entries.CONN_CLASS_LOCAL_POLL
api_key_task = None
def __init__(self) -> None:
"""Handle a config flow for OctoPrint."""
self.discovery_schema = None
self._user_input = None
async def async_step_user(self, user_input=None):
"""Handle the initial step."""
# When coming back from the progress steps, the user_input is stored in the
# instance variable instead of being passed in
if user_input is None and self._user_input:
user_input = self._user_input
if user_input is None:
data = self.discovery_schema or _schema_with_defaults()
return self.async_show_form(step_id="user", data_schema=data)
if CONF_API_KEY in user_input:
errors = {}
try:
return await self._finish_config(user_input)
except data_entry_flow.AbortFlow as err:
raise err from None
except CannotConnect:
errors["base"] = "cannot_connect"
except Exception: # pylint: disable=broad-except
errors["base"] = "unknown"
if errors:
return self.async_show_form(
step_id="user",
errors=errors,
data_schema=_schema_with_defaults(
user_input.get(CONF_USERNAME),
user_input[CONF_HOST],
user_input[CONF_PORT],
user_input[CONF_PATH],
user_input[CONF_SSL],
),
)
self.api_key_task = None
return await self.async_step_get_api_key(user_input)
async def async_step_get_api_key(self, user_input):
"""Get an Application Api Key."""
if not self.api_key_task:
self.api_key_task = self.hass.async_create_task(
self._async_get_auth_key(user_input)
)
return self.async_show_progress(
step_id="get_api_key", progress_action="get_api_key"
)
try:
await self.api_key_task
except OctoprintException as err:
_LOGGER.exception("Failed to get an application key: %s", err)
return self.async_show_progress_done(next_step_id="auth_failed")
except Exception as err: # pylint: disable=broad-except
_LOGGER.exception("Failed to get an application key : %s", err)
return self.async_show_progress_done(next_step_id="auth_failed")
# store this off here to pick back up in the user step
self._user_input = user_input
return self.async_show_progress_done(next_step_id="user")
async def _finish_config(self, user_input):
"""Finish the configuration setup."""
session = async_get_clientsession(self.hass)
octoprint = OctoprintClient(
user_input[CONF_HOST],
session,
user_input[CONF_PORT],
user_input[CONF_SSL],
user_input[CONF_PATH],
)
octoprint.set_api_key(user_input[CONF_API_KEY])
try:
discovery = await octoprint.get_discovery_info()
except ApiError as err:
_LOGGER.error("Failed to connect to printer")
raise CannotConnect from err
await self.async_set_unique_id(discovery.upnp_uuid, raise_on_progress=False)
self._abort_if_unique_id_configured()
return self.async_create_entry(title=user_input[CONF_HOST], data=user_input)
async def async_step_auth_failed(self, user_input):
"""Handle api fetch failure."""
return self.async_abort(reason="auth_failed")
async def async_step_import(self, user_input):
"""Handle import."""
return await self.async_step_user(user_input)
async def async_step_zeroconf(self, discovery_info):
"""Handle discovery flow."""
uuid = discovery_info["properties"]["uuid"]
await self.async_set_unique_id(uuid)
self._abort_if_unique_id_configured()
self.context["title_placeholders"] = {
CONF_HOST: discovery_info[CONF_HOST],
}
self.discovery_schema = _schema_with_defaults(
host=discovery_info[CONF_HOST],
port=discovery_info[CONF_PORT],
path=discovery_info["properties"][CONF_PATH],
)
return await self.async_step_user()
async def async_step_ssdp(self, discovery_info):
"""Handle ssdp discovery flow."""
uuid = discovery_info["UDN"][5:]
await self.async_set_unique_id(uuid)
self._abort_if_unique_id_configured()
url = urlsplit(discovery_info["presentationURL"])
self.context["title_placeholders"] = {
CONF_HOST: url.hostname,
}
self.discovery_schema = _schema_with_defaults(
host=url.hostname,
port=url.port,
)
return await self.async_step_user()
async def _async_get_auth_key(self, user_input: dict):
"""Get application api key."""
session = async_get_clientsession(self.hass)
octoprint = OctoprintClient(
user_input[CONF_HOST],
session,
user_input[CONF_PORT],
user_input[CONF_SSL],
user_input[CONF_PATH],
)
try:
user_input[CONF_API_KEY] = await octoprint.request_app_key(
"Home Assistant", user_input[CONF_USERNAME], 30
)
finally:
# Continue the flow after show progress when the task is done.
self.hass.async_create_task(
self.hass.config_entries.flow.async_configure(
flow_id=self.flow_id, user_input=user_input
)
)
class CannotConnect(exceptions.HomeAssistantError):
"""Error to indicate we cannot connect."""

View File

@ -0,0 +1,5 @@
"""Constants for the OctoPrint integration."""
DOMAIN = "octoprint"
DEFAULT_NAME = "OctoPrint"

View File

@ -1,8 +1,16 @@
{
"domain": "octoprint",
"name": "OctoPrint",
"config_flow": true,
"documentation": "https://www.home-assistant.io/integrations/octoprint",
"after_dependencies": ["discovery"],
"codeowners": [],
"requirements": ["pyoctoprintapi==0.1.6"],
"codeowners": ["@rfleming71"],
"zeroconf": ["_octoprint._tcp.local."],
"ssdp": [
{
"manufacturer": "The OctoPrint Project",
"deviceType": "urn:schemas-upnp-org:device:Basic:1"
}
],
"iot_class": "local_polling"
}

View File

@ -1,143 +1,256 @@
"""Support for monitoring OctoPrint sensors."""
from __future__ import annotations
from datetime import timedelta
import logging
import requests
from pyoctoprintapi import OctoprintJobInfo, OctoprintPrinterInfo
from homeassistant.components.sensor import STATE_CLASS_MEASUREMENT, SensorEntity
from homeassistant.const import DEVICE_CLASS_TEMPERATURE, PERCENTAGE, TEMP_CELSIUS
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import (
DEVICE_CLASS_TEMPERATURE,
DEVICE_CLASS_TIMESTAMP,
PERCENTAGE,
TEMP_CELSIUS,
)
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.update_coordinator import (
CoordinatorEntity,
DataUpdateCoordinator,
)
from . import DOMAIN as COMPONENT_DOMAIN, SENSOR_TYPES
from . import DOMAIN as COMPONENT_DOMAIN
_LOGGER = logging.getLogger(__name__)
NOTIFICATION_ID = "octoprint_notification"
NOTIFICATION_TITLE = "OctoPrint sensor setup error"
async def async_setup_entry(
hass: HomeAssistant,
config_entry: ConfigEntry,
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up the available OctoPrint binary sensors."""
coordinator: DataUpdateCoordinator = hass.data[COMPONENT_DOMAIN][
config_entry.entry_id
]["coordinator"]
device_id = config_entry.unique_id
def setup_platform(hass, config, add_entities, discovery_info=None):
"""Set up the available OctoPrint sensors."""
if discovery_info is None:
return
assert device_id is not None
name = discovery_info["name"]
base_url = discovery_info["base_url"]
monitored_conditions = discovery_info["sensors"]
octoprint_api = hass.data[COMPONENT_DOMAIN][base_url]
tools = octoprint_api.get_tools()
if "Temperatures" in monitored_conditions and not tools:
hass.components.persistent_notification.create(
"Your printer appears to be offline.<br />"
"If you do not want to have your printer on <br />"
" at all times, and you would like to monitor <br /> "
"temperatures, please add <br />"
"bed and/or number&#95;of&#95;tools to your configuration <br />"
"and restart.",
title=NOTIFICATION_TITLE,
notification_id=NOTIFICATION_ID,
)
devices = []
types = ["actual", "target"]
for octo_type in monitored_conditions:
if octo_type == "Temperatures":
for tool in tools:
for temp_type in types:
new_sensor = OctoPrintSensor(
api=octoprint_api,
condition=temp_type,
sensor_type=temp_type,
sensor_name=name,
unit=SENSOR_TYPES[octo_type][3],
endpoint=SENSOR_TYPES[octo_type][0],
group=SENSOR_TYPES[octo_type][1],
tool=tool,
device_class=DEVICE_CLASS_TEMPERATURE,
state_class=STATE_CLASS_MEASUREMENT,
entities: list[SensorEntity] = []
if coordinator.data["printer"]:
printer_info = coordinator.data["printer"]
types = ["actual", "target"]
for tool in printer_info.temperatures:
for temp_type in types:
entities.append(
OctoPrintTemperatureSensor(
coordinator,
tool.name,
temp_type,
device_id,
)
devices.append(new_sensor)
else:
new_sensor = OctoPrintSensor(
api=octoprint_api,
condition=octo_type,
sensor_type=SENSOR_TYPES[octo_type][2],
sensor_name=name,
unit=SENSOR_TYPES[octo_type][3],
endpoint=SENSOR_TYPES[octo_type][0],
group=SENSOR_TYPES[octo_type][1],
icon=SENSOR_TYPES[octo_type][4],
)
devices.append(new_sensor)
add_entities(devices, True)
)
else:
_LOGGER.error("Printer appears to be offline, skipping temperature sensors")
entities.append(OctoPrintStatusSensor(coordinator, device_id))
entities.append(OctoPrintJobPercentageSensor(coordinator, device_id))
entities.append(OctoPrintEstimatedFinishTimeSensor(coordinator, device_id))
entities.append(OctoPrintStartTimeSensor(coordinator, device_id))
async_add_entities(entities)
class OctoPrintSensor(SensorEntity):
class OctoPrintSensorBase(CoordinatorEntity, SensorEntity):
"""Representation of an OctoPrint sensor."""
def __init__(
self,
api,
condition,
sensor_type,
sensor_name,
unit,
endpoint,
group,
tool=None,
icon=None,
device_class=None,
state_class=None,
):
coordinator: DataUpdateCoordinator,
sensor_type: str,
device_id: str,
) -> None:
"""Initialize a new OctoPrint sensor."""
self.sensor_name = sensor_name
if tool is None:
self._name = f"{sensor_name} {condition}"
else:
self._name = f"{sensor_name} {condition} {tool} temp"
self.sensor_type = sensor_type
self.api = api
self._state = None
self._unit_of_measurement = unit
self.api_endpoint = endpoint
self.api_group = group
self.api_tool = tool
self._icon = icon
self._attr_device_class = device_class
self._attr_state_class = state_class
_LOGGER.debug("Created OctoPrint sensor %r", self)
super().__init__(coordinator)
self._sensor_type = sensor_type
self._name = f"Octoprint {sensor_type}"
self._device_id = device_id
@property
def device_info(self):
"""Device info."""
return {
"identifiers": {(COMPONENT_DOMAIN, self._device_id)},
"manufacturer": "Octoprint",
"name": "Octoprint",
}
@property
def unique_id(self):
"""Return a unique id."""
return f"{self._sensor_type}-{self._device_id}"
@property
def name(self):
"""Return the name of the sensor."""
return self._name
class OctoPrintStatusSensor(OctoPrintSensorBase):
"""Representation of an OctoPrint sensor."""
def __init__(self, coordinator: DataUpdateCoordinator, device_id: str) -> None:
"""Initialize a new OctoPrint sensor."""
super().__init__(coordinator, "Current State", device_id)
@property
def native_value(self):
"""Return the state of the sensor."""
sensor_unit = self.unit_of_measurement
if sensor_unit in (TEMP_CELSIUS, PERCENTAGE):
# API sometimes returns null and not 0
if self._state is None:
self._state = 0
return round(self._state, 2)
return self._state
"""Return sensor state."""
printer: OctoprintPrinterInfo = self.coordinator.data["printer"]
if not printer:
return None
@property
def native_unit_of_measurement(self):
"""Return the unit of measurement of this entity, if any."""
return self._unit_of_measurement
def update(self):
"""Update state of sensor."""
try:
self._state = self.api.update(
self.sensor_type, self.api_endpoint, self.api_group, self.api_tool
)
except requests.exceptions.ConnectionError:
# Error calling the api, already logged in api.update()
return
return printer.state.text
@property
def icon(self):
"""Icon to use in the frontend."""
return self._icon
return "mdi:printer-3d"
@property
def available(self) -> bool:
"""Return if entity is available."""
return self.coordinator.last_update_success and self.coordinator.data["printer"]
class OctoPrintJobPercentageSensor(OctoPrintSensorBase):
"""Representation of an OctoPrint sensor."""
def __init__(self, coordinator: DataUpdateCoordinator, device_id: str) -> None:
"""Initialize a new OctoPrint sensor."""
super().__init__(coordinator, "Job Percentage", device_id)
@property
def native_value(self):
"""Return sensor state."""
job: OctoprintJobInfo = self.coordinator.data["job"]
if not job:
return None
state = job.progress.completion
if not state:
return 0
return round(state, 2)
@property
def native_unit_of_measurement(self):
"""Return the unit of measurement of this entity, if any."""
return PERCENTAGE
@property
def icon(self):
"""Icon to use in the frontend."""
return "mdi:file-percent"
class OctoPrintEstimatedFinishTimeSensor(OctoPrintSensorBase):
"""Representation of an OctoPrint sensor."""
def __init__(self, coordinator: DataUpdateCoordinator, device_id: str) -> None:
"""Initialize a new OctoPrint sensor."""
super().__init__(coordinator, "Estimated Finish Time", device_id)
@property
def native_value(self):
"""Return sensor state."""
job: OctoprintJobInfo = self.coordinator.data["job"]
if not job or not job.progress.print_time_left or job.state != "Printing":
return None
read_time = self.coordinator.data["last_read_time"]
return (read_time + timedelta(seconds=job.progress.print_time_left)).isoformat()
@property
def device_class(self):
"""Return the device class of the sensor."""
return DEVICE_CLASS_TIMESTAMP
class OctoPrintStartTimeSensor(OctoPrintSensorBase):
"""Representation of an OctoPrint sensor."""
def __init__(self, coordinator: DataUpdateCoordinator, device_id: str) -> None:
"""Initialize a new OctoPrint sensor."""
super().__init__(coordinator, "Start Time", device_id)
@property
def native_value(self):
"""Return sensor state."""
job: OctoprintJobInfo = self.coordinator.data["job"]
if not job or not job.progress.print_time or job.state != "Printing":
return None
read_time = self.coordinator.data["last_read_time"]
return (read_time - timedelta(seconds=job.progress.print_time)).isoformat()
@property
def device_class(self):
"""Return the device class of the sensor."""
return DEVICE_CLASS_TIMESTAMP
class OctoPrintTemperatureSensor(OctoPrintSensorBase):
"""Representation of an OctoPrint sensor."""
def __init__(
self,
coordinator: DataUpdateCoordinator,
tool: str,
temp_type: str,
device_id: str,
) -> None:
"""Initialize a new OctoPrint sensor."""
super().__init__(coordinator, f"{temp_type} {tool} temp", device_id)
self._temp_type = temp_type
self._api_tool = tool
self._attr_state_class = STATE_CLASS_MEASUREMENT
@property
def unit_of_measurement(self):
"""Return the unit of measurement of this entity, if any."""
return TEMP_CELSIUS
@property
def device_class(self):
"""Return the device class of this entity."""
return DEVICE_CLASS_TEMPERATURE
@property
def native_value(self):
"""Return sensor state."""
printer: OctoprintPrinterInfo = self.coordinator.data["printer"]
if not printer:
return None
for temp in printer.temperatures:
if temp.name == self._api_tool:
return round(
temp.actual_temp
if self._temp_type == "actual"
else temp.target_temp,
2,
)
return None
@property
def available(self) -> bool:
"""Return if entity is available."""
return self.coordinator.last_update_success and self.coordinator.data["printer"]

View File

@ -0,0 +1,29 @@
{
"config": {
"flow_title": "OctoPrint Printer: {host}",
"step": {
"user": {
"data": {
"host": "[%key:common::config_flow::data::host%]",
"path": "Application Path",
"port": "Port Number",
"ssl": "Use SSL",
"username": "[%key:common::config_flow::data::username%]"
}
}
},
"error": {
"cannot_connect": "[%key:common::config_flow::error::cannot_connect%]",
"unknown": "[%key:common::config_flow::error::unknown%]"
},
"abort": {
"already_configured": "[%key:common::config_flow::abort::already_configured_device%]",
"unknown": "[%key:common::config_flow::error::unknown%]",
"cannot_connect": "[%key:common::config_flow::error::cannot_connect%]",
"auth_failed": "Failed to retrieve application api key"
},
"progress": {
"get_api_key": "Open the OctoPrint UI and click 'Allow' on the Access Request for 'Home Assistant'."
}
}
}

View File

@ -0,0 +1,29 @@
{
"config": {
"abort": {
"already_configured": "Device is already configured",
"auth_failed": "Failed to retrieve application api key",
"cannot_connect": "Failed to connect",
"unknown": "Unexpected error"
},
"error": {
"cannot_connect": "Failed to connect",
"unknown": "Unexpected error"
},
"flow_title": "OctoPrint Printer: {host}",
"progress": {
"get_api_key": "Open the OctoPrint UI and click 'Allow' on the Access Request for 'Home Assistant'."
},
"step": {
"user": {
"data": {
"host": "Host",
"path": "Application Path",
"port": "Port Number",
"ssl": "Use SSL",
"username": "Username"
}
}
}
}
}

View File

@ -201,6 +201,7 @@ FLOWS = [
"nut",
"nws",
"nzbget",
"octoprint",
"omnilogic",
"ondilo_ico",
"onewire",

View File

@ -171,6 +171,12 @@ SSDP = {
"manufacturer": "NETGEAR, Inc."
}
],
"octoprint": [
{
"deviceType": "urn:schemas-upnp-org:device:Basic:1",
"manufacturer": "The OctoPrint Project"
}
],
"roku": [
{
"deviceType": "urn:roku-com:device:player:1-0",

View File

@ -194,6 +194,11 @@ ZEROCONF = {
"domain": "nut"
}
],
"_octoprint._tcp.local.": [
{
"domain": "octoprint"
}
],
"_plexmediasvr._tcp.local.": [
{
"domain": "plex"

View File

@ -1678,6 +1678,9 @@ pynzbgetapi==0.2.0
# homeassistant.components.obihai
pyobihai==1.3.1
# homeassistant.components.octoprint
pyoctoprintapi==0.1.6
# homeassistant.components.ombi
pyombi==0.1.10

View File

@ -1006,6 +1006,9 @@ pynx584==0.5
# homeassistant.components.nzbget
pynzbgetapi==0.2.0
# homeassistant.components.octoprint
pyoctoprintapi==0.1.6
# homeassistant.components.openuv
pyopenuv==2.2.1

View File

@ -0,0 +1,82 @@
"""Tests for the OctoPrint integration."""
from __future__ import annotations
from typing import Any
from unittest.mock import patch
from pyoctoprintapi import (
DiscoverySettings,
OctoprintJobInfo,
OctoprintPrinterInfo,
TrackingSetting,
)
from homeassistant.components.octoprint import DOMAIN
from homeassistant.config_entries import ConfigEntryState
from homeassistant.helpers.typing import UNDEFINED, UndefinedType
from tests.common import MockConfigEntry
DEFAULT_JOB = {
"job": {},
"progress": {"completion": 50},
}
DEFAULT_PRINTER = {
"state": {
"flags": {"printing": True, "error": False},
"text": "Operational",
},
"temperature": [],
}
async def init_integration(
hass,
platform,
printer: dict[str, Any] | UndefinedType | None = UNDEFINED,
job: dict[str, Any] | None = None,
):
"""Set up the octoprint integration in Home Assistant."""
printer_info: OctoprintPrinterInfo | None = None
if printer is UNDEFINED:
printer = DEFAULT_PRINTER
if printer is not None:
printer_info = OctoprintPrinterInfo(printer)
if job is None:
job = DEFAULT_JOB
with patch("homeassistant.components.octoprint.PLATFORMS", [platform]), patch(
"pyoctoprintapi.OctoprintClient.get_server_info", return_value={}
), patch(
"pyoctoprintapi.OctoprintClient.get_printer_info",
return_value=printer_info,
), patch(
"pyoctoprintapi.OctoprintClient.get_job_info",
return_value=OctoprintJobInfo(job),
), patch(
"pyoctoprintapi.OctoprintClient.get_tracking_info",
return_value=TrackingSetting({"unique_id": "uuid"}),
), patch(
"pyoctoprintapi.OctoprintClient.get_discovery_info",
return_value=DiscoverySettings({"upnpUuid": "uuid"}),
):
config_entry = MockConfigEntry(
domain=DOMAIN,
entry_id="uuid",
unique_id="uuid",
data={
"host": "1.1.1.1",
"api_key": "test-key",
"name": "Octoprint",
"port": 81,
"ssl": True,
"path": "/",
},
title="Octoprint",
)
config_entry.add_to_hass(hass)
await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
assert config_entry.state == ConfigEntryState.LOADED

View File

@ -0,0 +1,54 @@
"""The tests for Octoptint binary sensor module."""
from homeassistant.const import STATE_OFF, STATE_ON, STATE_UNAVAILABLE
from . import init_integration
async def test_sensors(hass):
"""Test the underlying sensors."""
printer = {
"state": {
"flags": {"printing": True, "error": False},
"text": "Operational",
},
"temperature": [],
}
await init_integration(hass, "binary_sensor", printer=printer)
entity_registry = await hass.helpers.entity_registry.async_get_registry()
state = hass.states.get("binary_sensor.octoprint_printing")
assert state is not None
assert state.state == STATE_ON
assert state.name == "Octoprint Printing"
entry = entity_registry.async_get("binary_sensor.octoprint_printing")
assert entry.unique_id == "Printing-uuid"
state = hass.states.get("binary_sensor.octoprint_printing_error")
assert state is not None
assert state.state == STATE_OFF
assert state.name == "Octoprint Printing Error"
entry = entity_registry.async_get("binary_sensor.octoprint_printing_error")
assert entry.unique_id == "Printing Error-uuid"
async def test_sensors_printer_offline(hass):
"""Test the underlying sensors when the printer is offline."""
await init_integration(hass, "binary_sensor", printer=None)
entity_registry = await hass.helpers.entity_registry.async_get_registry()
state = hass.states.get("binary_sensor.octoprint_printing")
assert state is not None
assert state.state == STATE_UNAVAILABLE
assert state.name == "Octoprint Printing"
entry = entity_registry.async_get("binary_sensor.octoprint_printing")
assert entry.unique_id == "Printing-uuid"
state = hass.states.get("binary_sensor.octoprint_printing_error")
assert state is not None
assert state.state == STATE_UNAVAILABLE
assert state.name == "Octoprint Printing Error"
entry = entity_registry.async_get("binary_sensor.octoprint_printing_error")
assert entry.unique_id == "Printing Error-uuid"

View File

@ -0,0 +1,519 @@
"""Test the OctoPrint config flow."""
from unittest.mock import patch
from pyoctoprintapi import ApiError, DiscoverySettings
from homeassistant import config_entries, data_entry_flow
from homeassistant.components.octoprint.const import DOMAIN
from homeassistant.core import HomeAssistant
from tests.common import MockConfigEntry
async def test_form(hass):
"""Test we get the form."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result["type"] == "form"
assert not result["errors"]
with patch(
"pyoctoprintapi.OctoprintClient.request_app_key", return_value="test-key"
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
"username": "testuser",
"host": "1.1.1.1",
"name": "Printer",
"port": 81,
"ssl": True,
"path": "/",
},
)
await hass.async_block_till_done()
assert result["type"] == "progress"
with patch(
"pyoctoprintapi.OctoprintClient.get_server_info",
return_value=True,
), patch(
"pyoctoprintapi.OctoprintClient.get_discovery_info",
return_value=DiscoverySettings({"upnpUuid": "uuid"}),
), patch(
"homeassistant.components.octoprint.async_setup", return_value=True
) as mock_setup, patch(
"homeassistant.components.octoprint.async_setup_entry",
return_value=True,
) as mock_setup_entry:
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"],
)
await hass.async_block_till_done()
assert result2["type"] == "create_entry"
assert result2["title"] == "1.1.1.1"
assert result2["data"] == {
"username": "testuser",
"host": "1.1.1.1",
"api_key": "test-key",
"name": "Printer",
"port": 81,
"ssl": True,
"path": "/",
}
assert len(mock_setup.mock_calls) == 1
assert len(mock_setup_entry.mock_calls) == 1
async def test_form_cannot_connect(hass):
"""Test we handle cannot connect error."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
"username": "testuser",
"host": "1.1.1.1",
"name": "Printer",
"port": 81,
"ssl": True,
"path": "/",
},
)
assert result["type"] == "progress"
with patch(
"pyoctoprintapi.OctoprintClient.request_app_key", return_value="test-key"
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
)
assert result["type"] == "progress_done"
with patch(
"pyoctoprintapi.OctoprintClient.get_discovery_info",
side_effect=ApiError,
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
"username": "testuser",
"host": "1.1.1.1",
"name": "Printer",
"port": 81,
"ssl": True,
"path": "/",
"api_key": "test-key",
},
)
assert result["type"] == "form"
assert result["errors"]["base"] == "cannot_connect"
async def test_form_unknown_exception(hass):
"""Test we handle a random error."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
"username": "testuser",
"host": "1.1.1.1",
"name": "Printer",
"port": 81,
"ssl": True,
"path": "/",
},
)
assert result["type"] == "progress"
with patch(
"pyoctoprintapi.OctoprintClient.request_app_key", return_value="test-key"
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
)
assert result["type"] == "progress_done"
with patch(
"pyoctoprintapi.OctoprintClient.get_discovery_info",
side_effect=Exception,
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
"username": "testuser",
"host": "1.1.1.1",
"name": "Printer",
"port": 81,
"ssl": True,
"path": "/",
"api_key": "test-key",
},
)
assert result["type"] == "form"
assert result["errors"]["base"] == "unknown"
async def test_show_zerconf_form(hass: HomeAssistant) -> None:
"""Test that the zeroconf confirmation form is served."""
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_ZEROCONF},
data={
"host": "192.168.1.123",
"port": 80,
"hostname": "example.local.",
"uuid": "83747482",
"properties": {"uuid": "83747482", "path": "/foo/"},
},
)
assert result["type"] == "form"
assert not result["errors"]
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
"username": "testuser",
},
)
assert result["type"] == "progress"
with patch(
"pyoctoprintapi.OctoprintClient.request_app_key", return_value="test-key"
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
)
await hass.async_block_till_done()
assert result["type"] == "progress_done"
with patch(
"pyoctoprintapi.OctoprintClient.get_server_info",
return_value=True,
), patch(
"pyoctoprintapi.OctoprintClient.get_discovery_info",
return_value=DiscoverySettings({"upnpUuid": "uuid"}),
), patch(
"homeassistant.components.octoprint.async_setup", return_value=True
), patch(
"homeassistant.components.octoprint.async_setup_entry",
return_value=True,
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
"username": "testuser",
"host": "1.1.1.1",
"name": "Printer",
"port": 81,
"ssl": True,
"path": "/",
"api_key": "test-key",
},
)
await hass.async_block_till_done()
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
async def test_show_ssdp_form(hass: HomeAssistant) -> None:
"""Test that the zeroconf confirmation form is served."""
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_SSDP},
data={
"presentationURL": "http://192.168.1.123:80/discovery/device.xml",
"port": 80,
"UDN": "uuid:83747482",
},
)
assert result["type"] == "form"
assert not result["errors"]
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
"username": "testuser",
},
)
assert result["type"] == "progress"
with patch(
"pyoctoprintapi.OctoprintClient.request_app_key", return_value="test-key"
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
)
await hass.async_block_till_done()
assert result["type"] == "progress_done"
with patch(
"pyoctoprintapi.OctoprintClient.get_server_info",
return_value=True,
), patch(
"pyoctoprintapi.OctoprintClient.get_discovery_info",
return_value=DiscoverySettings({"upnpUuid": "uuid"}),
), patch(
"homeassistant.components.octoprint.async_setup", return_value=True
), patch(
"homeassistant.components.octoprint.async_setup_entry",
return_value=True,
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
"username": "testuser",
"host": "1.1.1.1",
"name": "Printer",
"port": 81,
"ssl": True,
"path": "/",
"api_key": "test-key",
},
)
await hass.async_block_till_done()
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
async def test_import_yaml(hass: HomeAssistant) -> None:
"""Test that the yaml import works."""
with patch(
"pyoctoprintapi.OctoprintClient.get_server_info",
return_value=True,
), patch(
"pyoctoprintapi.OctoprintClient.get_discovery_info",
return_value=DiscoverySettings({"upnpUuid": "uuid"}),
), patch(
"pyoctoprintapi.OctoprintClient.request_app_key", return_value="test-key"
), patch(
"homeassistant.components.octoprint.async_setup", return_value=True
), patch(
"homeassistant.components.octoprint.async_setup_entry",
return_value=True,
):
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_IMPORT},
data={
"host": "1.1.1.1",
"api_key": "test-key",
"name": "Printer",
"port": 81,
"ssl": True,
"path": "/",
},
)
await hass.async_block_till_done()
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
assert "errors" not in result
async def test_import_duplicate_yaml(hass: HomeAssistant) -> None:
"""Test that the yaml import works."""
MockConfigEntry(
domain=DOMAIN,
data={"host": "192.168.1.123"},
source=config_entries.SOURCE_IMPORT,
unique_id="uuid",
).add_to_hass(hass)
with patch(
"pyoctoprintapi.OctoprintClient.get_discovery_info",
return_value=DiscoverySettings({"upnpUuid": "uuid"}),
), patch(
"pyoctoprintapi.OctoprintClient.request_app_key", return_value="test-key"
) as request_app_key:
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_IMPORT},
data={
"host": "1.1.1.1",
"api_key": "test-key",
"name": "Printer",
"port": 81,
"ssl": True,
"path": "/",
},
)
await hass.async_block_till_done()
assert len(request_app_key.mock_calls) == 0
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
assert result["reason"] == "already_configured"
async def test_failed_auth(hass: HomeAssistant) -> None:
"""Test we handle a random error."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
"username": "testuser",
"host": "1.1.1.1",
"name": "Printer",
"port": 81,
"ssl": True,
"path": "/",
},
)
assert result["type"] == "progress"
with patch("pyoctoprintapi.OctoprintClient.request_app_key", side_effect=ApiError):
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
)
assert result["type"] == "progress_done"
result = await hass.config_entries.flow.async_configure(result["flow_id"])
assert result["type"] == "abort"
assert result["reason"] == "auth_failed"
async def test_failed_auth_unexpected_error(hass: HomeAssistant) -> None:
"""Test we handle a random error."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
"username": "testuser",
"host": "1.1.1.1",
"name": "Printer",
"port": 81,
"ssl": True,
"path": "/",
},
)
assert result["type"] == "progress"
with patch("pyoctoprintapi.OctoprintClient.request_app_key", side_effect=Exception):
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
)
assert result["type"] == "progress_done"
result = await hass.config_entries.flow.async_configure(result["flow_id"])
assert result["type"] == "abort"
assert result["reason"] == "auth_failed"
async def test_user_duplicate_entry(hass):
"""Test that duplicate entries abort."""
MockConfigEntry(
domain=DOMAIN,
data={"host": "192.168.1.123"},
source=config_entries.SOURCE_IMPORT,
unique_id="uuid",
).add_to_hass(hass)
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result["type"] == "form"
assert not result["errors"]
with patch(
"pyoctoprintapi.OctoprintClient.request_app_key", return_value="test-key"
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
"username": "testuser",
"host": "1.1.1.1",
"name": "Printer",
"port": 81,
"ssl": True,
"path": "/",
},
)
await hass.async_block_till_done()
assert result["type"] == "progress"
with patch(
"pyoctoprintapi.OctoprintClient.get_server_info",
return_value=True,
), patch(
"pyoctoprintapi.OctoprintClient.get_discovery_info",
return_value=DiscoverySettings({"upnpUuid": "uuid"}),
), patch(
"homeassistant.components.octoprint.async_setup", return_value=True
) as mock_setup, patch(
"homeassistant.components.octoprint.async_setup_entry",
return_value=True,
) as mock_setup_entry:
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"],
)
await hass.async_block_till_done()
assert result2["type"] == "abort"
assert result2["reason"] == "already_configured"
assert len(mock_setup.mock_calls) == 0
assert len(mock_setup_entry.mock_calls) == 0
async def test_duplicate_zerconf_ignored(hass: HomeAssistant) -> None:
"""Test that the duplicate zeroconf isn't shown."""
MockConfigEntry(
domain=DOMAIN,
data={"host": "192.168.1.123"},
source=config_entries.SOURCE_IMPORT,
unique_id="83747482",
).add_to_hass(hass)
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_ZEROCONF},
data={
"host": "192.168.1.123",
"port": 80,
"hostname": "example.local.",
"uuid": "83747482",
"properties": {"uuid": "83747482", "path": "/foo/"},
},
)
assert result["type"] == "abort"
assert result["reason"] == "already_configured"
async def test_duplicate_ssdp_ignored(hass: HomeAssistant) -> None:
"""Test that duplicate ssdp form is note shown."""
MockConfigEntry(
domain=DOMAIN,
data={"host": "192.168.1.123"},
source=config_entries.SOURCE_IMPORT,
unique_id="83747482",
).add_to_hass(hass)
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_SSDP},
data={
"presentationURL": "http://192.168.1.123:80/discovery/device.xml",
"port": 80,
"UDN": "uuid:83747482",
},
)
assert result["type"] == "abort"
assert result["reason"] == "already_configured"

View File

@ -0,0 +1,76 @@
"""The tests for Octoptint binary sensor module."""
from datetime import datetime
from unittest.mock import patch
from . import init_integration
async def test_sensors(hass):
"""Test the underlying sensors."""
printer = {
"state": {
"flags": {},
"text": "Operational",
},
"temperature": {"tool1": {"actual": 18.83136, "target": 37.83136}},
}
job = {
"job": {},
"progress": {"completion": 50, "printTime": 600, "printTimeLeft": 6000},
"state": "Printing",
}
with patch(
"homeassistant.util.dt.utcnow", return_value=datetime(2020, 2, 20, 9, 10, 0)
):
await init_integration(hass, "sensor", printer=printer, job=job)
entity_registry = await hass.helpers.entity_registry.async_get_registry()
state = hass.states.get("sensor.octoprint_job_percentage")
assert state is not None
assert state.state == "50"
assert state.name == "Octoprint Job Percentage"
entry = entity_registry.async_get("sensor.octoprint_job_percentage")
assert entry.unique_id == "Job Percentage-uuid"
state = hass.states.get("sensor.octoprint_current_state")
assert state is not None
assert state.state == "Operational"
assert state.name == "Octoprint Current State"
entry = entity_registry.async_get("sensor.octoprint_current_state")
assert entry.unique_id == "Current State-uuid"
state = hass.states.get("sensor.octoprint_actual_tool1_temp")
assert state is not None
assert state.state == "18.83"
assert state.name == "Octoprint actual tool1 temp"
entry = entity_registry.async_get("sensor.octoprint_actual_tool1_temp")
assert entry.unique_id == "actual tool1 temp-uuid"
state = hass.states.get("sensor.octoprint_target_tool1_temp")
assert state is not None
assert state.state == "37.83"
assert state.name == "Octoprint target tool1 temp"
entry = entity_registry.async_get("sensor.octoprint_target_tool1_temp")
assert entry.unique_id == "target tool1 temp-uuid"
state = hass.states.get("sensor.octoprint_target_tool1_temp")
assert state is not None
assert state.state == "37.83"
assert state.name == "Octoprint target tool1 temp"
entry = entity_registry.async_get("sensor.octoprint_target_tool1_temp")
assert entry.unique_id == "target tool1 temp-uuid"
state = hass.states.get("sensor.octoprint_start_time")
assert state is not None
assert state.state == "2020-02-20T09:00:00"
assert state.name == "Octoprint Start Time"
entry = entity_registry.async_get("sensor.octoprint_start_time")
assert entry.unique_id == "Start Time-uuid"
state = hass.states.get("sensor.octoprint_estimated_finish_time")
assert state is not None
assert state.state == "2020-02-20T10:50:00"
assert state.name == "Octoprint Estimated Finish Time"
entry = entity_registry.async_get("sensor.octoprint_estimated_finish_time")
assert entry.unique_id == "Estimated Finish Time-uuid"