Cleanup unit conversion (#79197)

* Move UNIT_RATIO to BaseUnitConverter

* Make UNIT_CONVERSION private

* Remove STATISTIC_UNIT_TO_UNIT_CLASS constant

* Cleanup websocket_api

* Imrpove valid_units check
This commit is contained in:
epenet 2022-09-28 13:49:46 +02:00 committed by GitHub
parent a5c9c1880a
commit 5438552d4a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 79 additions and 73 deletions

View File

@ -124,17 +124,6 @@ QUERY_STATISTIC_META = [
]
STATISTIC_UNIT_TO_UNIT_CLASS: dict[str | None, str] = {
DistanceConverter.NORMALIZED_UNIT: DistanceConverter.UNIT_CLASS,
EnergyConverter.NORMALIZED_UNIT: EnergyConverter.UNIT_CLASS,
MassConverter.NORMALIZED_UNIT: MassConverter.UNIT_CLASS,
PowerConverter.NORMALIZED_UNIT: PowerConverter.UNIT_CLASS,
PressureConverter.NORMALIZED_UNIT: PressureConverter.UNIT_CLASS,
SpeedConverter.NORMALIZED_UNIT: SpeedConverter.UNIT_CLASS,
TemperatureConverter.NORMALIZED_UNIT: TemperatureConverter.UNIT_CLASS,
VolumeConverter.NORMALIZED_UNIT: VolumeConverter.UNIT_CLASS,
}
STATISTIC_UNIT_TO_UNIT_CONVERTER: dict[str | None, type[BaseUnitConverter]] = {
DistanceConverter.NORMALIZED_UNIT: DistanceConverter,
EnergyConverter.NORMALIZED_UNIT: EnergyConverter,
@ -150,6 +139,13 @@ STATISTIC_UNIT_TO_UNIT_CONVERTER: dict[str | None, type[BaseUnitConverter]] = {
_LOGGER = logging.getLogger(__name__)
def _get_unit_class(unit: str | None) -> str | None:
"""Get corresponding unit class from from the normalized statistics unit."""
if converter := STATISTIC_UNIT_TO_UNIT_CONVERTER.get(unit):
return converter.UNIT_CLASS
return None
def _get_statistic_to_display_unit_converter(
statistic_unit: str | None,
state_unit: str | None,
@ -168,14 +164,13 @@ def _get_statistic_to_display_unit_converter(
return no_conversion
display_unit: str | None
unit_class = STATISTIC_UNIT_TO_UNIT_CLASS[statistic_unit]
unit_class = converter.UNIT_CLASS
if requested_units and unit_class in requested_units:
display_unit = requested_units[unit_class]
else:
display_unit = state_unit
unit_converter = STATISTIC_UNIT_TO_UNIT_CONVERTER[statistic_unit]
if display_unit not in unit_converter.VALID_UNITS:
if display_unit not in converter.VALID_UNITS:
# Guard against invalid state unit in the DB
return no_conversion
@ -909,9 +904,7 @@ def list_statistic_ids(
"has_sum": meta["has_sum"],
"name": meta["name"],
"source": meta["source"],
"unit_class": STATISTIC_UNIT_TO_UNIT_CLASS.get(
meta["unit_of_measurement"]
),
"unit_class": _get_unit_class(meta["unit_of_measurement"]),
"unit_of_measurement": meta["unit_of_measurement"],
}
for _, meta in metadata.values()
@ -934,9 +927,7 @@ def list_statistic_ids(
"name": meta["name"],
"source": meta["source"],
"display_unit_of_measurement": meta["state_unit_of_measurement"],
"unit_class": STATISTIC_UNIT_TO_UNIT_CLASS.get(
meta["unit_of_measurement"]
),
"unit_class": _get_unit_class(meta["unit_of_measurement"]),
"unit_of_measurement": meta["unit_of_measurement"],
}

View File

@ -9,11 +9,6 @@ import voluptuous as vol
from homeassistant.components import websocket_api
from homeassistant.components.websocket_api import messages
from homeassistant.const import (
ENERGY_KILO_WATT_HOUR,
ENERGY_MEGA_WATT_HOUR,
ENERGY_WATT_HOUR,
)
from homeassistant.core import HomeAssistant, callback, valid_entity_id
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.json import JSON_DUMP
@ -31,6 +26,7 @@ from homeassistant.util.unit_conversion import (
from .const import MAX_QUEUE_BACKLOG
from .statistics import (
STATISTIC_UNIT_TO_UNIT_CONVERTER,
async_add_external_statistics,
async_change_statistics_unit,
async_import_statistics,
@ -327,25 +323,8 @@ async def ws_adjust_sum_statistics(
def valid_units(statistics_unit: str | None, display_unit: str | None) -> bool:
if statistics_unit == display_unit:
return True
if (
statistics_unit == DistanceConverter.NORMALIZED_UNIT
and display_unit in DistanceConverter.VALID_UNITS
):
return True
if statistics_unit == ENERGY_KILO_WATT_HOUR and display_unit in (
ENERGY_MEGA_WATT_HOUR,
ENERGY_WATT_HOUR,
):
return True
if (
statistics_unit == MassConverter.NORMALIZED_UNIT
and display_unit in MassConverter.VALID_UNITS
):
return True
if (
statistics_unit == VolumeConverter.NORMALIZED_UNIT
and display_unit in VolumeConverter.VALID_UNITS
):
converter = STATISTIC_UNIT_TO_UNIT_CONVERTER.get(statistics_unit)
if converter is not None and display_unit in converter.VALID_UNITS:
return True
return False

View File

@ -234,19 +234,6 @@ UNIT_CONVERTERS: dict[str, type[BaseUnitConverter]] = {
SensorDeviceClass.WEIGHT: MassConverter,
}
UNIT_RATIOS: dict[str, dict[str, float]] = {
SensorDeviceClass.DISTANCE: DistanceConverter.UNIT_CONVERSION,
SensorDeviceClass.PRESSURE: PressureConverter.UNIT_CONVERSION,
SensorDeviceClass.SPEED: SpeedConverter.UNIT_CONVERSION,
SensorDeviceClass.TEMPERATURE: {
TEMP_CELSIUS: 1.0,
TEMP_FAHRENHEIT: 1.8,
TEMP_KELVIN: 1.0,
},
SensorDeviceClass.VOLUME: VolumeConverter.UNIT_CONVERSION,
SensorDeviceClass.WEIGHT: MassConverter.UNIT_CONVERSION,
}
# mypy: disallow-any-generics
@ -456,6 +443,7 @@ class SensorEntity(Entity):
):
assert unit_of_measurement
assert native_unit_of_measurement
converter = UNIT_CONVERTERS[device_class]
value_s = str(value)
prec = len(value_s) - value_s.index(".") - 1 if "." in value_s else 0
@ -465,8 +453,9 @@ class SensorEntity(Entity):
ratio_log = max(
0,
log10(
UNIT_RATIOS[device_class][native_unit_of_measurement]
/ UNIT_RATIOS[device_class][unit_of_measurement]
converter.get_unit_ratio(
native_unit_of_measurement, unit_of_measurement
)
),
)
prec = prec + floor(ratio_log)
@ -474,7 +463,7 @@ class SensorEntity(Entity):
# Suppress ValueError (Could not convert sensor_value to float)
with suppress(ValueError):
value_f = float(value) # type: ignore[arg-type]
value_f_new = UNIT_CONVERTERS[device_class].convert(
value_f_new = converter.convert(
value_f,
native_unit_of_measurement,
unit_of_measurement,

View File

@ -17,7 +17,8 @@ from homeassistant.const import ( # pylint: disable=unused-import # noqa: F401
from .unit_conversion import PressureConverter
UNIT_CONVERSION: dict[str, float] = PressureConverter.UNIT_CONVERSION
# pylint: disable-next=protected-access
UNIT_CONVERSION: dict[str, float] = PressureConverter._UNIT_CONVERSION
VALID_UNITS = PressureConverter.VALID_UNITS

View File

@ -24,7 +24,8 @@ from .unit_conversion import ( # pylint: disable=unused-import # noqa: F401
SpeedConverter,
)
UNIT_CONVERSION = SpeedConverter.UNIT_CONVERSION
# pylint: disable-next=protected-access
UNIT_CONVERSION = SpeedConverter._UNIT_CONVERSION
VALID_UNITS = SpeedConverter.VALID_UNITS

View File

@ -93,11 +93,16 @@ class BaseUnitConverter:
def convert(cls, value: float, from_unit: str, to_unit: str) -> float:
"""Convert one unit of measurement to another."""
@classmethod
@abstractmethod
def get_unit_ratio(cls, from_unit: str, to_unit: str) -> float:
"""Get unit ratio between units of measurement."""
class BaseUnitConverterWithUnitConversion(BaseUnitConverter):
"""Define the format of a conversion utility."""
UNIT_CONVERSION: dict[str, float]
_UNIT_CONVERSION: dict[str, float]
@classmethod
def convert(cls, value: float, from_unit: str, to_unit: str) -> float:
@ -106,14 +111,14 @@ class BaseUnitConverterWithUnitConversion(BaseUnitConverter):
return value
try:
from_ratio = cls.UNIT_CONVERSION[from_unit]
from_ratio = cls._UNIT_CONVERSION[from_unit]
except KeyError as err:
raise HomeAssistantError(
UNIT_NOT_RECOGNIZED_TEMPLATE.format(from_unit, cls.UNIT_CLASS)
) from err
try:
to_ratio = cls.UNIT_CONVERSION[to_unit]
to_ratio = cls._UNIT_CONVERSION[to_unit]
except KeyError as err:
raise HomeAssistantError(
UNIT_NOT_RECOGNIZED_TEMPLATE.format(to_unit, cls.UNIT_CLASS)
@ -122,13 +127,18 @@ class BaseUnitConverterWithUnitConversion(BaseUnitConverter):
new_value = value / from_ratio
return new_value * to_ratio
@classmethod
def get_unit_ratio(cls, from_unit: str, to_unit: str) -> float:
"""Get unit ratio between units of measurement."""
return cls._UNIT_CONVERSION[from_unit] / cls._UNIT_CONVERSION[to_unit]
class DistanceConverter(BaseUnitConverterWithUnitConversion):
"""Utility to convert distance values."""
UNIT_CLASS = "distance"
NORMALIZED_UNIT = LENGTH_METERS
UNIT_CONVERSION: dict[str, float] = {
_UNIT_CONVERSION: dict[str, float] = {
LENGTH_METERS: 1,
LENGTH_MILLIMETERS: 1 / _MM_TO_M,
LENGTH_CENTIMETERS: 1 / _CM_TO_M,
@ -155,7 +165,7 @@ class EnergyConverter(BaseUnitConverterWithUnitConversion):
UNIT_CLASS = "energy"
NORMALIZED_UNIT = ENERGY_KILO_WATT_HOUR
UNIT_CONVERSION: dict[str, float] = {
_UNIT_CONVERSION: dict[str, float] = {
ENERGY_WATT_HOUR: 1 * 1000,
ENERGY_KILO_WATT_HOUR: 1,
ENERGY_MEGA_WATT_HOUR: 1 / 1000,
@ -172,7 +182,7 @@ class MassConverter(BaseUnitConverterWithUnitConversion):
UNIT_CLASS = "mass"
NORMALIZED_UNIT = MASS_GRAMS
UNIT_CONVERSION: dict[str, float] = {
_UNIT_CONVERSION: dict[str, float] = {
MASS_MICROGRAMS: 1 * 1000 * 1000,
MASS_MILLIGRAMS: 1 * 1000,
MASS_GRAMS: 1,
@ -195,7 +205,7 @@ class PowerConverter(BaseUnitConverterWithUnitConversion):
UNIT_CLASS = "power"
NORMALIZED_UNIT = POWER_WATT
UNIT_CONVERSION: dict[str, float] = {
_UNIT_CONVERSION: dict[str, float] = {
POWER_WATT: 1,
POWER_KILO_WATT: 1 / 1000,
}
@ -210,7 +220,7 @@ class PressureConverter(BaseUnitConverterWithUnitConversion):
UNIT_CLASS = "pressure"
NORMALIZED_UNIT = PRESSURE_PA
UNIT_CONVERSION: dict[str, float] = {
_UNIT_CONVERSION: dict[str, float] = {
PRESSURE_PA: 1,
PRESSURE_HPA: 1 / 100,
PRESSURE_KPA: 1 / 1000,
@ -239,7 +249,7 @@ class SpeedConverter(BaseUnitConverterWithUnitConversion):
UNIT_CLASS = "speed"
NORMALIZED_UNIT = SPEED_METERS_PER_SECOND
UNIT_CONVERSION: dict[str, float] = {
_UNIT_CONVERSION: dict[str, float] = {
SPEED_FEET_PER_SECOND: 1 / _FOOT_TO_M,
SPEED_INCHES_PER_DAY: _DAYS_TO_SECS / _IN_TO_M,
SPEED_INCHES_PER_HOUR: _HRS_TO_SECS / _IN_TO_M,
@ -271,6 +281,11 @@ class TemperatureConverter(BaseUnitConverter):
TEMP_FAHRENHEIT,
TEMP_KELVIN,
}
_UNIT_RATIO = {
TEMP_CELSIUS: 1.0,
TEMP_FAHRENHEIT: 1.8,
TEMP_KELVIN: 1.0,
}
@classmethod
def convert(
@ -342,6 +357,11 @@ class TemperatureConverter(BaseUnitConverter):
return celsius
return celsius + 273.15
@classmethod
def get_unit_ratio(cls, from_unit: str, to_unit: str) -> float:
"""Get unit ratio between units of measurement."""
return cls._UNIT_RATIO[from_unit] / cls._UNIT_RATIO[to_unit]
class VolumeConverter(BaseUnitConverterWithUnitConversion):
"""Utility to convert volume values."""
@ -349,7 +369,7 @@ class VolumeConverter(BaseUnitConverterWithUnitConversion):
UNIT_CLASS = "volume"
NORMALIZED_UNIT = VOLUME_CUBIC_METERS
# Units in terms of m³
UNIT_CONVERSION: dict[str, float] = {
_UNIT_CONVERSION: dict[str, float] = {
VOLUME_LITERS: 1 / _L_TO_CUBIC_METER,
VOLUME_MILLILITERS: 1 / _ML_TO_CUBIC_METER,
VOLUME_GALLONS: 1 / _GALLON_TO_CUBIC_METER,

View File

@ -14,7 +14,8 @@ from homeassistant.const import ( # pylint: disable=unused-import # noqa: F401
from .unit_conversion import VolumeConverter
UNIT_CONVERSION = VolumeConverter.UNIT_CONVERSION
# pylint: disable-next=protected-access
UNIT_CONVERSION = VolumeConverter._UNIT_CONVERSION
VALID_UNITS = VolumeConverter.VALID_UNITS

View File

@ -162,6 +162,30 @@ def test_convert_nonnumeric_value(
converter.convert("a", from_unit, to_unit)
@pytest.mark.parametrize(
"converter,from_unit,to_unit,expected",
[
(DistanceConverter, LENGTH_KILOMETERS, LENGTH_METERS, 1 / 1000),
(EnergyConverter, ENERGY_WATT_HOUR, ENERGY_KILO_WATT_HOUR, 1000),
(PowerConverter, POWER_WATT, POWER_KILO_WATT, 1000),
(PressureConverter, PRESSURE_HPA, PRESSURE_INHG, pytest.approx(33.86389)),
(
SpeedConverter,
SPEED_KILOMETERS_PER_HOUR,
SPEED_MILES_PER_HOUR,
pytest.approx(1.609343),
),
(TemperatureConverter, TEMP_CELSIUS, TEMP_FAHRENHEIT, 1 / 1.8),
(VolumeConverter, VOLUME_GALLONS, VOLUME_LITERS, pytest.approx(0.264172)),
],
)
def test_get_unit_ratio(
converter: type[BaseUnitConverter], from_unit: str, to_unit: str, expected: float
) -> None:
"""Test unit ratio."""
assert converter.get_unit_ratio(from_unit, to_unit) == expected
@pytest.mark.parametrize(
"value,from_unit,expected,to_unit",
[