Add new integration Qbus (#127280)

Co-authored-by: Abílio Costa <abmantis@users.noreply.github.com>
Co-authored-by: Thomas D <11554546+thomasddn@users.noreply.github.com>
This commit is contained in:
qbus-iot 2025-01-13 20:06:52 +01:00 committed by GitHub
parent ca34541b04
commit 2d2f4f5cec
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
23 changed files with 1226 additions and 0 deletions

View File

@ -385,6 +385,7 @@ homeassistant.components.purpleair.*
homeassistant.components.pushbullet.*
homeassistant.components.pvoutput.*
homeassistant.components.python_script.*
homeassistant.components.qbus.*
homeassistant.components.qnap_qsw.*
homeassistant.components.rabbitair.*
homeassistant.components.radarr.*

2
CODEOWNERS generated
View File

@ -1191,6 +1191,8 @@ build.json @home-assistant/supervisor
/tests/components/pyload/ @tr4nt0r
/homeassistant/components/qbittorrent/ @geoffreylagaisse @finder39
/tests/components/qbittorrent/ @geoffreylagaisse @finder39
/homeassistant/components/qbus/ @Qbus-iot @thomasddn
/tests/components/qbus/ @Qbus-iot @thomasddn
/homeassistant/components/qingping/ @bdraco
/tests/components/qingping/ @bdraco
/homeassistant/components/qld_bushfire/ @exxamalte

View File

@ -0,0 +1,87 @@
"""The Qbus integration."""
import logging
from homeassistant.components.mqtt import async_wait_for_mqtt_client
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryNotReady
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.typing import ConfigType
from .const import DOMAIN, PLATFORMS
from .coordinator import (
QBUS_KEY,
QbusConfigCoordinator,
QbusConfigEntry,
QbusControllerCoordinator,
)
_LOGGER = logging.getLogger(__name__)
CONFIG_SCHEMA = cv.config_entry_only_config_schema(DOMAIN)
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Set up the Qbus integration.
We set up a single coordinator for managing Qbus config updates. The
config update contains the configuration for all controllers (and
config entries). This avoids having each device requesting and managing
the config on its own.
"""
_LOGGER.debug("Loading integration")
if not await async_wait_for_mqtt_client(hass):
_LOGGER.error("MQTT integration not available")
return False
config_coordinator = QbusConfigCoordinator.get_or_create(hass)
await config_coordinator.async_subscribe_to_config()
return True
async def async_setup_entry(hass: HomeAssistant, entry: QbusConfigEntry) -> bool:
"""Set up Qbus from a config entry."""
_LOGGER.debug("%s - Loading entry", entry.unique_id)
if not await async_wait_for_mqtt_client(hass):
_LOGGER.error("MQTT integration not available")
raise ConfigEntryNotReady("MQTT integration not available")
coordinator = QbusControllerCoordinator(hass, entry)
entry.runtime_data = coordinator
await coordinator.async_config_entry_first_refresh()
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
# Get current config
config = await QbusConfigCoordinator.get_or_create(
hass
).async_get_or_request_config()
# Update the controller config
if config:
await coordinator.async_update_controller_config(config)
return True
async def async_unload_entry(hass: HomeAssistant, entry: QbusConfigEntry) -> bool:
"""Unload a config entry."""
_LOGGER.debug("%s - Unloading entry", entry.unique_id)
if unload_ok := await hass.config_entries.async_unload_platforms(entry, PLATFORMS):
entry.runtime_data.shutdown()
cleanup(hass, entry)
return unload_ok
def cleanup(hass: HomeAssistant, entry: QbusConfigEntry) -> None:
"""Shutdown if no more entries are loaded."""
entries = hass.config_entries.async_loaded_entries(DOMAIN)
count = len(entries)
# During unloading of the entry, it is not marked as unloaded yet. So
# count can be 1 if it is the last one.
if count <= 1 and (config_coordinator := hass.data.get(QBUS_KEY)):
config_coordinator.shutdown()

View File

@ -0,0 +1,160 @@
"""Config flow for Qbus."""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, Any
from qbusmqttapi.discovery import QbusMqttDevice
from qbusmqttapi.factory import QbusMqttMessageFactory, QbusMqttTopicFactory
from homeassistant.components.mqtt import client as mqtt
from homeassistant.config_entries import ConfigFlow, ConfigFlowResult
from homeassistant.const import CONF_ID
from homeassistant.helpers.service_info.mqtt import MqttServiceInfo
from .const import CONF_SERIAL_NUMBER, DOMAIN
from .coordinator import QbusConfigCoordinator
_LOGGER = logging.getLogger(__name__)
class QbusFlowHandler(ConfigFlow, domain=DOMAIN):
"""Handle Qbus config flow."""
VERSION = 1
def __init__(self) -> None:
"""Initialize."""
self._message_factory = QbusMqttMessageFactory()
self._topic_factory = QbusMqttTopicFactory()
self._gateway_topic = self._topic_factory.get_gateway_state_topic()
self._config_topic = self._topic_factory.get_config_topic()
self._device_topic = self._topic_factory.get_device_state_topic("+")
self._device: QbusMqttDevice | None = None
async def async_step_mqtt(
self, discovery_info: MqttServiceInfo
) -> ConfigFlowResult:
"""Handle a flow initialized by MQTT discovery."""
_LOGGER.debug("Running mqtt discovery for topic %s", discovery_info.topic)
# Abort if the payload is empty
if not discovery_info.payload:
_LOGGER.debug("Payload empty")
return self.async_abort(reason="invalid_discovery_info")
match discovery_info.subscribed_topic:
case self._gateway_topic:
return await self._async_handle_gateway_topic(discovery_info)
case self._config_topic:
return await self._async_handle_config_topic(discovery_info)
case self._device_topic:
return await self._async_handle_device_topic(discovery_info)
return self.async_abort(reason="invalid_discovery_info")
async def async_step_discovery_confirm(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Confirm the setup."""
if TYPE_CHECKING:
assert self._device is not None
if user_input is not None:
return self.async_create_entry(
title=f"Controller {self._device.serial_number}",
data={
CONF_SERIAL_NUMBER: self._device.serial_number,
CONF_ID: self._device.id,
},
)
return self.async_show_form(
step_id="discovery_confirm",
description_placeholders={
CONF_SERIAL_NUMBER: self._device.serial_number,
},
)
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle a flow initialized by the user."""
return self.async_abort(reason="not_supported")
async def _async_handle_gateway_topic(
self, discovery_info: MqttServiceInfo
) -> ConfigFlowResult:
_LOGGER.debug("Handling gateway state")
gateway_state = self._message_factory.parse_gateway_state(
discovery_info.payload
)
if gateway_state is not None and gateway_state.online is True:
_LOGGER.debug("Requesting config")
await mqtt.async_publish(
self.hass, self._topic_factory.get_get_config_topic(), b""
)
# Abort to wait for config topic
return self.async_abort(reason="discovery_in_progress")
async def _async_handle_config_topic(
self, discovery_info: MqttServiceInfo
) -> ConfigFlowResult:
_LOGGER.debug("Handling config topic")
qbus_config = self._message_factory.parse_discovery(discovery_info.payload)
if qbus_config is not None:
QbusConfigCoordinator.get_or_create(self.hass).store_config(qbus_config)
_LOGGER.debug("Requesting device states")
device_ids = [x.id for x in qbus_config.devices]
request = self._message_factory.create_state_request(device_ids)
await mqtt.async_publish(self.hass, request.topic, request.payload)
# Abort to wait for device topic
return self.async_abort(reason="discovery_in_progress")
async def _async_handle_device_topic(
self, discovery_info: MqttServiceInfo
) -> ConfigFlowResult:
_LOGGER.debug("Discovering device")
qbus_config = await QbusConfigCoordinator.get_or_create(
self.hass
).async_get_or_request_config()
if qbus_config is None:
_LOGGER.error("Qbus config not ready")
return self.async_abort(reason="invalid_discovery_info")
device_id = discovery_info.topic.split("/")[2]
self._device = qbus_config.get_device_by_id(device_id)
if self._device is None:
_LOGGER.warning("Device with id '%s' not found in config", device_id)
return self.async_abort(reason="invalid_discovery_info")
await self.async_set_unique_id(self._device.serial_number)
# Do not use error message "already_configured" (which is the
# default), as this will result in unsubscribing from the triggered
# mqtt topic. The topic subscribed to has a wildcard to allow
# discovery of multiple devices. Unsubscribing would result in
# not discovering new or unconfigured devices.
self._abort_if_unique_id_configured(error="device_already_configured")
self.context.update(
{
"title_placeholders": {
CONF_SERIAL_NUMBER: self._device.serial_number,
}
}
)
return await self.async_step_discovery_confirm()

View File

@ -0,0 +1,12 @@
"""Constants for the Qbus integration."""
from typing import Final
from homeassistant.const import Platform
DOMAIN: Final = "qbus"
PLATFORMS: list[Platform] = [Platform.SWITCH]
CONF_SERIAL_NUMBER: Final = "serial"
MANUFACTURER: Final = "Qbus"

View File

@ -0,0 +1,279 @@
"""Qbus coordinator."""
from __future__ import annotations
from datetime import datetime
import logging
from typing import cast
from qbusmqttapi.discovery import QbusDiscovery, QbusMqttDevice, QbusMqttOutput
from qbusmqttapi.factory import QbusMqttMessageFactory, QbusMqttTopicFactory
from homeassistant.components.mqtt import (
ReceiveMessage,
async_wait_for_mqtt_client,
client as mqtt,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import EVENT_HOMEASSISTANT_STOP
from homeassistant.core import CALLBACK_TYPE, Event, HomeAssistant
from homeassistant.helpers import device_registry as dr
from homeassistant.helpers.device_registry import format_mac
from homeassistant.helpers.event import async_call_later
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
from homeassistant.util.hass_dict import HassKey
from .const import CONF_SERIAL_NUMBER, DOMAIN, MANUFACTURER
_LOGGER = logging.getLogger(__name__)
type QbusConfigEntry = ConfigEntry[QbusControllerCoordinator]
QBUS_KEY: HassKey[QbusConfigCoordinator] = HassKey(DOMAIN)
class QbusControllerCoordinator(DataUpdateCoordinator[list[QbusMqttOutput]]):
"""Qbus data coordinator."""
_STATE_REQUEST_DELAY = 3
def __init__(self, hass: HomeAssistant, entry: QbusConfigEntry) -> None:
"""Initialize Qbus coordinator."""
_LOGGER.debug("%s - Initializing coordinator", entry.unique_id)
self.config_entry: QbusConfigEntry
super().__init__(
hass,
_LOGGER,
config_entry=entry,
name=entry.unique_id or entry.entry_id,
always_update=False,
)
self._message_factory = QbusMqttMessageFactory()
self._topic_factory = QbusMqttTopicFactory()
self._controller_activated = False
self._subscribed_to_controller_state = False
self._controller: QbusMqttDevice | None = None
# Clean up when HA stops
self.config_entry.async_on_unload(
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, self.shutdown)
)
async def _async_update_data(self) -> list[QbusMqttOutput]:
return self._controller.outputs if self._controller else []
def shutdown(self, event: Event | None = None) -> None:
"""Shutdown Qbus coordinator."""
_LOGGER.debug(
"%s - Shutting down entry coordinator", self.config_entry.unique_id
)
self._controller_activated = False
self._subscribed_to_controller_state = False
self._controller = None
async def async_update_controller_config(self, config: QbusDiscovery) -> None:
"""Update the controller based on the config."""
_LOGGER.debug("%s - Updating config", self.config_entry.unique_id)
serial = self.config_entry.data.get(CONF_SERIAL_NUMBER, "")
controller = config.get_device_by_serial(serial)
if controller is None:
_LOGGER.warning(
"%s - Controller with serial %s not found",
self.config_entry.unique_id,
serial,
)
return
self._controller = controller
self._update_device_info()
await self._async_subscribe_to_controller_state()
await self.async_refresh()
self._request_controller_state()
self._request_entity_states()
def _update_device_info(self) -> None:
if self._controller is None:
return
device_registry = dr.async_get(self.hass)
device_registry.async_get_or_create(
config_entry_id=self.config_entry.entry_id,
identifiers={(DOMAIN, format_mac(self._controller.mac))},
manufacturer=MANUFACTURER,
model="CTD3.x",
name=f"CTD {self._controller.serial_number}",
serial_number=self._controller.serial_number,
sw_version=self._controller.version,
)
async def _async_subscribe_to_controller_state(self) -> None:
if self._controller is None or self._subscribed_to_controller_state is True:
return
controller_state_topic = self._topic_factory.get_device_state_topic(
self._controller.id
)
_LOGGER.debug(
"%s - Subscribing to %s",
self.config_entry.unique_id,
controller_state_topic,
)
self._subscribed_to_controller_state = True
self.config_entry.async_on_unload(
await mqtt.async_subscribe(
self.hass,
controller_state_topic,
self._controller_state_received,
)
)
async def _controller_state_received(self, msg: ReceiveMessage) -> None:
_LOGGER.debug(
"%s - Receiving controller state %s", self.config_entry.unique_id, msg.topic
)
if self._controller is None or self._controller_activated:
return
state = self._message_factory.parse_device_state(msg.payload)
if state and state.properties and state.properties.connectable is False:
_LOGGER.debug(
"%s - Activating controller %s", self.config_entry.unique_id, state.id
)
self._controller_activated = True
request = self._message_factory.create_device_activate_request(
self._controller
)
await mqtt.async_publish(self.hass, request.topic, request.payload)
def _request_entity_states(self) -> None:
async def request_state(_: datetime) -> None:
if self._controller is None:
return
_LOGGER.debug(
"%s - Requesting %s entity states",
self.config_entry.unique_id,
len(self._controller.outputs),
)
request = self._message_factory.create_state_request(
[item.id for item in self._controller.outputs]
)
await mqtt.async_publish(self.hass, request.topic, request.payload)
if self._controller and len(self._controller.outputs) > 0:
async_call_later(self.hass, self._STATE_REQUEST_DELAY, request_state)
def _request_controller_state(self) -> None:
async def request_controller_state(_: datetime) -> None:
if self._controller is None:
return
_LOGGER.debug(
"%s - Requesting controller state", self.config_entry.unique_id
)
request = self._message_factory.create_device_state_request(
self._controller
)
await mqtt.async_publish(self.hass, request.topic, request.payload)
if self._controller:
async_call_later(
self.hass, self._STATE_REQUEST_DELAY, request_controller_state
)
class QbusConfigCoordinator:
"""Class responsible for Qbus config updates."""
_qbus_config: QbusDiscovery | None = None
def __init__(self, hass: HomeAssistant) -> None:
"""Initialize config coordinator."""
self._hass = hass
self._message_factory = QbusMqttMessageFactory()
self._topic_factory = QbusMqttTopicFactory()
self._cleanup_callbacks: list[CALLBACK_TYPE] = []
self._cleanup_callbacks.append(
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, self.shutdown)
)
@classmethod
def get_or_create(cls, hass: HomeAssistant) -> QbusConfigCoordinator:
"""Get the coordinator and create if necessary."""
if (coordinator := hass.data.get(QBUS_KEY)) is None:
coordinator = cls(hass)
hass.data[QBUS_KEY] = coordinator
return coordinator
def shutdown(self, event: Event | None = None) -> None:
"""Shutdown Qbus config coordinator."""
_LOGGER.debug("Shutting down Qbus config coordinator")
while self._cleanup_callbacks:
cleanup_callback = self._cleanup_callbacks.pop()
cleanup_callback()
async def async_subscribe_to_config(self) -> None:
"""Subscribe to config changes."""
config_topic = self._topic_factory.get_config_topic()
_LOGGER.debug("Subscribing to %s", config_topic)
self._cleanup_callbacks.append(
await mqtt.async_subscribe(self._hass, config_topic, self._config_received)
)
async def async_get_or_request_config(self) -> QbusDiscovery | None:
"""Get or request Qbus config."""
_LOGGER.debug("Requesting Qbus config")
# Config already available
if self._qbus_config:
_LOGGER.debug("Qbus config already available")
return self._qbus_config
if not await async_wait_for_mqtt_client(self._hass):
_LOGGER.debug("MQTT client not ready yet")
return None
# Request config
_LOGGER.debug("Publishing config request")
await mqtt.async_publish(
self._hass, self._topic_factory.get_get_config_topic(), b""
)
return self._qbus_config
def store_config(self, config: QbusDiscovery) -> None:
"Store the Qbus config."
_LOGGER.debug("Storing config")
self._qbus_config = config
async def _config_received(self, msg: ReceiveMessage) -> None:
"""Handle the received MQTT message containing the Qbus config."""
_LOGGER.debug("Receiving Qbus config")
config = self._message_factory.parse_discovery(msg.payload)
if config is None:
_LOGGER.debug("Incomplete Qbus config")
return
self.store_config(config)
for entry in self._hass.config_entries.async_loaded_entries(DOMAIN):
entry = cast(QbusConfigEntry, entry)
await entry.runtime_data.async_update_controller_config(config)

View File

@ -0,0 +1,76 @@
"""Base class for Qbus entities."""
from abc import ABC, abstractmethod
import re
from qbusmqttapi.discovery import QbusMqttOutput
from qbusmqttapi.factory import QbusMqttMessageFactory, QbusMqttTopicFactory
from qbusmqttapi.state import QbusMqttState
from homeassistant.components.mqtt import ReceiveMessage, client as mqtt
from homeassistant.helpers.device_registry import DeviceInfo, format_mac
from homeassistant.helpers.entity import Entity
from .const import DOMAIN, MANUFACTURER
_REFID_REGEX = re.compile(r"^\d+\/(\d+(?:\/\d+)?)$")
def format_ref_id(ref_id: str) -> str | None:
"""Format the Qbus ref_id."""
matches: list[str] = re.findall(_REFID_REGEX, ref_id)
if len(matches) > 0:
if ref_id := matches[0]:
return ref_id.replace("/", "-")
return None
class QbusEntity(Entity, ABC):
"""Representation of a Qbus entity."""
_attr_has_entity_name = True
_attr_name = None
_attr_should_poll = False
def __init__(self, mqtt_output: QbusMqttOutput) -> None:
"""Initialize the Qbus entity."""
self._topic_factory = QbusMqttTopicFactory()
self._message_factory = QbusMqttMessageFactory()
ref_id = format_ref_id(mqtt_output.ref_id)
self._attr_unique_id = f"ctd_{mqtt_output.device.serial_number}_{ref_id}"
self._attr_device_info = DeviceInfo(
name=mqtt_output.name.title(),
manufacturer=MANUFACTURER,
identifiers={(DOMAIN, f"{mqtt_output.device.serial_number}_{ref_id}")},
suggested_area=mqtt_output.location.title(),
via_device=(DOMAIN, format_mac(mqtt_output.device.mac)),
)
self._mqtt_output = mqtt_output
self._state_topic = self._topic_factory.get_output_state_topic(
mqtt_output.device.id, mqtt_output.id
)
async def async_added_to_hass(self) -> None:
"""Run when entity about to be added to hass."""
self.async_on_remove(
await mqtt.async_subscribe(
self.hass, self._state_topic, self._state_received
)
)
@abstractmethod
async def _state_received(self, msg: ReceiveMessage) -> None:
pass
async def _async_publish_output_state(self, state: QbusMqttState) -> None:
request = self._message_factory.create_set_output_state_request(
self._mqtt_output.device, state
)
await mqtt.async_publish(self.hass, request.topic, request.payload)

View File

@ -0,0 +1,17 @@
{
"domain": "qbus",
"name": "Qbus",
"codeowners": ["@Qbus-iot", "@thomasddn"],
"config_flow": true,
"dependencies": ["mqtt"],
"documentation": "https://www.home-assistant.io/integrations/qbus",
"integration_type": "hub",
"iot_class": "local_push",
"mqtt": [
"cloudapp/QBUSMQTTGW/state",
"cloudapp/QBUSMQTTGW/config",
"cloudapp/QBUSMQTTGW/+/state"
],
"quality_scale": "bronze",
"requirements": ["qbusmqttapi==1.2.3"]
}

View File

@ -0,0 +1,89 @@
rules:
# Bronze
action-setup:
status: exempt
comment: |
The integration does not provide any additional actions.
appropriate-polling:
status: exempt
comment: |
The integration does not poll.
brands: done
common-modules: done
config-flow-test-coverage: done
config-flow: done
dependency-transparency: done
docs-actions:
status: exempt
comment: |
The integration does not provide any additional actions.
docs-high-level-description: done
docs-installation-instructions: done
docs-removal-instructions: done
entity-event-setup: done
entity-unique-id: done
has-entity-name: done
runtime-data: done
test-before-configure:
status: exempt
comment: |
The integration relies solely on auto-discovery.
test-before-setup: done
unique-config-entry: done
# Silver
action-exceptions:
status: exempt
comment: |
The integration does not provide any additional actions.
config-entry-unloading: done
docs-configuration-parameters:
status: exempt
comment: No options flow.
docs-installation-parameters:
status: exempt
comment: There are no parameters.
entity-unavailable: todo
integration-owner: done
log-when-unavailable: todo
parallel-updates: done
reauthentication-flow:
status: exempt
comment: The integration does not require authentication.
test-coverage: todo
# Gold
devices: done
diagnostics: todo
discovery-update-info: done
discovery: done
docs-data-update: todo
docs-examples: todo
docs-known-limitations: todo
docs-supported-devices: todo
docs-supported-functions: done
docs-troubleshooting: todo
docs-use-cases: todo
dynamic-devices: done
entity-category: done
entity-device-class: done
entity-disabled-by-default: done
entity-translations:
status: exempt
comment: The integration uses the name of what the user configured in the closed system.
exception-translations: todo
icon-translations:
status: exempt
comment: The integration creates unknown number of entities based on what is in the closed system and does not know what each entity stands for.
reconfiguration-flow:
status: exempt
comment: The integration has no settings.
repair-issues: todo
stale-devices: todo
# Platinum
async-dependency: done
inject-websession:
status: exempt
comment: The integration does not make HTTP requests.
strict-typing: done

View File

@ -0,0 +1,19 @@
{
"config": {
"flow_title": "Controller {serial}",
"step": {
"discovery_confirm": {
"title": "Add controller",
"description": "Add controller {serial}?"
}
},
"abort": {
"already_configured": "Controller already configured",
"discovery_in_progress": "Discovery in progress",
"not_supported": "Configuration for QBUS happens through MQTT discovery. If your controller is not automatically discovered, check the prerequisites and troubleshooting section of the documention."
},
"error": {
"no_controller": "No controllers were found"
}
}
}

View File

@ -0,0 +1,83 @@
"""Support for Qbus switch."""
from typing import Any
from qbusmqttapi.discovery import QbusMqttOutput
from qbusmqttapi.state import QbusMqttOnOffState, StateType
from homeassistant.components.mqtt import ReceiveMessage
from homeassistant.components.switch import SwitchDeviceClass, SwitchEntity
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from .coordinator import QbusConfigEntry
from .entity import QbusEntity
PARALLEL_UPDATES = 0
async def async_setup_entry(
hass: HomeAssistant, entry: QbusConfigEntry, add_entities: AddEntitiesCallback
) -> None:
"""Set up switch entities."""
coordinator = entry.runtime_data
added_outputs: list[QbusMqttOutput] = []
# Local function that calls add_entities for new entities
def _check_outputs() -> None:
added_output_ids = {k.id for k in added_outputs}
new_outputs = [
item
for item in coordinator.data
if item.type == "onoff" and item.id not in added_output_ids
]
if new_outputs:
added_outputs.extend(new_outputs)
add_entities([QbusSwitch(output) for output in new_outputs])
_check_outputs()
entry.async_on_unload(coordinator.async_add_listener(_check_outputs))
class QbusSwitch(QbusEntity, SwitchEntity):
"""Representation of a Qbus switch entity."""
_attr_device_class = SwitchDeviceClass.SWITCH
def __init__(
self,
mqtt_output: QbusMqttOutput,
) -> None:
"""Initialize switch entity."""
super().__init__(mqtt_output)
self._attr_is_on = False
async def async_turn_on(self, **kwargs: Any) -> None:
"""Turn the entity on."""
state = QbusMqttOnOffState(id=self._mqtt_output.id, type=StateType.STATE)
state.write_value(True)
await self._async_publish_output_state(state)
self._attr_is_on = True
async def async_turn_off(self, **kwargs: Any) -> None:
"""Turn the entity off."""
state = QbusMqttOnOffState(id=self._mqtt_output.id, type=StateType.STATE)
state.write_value(False)
await self._async_publish_output_state(state)
self._attr_is_on = False
async def _state_received(self, msg: ReceiveMessage) -> None:
output = self._message_factory.parse_output_state(
QbusMqttOnOffState, msg.payload
)
if output is not None:
self._attr_is_on = output.read_value()
self.async_schedule_update_ha_state()

View File

@ -489,6 +489,7 @@ FLOWS = {
"pvpc_hourly_pricing",
"pyload",
"qbittorrent",
"qbus",
"qingping",
"qnap",
"qnap_qsw",

View File

@ -4981,6 +4981,12 @@
"config_flow": true,
"iot_class": "local_polling"
},
"qbus": {
"name": "Qbus",
"integration_type": "hub",
"config_flow": true,
"iot_class": "local_push"
},
"qingping": {
"name": "Qingping",
"integration_type": "hub",

View File

@ -16,6 +16,11 @@ MQTT = {
"fully_kiosk": [
"fully/deviceInfo/+",
],
"qbus": [
"cloudapp/QBUSMQTTGW/state",
"cloudapp/QBUSMQTTGW/config",
"cloudapp/QBUSMQTTGW/+/state",
],
"tasmota": [
"tasmota/discovery/#",
],

10
mypy.ini generated
View File

@ -3606,6 +3606,16 @@ disallow_untyped_defs = true
warn_return_any = true
warn_unreachable = true
[mypy-homeassistant.components.qbus.*]
check_untyped_defs = true
disallow_incomplete_defs = true
disallow_subclassing_any = true
disallow_untyped_calls = true
disallow_untyped_decorators = true
disallow_untyped_defs = true
warn_return_any = true
warn_unreachable = true
[mypy-homeassistant.components.qnap_qsw.*]
check_untyped_defs = true
disallow_incomplete_defs = true

3
requirements_all.txt generated
View File

@ -2556,6 +2556,9 @@ pyzerproc==0.4.8
# homeassistant.components.qbittorrent
qbittorrent-api==2024.2.59
# homeassistant.components.qbus
qbusmqttapi==1.2.3
# homeassistant.components.qingping
qingping-ble==0.10.0

View File

@ -2068,6 +2068,9 @@ pyzerproc==0.4.8
# homeassistant.components.qbittorrent
qbittorrent-api==2024.2.59
# homeassistant.components.qbus
qbusmqttapi==1.2.3
# homeassistant.components.qingping
qingping-ble==0.10.0

View File

@ -0,0 +1 @@
"""Tests for the Qbus integration."""

View File

@ -0,0 +1,33 @@
"""Test fixtures for qbus."""
import pytest
from homeassistant.components.qbus.const import CONF_SERIAL_NUMBER, DOMAIN
from homeassistant.const import CONF_ID
from homeassistant.core import HomeAssistant
from homeassistant.util.json import JsonObjectType
from .const import FIXTURE_PAYLOAD_CONFIG
from tests.common import MockConfigEntry, load_json_object_fixture
@pytest.fixture
def mock_config_entry(hass: HomeAssistant) -> MockConfigEntry:
"""Return the default mocked config entry."""
config_entry = MockConfigEntry(
domain=DOMAIN,
unique_id="000001",
data={
CONF_ID: "UL1",
CONF_SERIAL_NUMBER: "000001",
},
)
config_entry.add_to_hass(hass)
return config_entry
@pytest.fixture
def payload_config() -> JsonObjectType:
"""Return the config topic payload."""
return load_json_object_fixture(FIXTURE_PAYLOAD_CONFIG, DOMAIN)

View File

@ -0,0 +1,4 @@
"""Define const for unit tests."""
FIXTURE_PAYLOAD_CONFIG = "payload_config.json"
TOPIC_CONFIG = "cloudapp/QBUSMQTTGW/config"

View File

@ -0,0 +1,49 @@
{
"app": "abc",
"devices": [
{
"id": "UL1",
"ip": "192.168.1.123",
"mac": "001122334455",
"name": "",
"serialNr": "000001",
"type": "Qbus",
"version": "3.14.0",
"properties": {
"connectable": {
"read": true,
"type": "boolean",
"write": false
},
"connected": {
"read": true,
"type": "boolean",
"write": false
}
},
"functionBlocks": [
{
"id": "UL10",
"location": "Living",
"locationId": 0,
"name": "LIVING",
"originalName": "LIVING",
"refId": "000001/10",
"type": "onoff",
"variant": [null],
"actions": {
"off": null,
"on": null
},
"properties": {
"value": {
"read": true,
"type": "boolean",
"write": true
}
}
}
]
}
]
}

View File

@ -0,0 +1,202 @@
"""Test config flow."""
import json
import time
from unittest.mock import patch
import pytest
from qbusmqttapi.discovery import QbusDiscovery
from homeassistant.components.qbus.const import CONF_SERIAL_NUMBER, DOMAIN
from homeassistant.components.qbus.coordinator import QbusConfigCoordinator
from homeassistant.config_entries import SOURCE_MQTT, SOURCE_USER
from homeassistant.const import CONF_ID
from homeassistant.core import HomeAssistant
from homeassistant.data_entry_flow import FlowResultType
from homeassistant.helpers.service_info.mqtt import MqttServiceInfo
from homeassistant.util.json import JsonObjectType
from .const import TOPIC_CONFIG
_PAYLOAD_DEVICE_STATE = '{"id":"UL1","properties":{"connected":true},"type":"event"}'
async def test_step_discovery_confirm_create_entry(
hass: HomeAssistant, payload_config: JsonObjectType
) -> None:
"""Test mqtt confirm step and entry creation."""
discovery = MqttServiceInfo(
subscribed_topic="cloudapp/QBUSMQTTGW/+/state",
topic="cloudapp/QBUSMQTTGW/UL1/state",
payload=_PAYLOAD_DEVICE_STATE,
qos=0,
retain=False,
timestamp=time.time(),
)
with (
patch.object(
QbusConfigCoordinator,
"async_get_or_request_config",
return_value=QbusDiscovery(payload_config),
),
):
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_MQTT}, data=discovery
)
assert result.get("type") == FlowResultType.FORM
assert result.get("step_id") == "discovery_confirm"
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input={}
)
await hass.async_block_till_done()
assert result.get("type") == FlowResultType.CREATE_ENTRY
assert result.get("data") == {
CONF_ID: "UL1",
CONF_SERIAL_NUMBER: "000001",
}
assert result.get("result").unique_id == "000001"
@pytest.mark.parametrize(
("topic", "payload"),
[
("cloudapp/QBUSMQTTGW/state", b""),
("invalid/topic", b"{}"),
],
)
async def test_step_mqtt_invalid(
hass: HomeAssistant, topic: str, payload: bytes
) -> None:
"""Test mqtt discovery with empty payload."""
discovery = MqttServiceInfo(
subscribed_topic=topic,
topic=topic,
payload=payload,
qos=0,
retain=False,
timestamp=time.time(),
)
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_MQTT}, data=discovery
)
assert result.get("type") == FlowResultType.ABORT
assert result.get("reason") == "invalid_discovery_info"
@pytest.mark.parametrize(
("payload", "mqtt_publish"),
[
('{ "online": true }', True),
('{ "online": false }', False),
],
)
async def test_handle_gateway_topic_when_online(
hass: HomeAssistant, payload: str, mqtt_publish: bool
) -> None:
"""Test handling of gateway topic with payload indicating online."""
discovery = MqttServiceInfo(
subscribed_topic="cloudapp/QBUSMQTTGW/state",
topic="cloudapp/QBUSMQTTGW/state",
payload=payload,
qos=0,
retain=False,
timestamp=time.time(),
)
with (
patch("homeassistant.components.mqtt.client.async_publish") as mock_publish,
):
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_MQTT}, data=discovery
)
assert mock_publish.called is mqtt_publish
assert result.get("type") == FlowResultType.ABORT
assert result.get("reason") == "discovery_in_progress"
async def test_handle_config_topic(
hass: HomeAssistant, payload_config: JsonObjectType
) -> None:
"""Test handling of config topic."""
discovery = MqttServiceInfo(
subscribed_topic=TOPIC_CONFIG,
topic=TOPIC_CONFIG,
payload=json.dumps(payload_config),
qos=0,
retain=False,
timestamp=time.time(),
)
with (
patch("homeassistant.components.mqtt.client.async_publish") as mock_publish,
):
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_MQTT}, data=discovery
)
assert mock_publish.called
assert result.get("type") == FlowResultType.ABORT
assert result.get("reason") == "discovery_in_progress"
async def test_handle_device_topic_missing_config(hass: HomeAssistant) -> None:
"""Test handling of device topic when config is missing."""
discovery = MqttServiceInfo(
subscribed_topic="cloudapp/QBUSMQTTGW/+/state",
topic="cloudapp/QBUSMQTTGW/UL1/state",
payload=_PAYLOAD_DEVICE_STATE,
qos=0,
retain=False,
timestamp=time.time(),
)
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_MQTT}, data=discovery
)
assert result.get("type") == FlowResultType.ABORT
assert result.get("reason") == "invalid_discovery_info"
async def test_handle_device_topic_device_not_found(
hass: HomeAssistant, payload_config: JsonObjectType
) -> None:
"""Test handling of device topic when device is not found."""
discovery = MqttServiceInfo(
subscribed_topic="cloudapp/QBUSMQTTGW/+/state",
topic="cloudapp/QBUSMQTTGW/UL2/state",
payload='{"id":"UL2","properties":{"connected":true},"type":"event"}',
qos=0,
retain=False,
timestamp=time.time(),
)
with patch.object(
QbusConfigCoordinator,
"async_get_or_request_config",
return_value=QbusDiscovery(payload_config),
):
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_MQTT}, data=discovery
)
assert result.get("type") == FlowResultType.ABORT
assert result.get("reason") == "invalid_discovery_info"
async def test_step_user_not_supported(hass: HomeAssistant) -> None:
"""Test user step, which should abort."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
assert result.get("type") == FlowResultType.ABORT
assert result.get("reason") == "not_supported"

View File

@ -0,0 +1,84 @@
"""Test Qbus switch entities."""
import json
from homeassistant.components.switch import (
DOMAIN as SWITCH_DOMAIN,
SERVICE_TURN_OFF,
SERVICE_TURN_ON,
)
from homeassistant.const import ATTR_ENTITY_ID, STATE_OFF, STATE_ON
from homeassistant.core import HomeAssistant
from homeassistant.util.json import JsonObjectType
from .const import TOPIC_CONFIG
from tests.common import MockConfigEntry, async_fire_mqtt_message
from tests.typing import MqttMockHAClient
_PAYLOAD_SWITCH_STATE_ON = '{"id":"UL10","properties":{"value":true},"type":"state"}'
_PAYLOAD_SWITCH_STATE_OFF = '{"id":"UL10","properties":{"value":false},"type":"state"}'
_PAYLOAD_SWITCH_SET_STATE_ON = (
'{"id": "UL10", "type": "state", "properties": {"value": true}}'
)
_PAYLOAD_SWITCH_SET_STATE_OFF = (
'{"id": "UL10", "type": "state", "properties": {"value": false}}'
)
_TOPIC_SWITCH_STATE = "cloudapp/QBUSMQTTGW/UL1/UL10/state"
_TOPIC_SWITCH_SET_STATE = "cloudapp/QBUSMQTTGW/UL1/UL10/setState"
_SWITCH_ENTITY_ID = "switch.living"
async def test_switch_turn_on_off(
hass: HomeAssistant,
mqtt_mock: MqttMockHAClient,
mock_config_entry: MockConfigEntry,
payload_config: JsonObjectType,
) -> None:
"""Test turning on and off."""
assert await hass.config_entries.async_setup(mock_config_entry.entry_id)
await hass.async_block_till_done()
async_fire_mqtt_message(hass, TOPIC_CONFIG, json.dumps(payload_config))
await hass.async_block_till_done()
# Switch ON
mqtt_mock.reset_mock()
await hass.services.async_call(
SWITCH_DOMAIN,
SERVICE_TURN_ON,
{ATTR_ENTITY_ID: _SWITCH_ENTITY_ID},
blocking=True,
)
mqtt_mock.async_publish.assert_called_once_with(
_TOPIC_SWITCH_SET_STATE, _PAYLOAD_SWITCH_SET_STATE_ON, 0, False
)
# Simulate response
async_fire_mqtt_message(hass, _TOPIC_SWITCH_STATE, _PAYLOAD_SWITCH_STATE_ON)
await hass.async_block_till_done()
assert hass.states.get(_SWITCH_ENTITY_ID).state == STATE_ON
# Switch OFF
mqtt_mock.reset_mock()
await hass.services.async_call(
SWITCH_DOMAIN,
SERVICE_TURN_OFF,
{ATTR_ENTITY_ID: _SWITCH_ENTITY_ID},
blocking=True,
)
mqtt_mock.async_publish.assert_called_once_with(
_TOPIC_SWITCH_SET_STATE, _PAYLOAD_SWITCH_SET_STATE_OFF, 0, False
)
# Simulate response
async_fire_mqtt_message(hass, _TOPIC_SWITCH_STATE, _PAYLOAD_SWITCH_STATE_OFF)
await hass.async_block_till_done()
assert hass.states.get(_SWITCH_ENTITY_ID).state == STATE_OFF