Support formatting and scaling with ZHA Metering cluster (#26201)

* Support formatting and scaling with Metering cluster

* fix lint

* run black formatter
This commit is contained in:
presslab-us 2019-08-26 23:16:54 -04:00 committed by Alexei Chetroi
parent 03cfe7247b
commit 8e5d272b5f
3 changed files with 99 additions and 10 deletions

View File

@ -8,6 +8,8 @@ import logging
import zigpy.zcl.clusters.smartenergy as smartenergy
from homeassistant.core import callback
from .. import registries
from ..channels import AttributeListeningChannel, ZigbeeChannel
from ..const import REPORT_CONFIG_DEFAULT
@ -77,6 +79,87 @@ class Metering(AttributeListeningChannel):
REPORT_CONFIG = [{"attr": "instantaneous_demand", "config": REPORT_CONFIG_DEFAULT}]
unit_of_measure_map = {
0x00: "kW",
0x01: "m³/h",
0x02: "ft³/h",
0x03: "ccf/h",
0x04: "US gal/h",
0x05: "IMP gal/h",
0x06: "BTU/h",
0x07: "l/h",
0x08: "kPa",
0x09: "kPa",
0x0A: "mcf/h",
0x0B: "unitless",
0x0C: "MJ/s",
}
def __init__(self, cluster, device):
"""Initialize Metering."""
super().__init__(cluster, device)
self._divisor = None
self._multiplier = None
self._unit_enum = None
self._format_spec = None
async def async_configure(self):
"""Configure channel."""
await self.fetch_config(False)
await super().async_configure()
async def async_initialize(self, from_cache):
"""Initialize channel."""
await self.fetch_config(True)
await super().async_initialize(from_cache)
@callback
def attribute_updated(self, attrid, value):
"""Handle attribute update from Metering cluster."""
super().attribute_updated(attrid, value * self._multiplier / self._divisor)
@property
def unit_of_measurement(self):
"""Return unit of measurement."""
return self.unit_of_measure_map.get(self._unit_enum & 0x7F, "unknown")
async def fetch_config(self, from_cache):
"""Fetch config from device and updates format specifier."""
self._divisor = await self.get_attribute_value("divisor", from_cache=from_cache)
self._multiplier = await self.get_attribute_value(
"multiplier", from_cache=from_cache
)
self._unit_enum = await self.get_attribute_value(
"unit_of_measure", from_cache=from_cache
)
fmting = await self.get_attribute_value(
"demand_formatting", from_cache=from_cache
)
if self._divisor is None or self._divisor == 0:
self._divisor = 1
if self._multiplier is None or self._multiplier == 0:
self._multiplier = 1
if self._unit_enum is None:
self._unit_enum = 0x7F # unknown
if fmting is None:
fmting = 0xF9 # 1 digit to the right, 15 digits to the left
r_digits = fmting & 0x07 # digits to the right of decimal point
l_digits = (fmting >> 3) & 0x0F # digits to the left of decimal point
if l_digits == 0:
l_digits = 15
width = r_digits + l_digits + (1 if r_digits > 0 else 0)
if fmting & 0x80:
self._format_spec = "{:" + str(width) + "." + str(r_digits) + "f}"
else:
self._format_spec = "{:0" + str(width) + "." + str(r_digits) + "f}"
def formatter_function(self, value):
"""Return formatted value for display."""
return self._format_spec.format(value).lstrip()
@registries.ZIGBEE_CHANNEL_REGISTRY.register(smartenergy.Prepayment.cluster_id)
class Prepayment(ZigbeeChannel):

View File

@ -136,7 +136,6 @@ UNIT_REGISTRY = {
SENSOR_TEMPERATURE: TEMP_CELSIUS,
SENSOR_PRESSURE: "hPa",
SENSOR_ILLUMINANCE: "lx",
SENSOR_METERING: POWER_WATT,
SENSOR_ELECTRICAL_MEASUREMENT: POWER_WATT,
SENSOR_GENERIC: None,
SENSOR_BATTERY: "%",
@ -219,15 +218,19 @@ class Sensor(ZhaEntity):
"""Init this sensor."""
super().__init__(unique_id, zha_device, channels, **kwargs)
self._sensor_type = kwargs.get(SENSOR_TYPE, SENSOR_GENERIC)
self._unit = UNIT_REGISTRY.get(self._sensor_type)
self._formatter_function = FORMATTER_FUNC_REGISTRY.get(
self._sensor_type, pass_through_formatter
)
self._force_update = FORCE_UPDATE_REGISTRY.get(self._sensor_type, False)
self._should_poll = POLLING_REGISTRY.get(self._sensor_type, False)
self._channel = self.cluster_channels.get(
CHANNEL_REGISTRY.get(self._sensor_type, CHANNEL_ATTRIBUTE)
)
if self._sensor_type == SENSOR_METERING:
self._unit = self._channel.unit_of_measurement
self._formatter_function = self._channel.formatter_function
else:
self._unit = UNIT_REGISTRY.get(self._sensor_type)
self._formatter_function = FORMATTER_FUNC_REGISTRY.get(
self._sensor_type, pass_through_formatter
)
self._force_update = FORCE_UPDATE_REGISTRY.get(self._sensor_type, False)
self._should_poll = POLLING_REGISTRY.get(self._sensor_type, False)
self._device_class = DEVICE_CLASS_REGISTRY.get(self._sensor_type, None)
self.state_attr_provider = DEVICE_STATE_ATTR_PROVIDER_REGISTRY.get(
self._sensor_type, None
@ -271,7 +274,10 @@ class Sensor(ZhaEntity):
# this is necessary because HA saves the unit based on what shows in
# the UI and not based on what the sensor has configured so we need
# to flip it back after state restoration
self._unit = UNIT_REGISTRY.get(self._sensor_type)
if self._sensor_type == SENSOR_METERING:
self._unit = self._channel.unit_of_measurement
else:
self._unit = UNIT_REGISTRY.get(self._sensor_type)
self._state = self._formatter_function(state)
self.async_schedule_update_ha_state()

View File

@ -160,8 +160,8 @@ async def async_test_illuminance(hass, device_info):
async def async_test_metering(hass, device_info):
"""Test metering sensor."""
await send_attribute_report(hass, device_info["cluster"], 1024, 10)
assert_state(hass, device_info, "10", "W")
await send_attribute_report(hass, device_info["cluster"], 1024, 12345)
assert_state(hass, device_info, "12345.0", "unknown")
async def async_test_electrical_measurement(hass, device_info):