Quickly process unavailable metrics in Prometheus (#133219)

This commit is contained in:
Adam Goode 2024-12-30 06:05:33 -05:00 committed by GitHub
parent 906c95048c
commit 4db88dfaff
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -2,8 +2,9 @@
from __future__ import annotations from __future__ import annotations
from collections import defaultdict
from collections.abc import Callable from collections.abc import Callable
from contextlib import suppress from dataclasses import astuple, dataclass
import logging import logging
import string import string
from typing import Any, cast from typing import Any, cast
@ -158,6 +159,22 @@ def setup(hass: HomeAssistant, config: ConfigType) -> bool:
return True return True
@dataclass(frozen=True, slots=True)
class MetricNameWithLabelValues:
"""Class to represent a metric with its label values.
The prometheus client library doesn't easily allow us to get back the
information we put into it. Specifically, it is very expensive to query
which label values have been set for metrics.
This class is used to hold a bit of data we need to efficiently remove
labelsets from metrics.
"""
metric_name: str
label_values: tuple[str, ...]
class PrometheusMetrics: class PrometheusMetrics:
"""Model all of the metrics which should be exposed to Prometheus.""" """Model all of the metrics which should be exposed to Prometheus."""
@ -191,6 +208,9 @@ class PrometheusMetrics:
else: else:
self.metrics_prefix = "" self.metrics_prefix = ""
self._metrics: dict[str, MetricWrapperBase] = {} self._metrics: dict[str, MetricWrapperBase] = {}
self._metrics_by_entity_id: dict[str, set[MetricNameWithLabelValues]] = (
defaultdict(set)
)
self._climate_units = climate_units self._climate_units = climate_units
def handle_state_changed_event(self, event: Event[EventStateChangedData]) -> None: def handle_state_changed_event(self, event: Event[EventStateChangedData]) -> None:
@ -202,10 +222,12 @@ class PrometheusMetrics:
_LOGGER.debug("Filtered out entity %s", state.entity_id) _LOGGER.debug("Filtered out entity %s", state.entity_id)
return return
if (old_state := event.data.get("old_state")) is not None and ( if (
old_friendly_name := old_state.attributes.get(ATTR_FRIENDLY_NAME) old_state := event.data.get("old_state")
) is not None and old_state.attributes.get(
ATTR_FRIENDLY_NAME
) != state.attributes.get(ATTR_FRIENDLY_NAME): ) != state.attributes.get(ATTR_FRIENDLY_NAME):
self._remove_labelsets(old_state.entity_id, old_friendly_name) self._remove_labelsets(old_state.entity_id)
self.handle_state(state) self.handle_state(state)
@ -215,30 +237,32 @@ class PrometheusMetrics:
_LOGGER.debug("Handling state update for %s", entity_id) _LOGGER.debug("Handling state update for %s", entity_id)
labels = self._labels(state) labels = self._labels(state)
state_change = self._metric(
"state_change", prometheus_client.Counter, "The number of state changes"
)
state_change.labels(**labels).inc()
entity_available = self._metric( self._metric(
"state_change",
prometheus_client.Counter,
"The number of state changes",
labels,
).inc()
self._metric(
"entity_available", "entity_available",
prometheus_client.Gauge, prometheus_client.Gauge,
"Entity is available (not in the unavailable or unknown state)", "Entity is available (not in the unavailable or unknown state)",
) labels,
entity_available.labels(**labels).set(float(state.state not in IGNORED_STATES)) ).set(float(state.state not in IGNORED_STATES))
last_updated_time_seconds = self._metric( self._metric(
"last_updated_time_seconds", "last_updated_time_seconds",
prometheus_client.Gauge, prometheus_client.Gauge,
"The last_updated timestamp", "The last_updated timestamp",
) labels,
last_updated_time_seconds.labels(**labels).set(state.last_updated.timestamp()) ).set(state.last_updated.timestamp())
if state.state in IGNORED_STATES: if state.state in IGNORED_STATES:
self._remove_labelsets( self._remove_labelsets(
entity_id, entity_id,
None, {"state_change", "entity_available", "last_updated_time_seconds"},
{state_change, entity_available, last_updated_time_seconds},
) )
else: else:
domain, _ = hacore.split_entity_id(entity_id) domain, _ = hacore.split_entity_id(entity_id)
@ -274,67 +298,68 @@ class PrometheusMetrics:
def _remove_labelsets( def _remove_labelsets(
self, self,
entity_id: str, entity_id: str,
friendly_name: str | None = None, ignored_metric_names: set[str] | None = None,
ignored_metrics: set[MetricWrapperBase] | None = None,
) -> None: ) -> None:
"""Remove labelsets matching the given entity id from all non-ignored metrics.""" """Remove labelsets matching the given entity id from all non-ignored metrics."""
if ignored_metrics is None: if ignored_metric_names is None:
ignored_metrics = set() ignored_metric_names = set()
for metric in list(self._metrics.values()): metric_set = self._metrics_by_entity_id[entity_id]
if metric in ignored_metrics: removed_metrics = set()
for metric in metric_set:
metric_name, label_values = astuple(metric)
if metric_name in ignored_metric_names:
continue continue
for sample in cast(list[prometheus_client.Metric], metric.collect())[
0 _LOGGER.debug(
].samples: "Removing labelset %s from %s for entity_id: %s",
if sample.labels["entity"] == entity_id and ( label_values,
not friendly_name or sample.labels["friendly_name"] == friendly_name metric_name,
): entity_id,
_LOGGER.debug( )
"Removing labelset from %s for entity_id: %s", removed_metrics.add(metric)
sample.name, self._metrics[metric_name].remove(*label_values)
entity_id, metric_set -= removed_metrics
) if not metric_set:
with suppress(KeyError): del self._metrics_by_entity_id[entity_id]
metric.remove(*sample.labels.values())
def _handle_attributes(self, state: State) -> None: def _handle_attributes(self, state: State) -> None:
for key, value in state.attributes.items(): for key, value in state.attributes.items():
metric = self._metric( try:
value = float(value)
except (ValueError, TypeError):
continue
self._metric(
f"{state.domain}_attr_{key.lower()}", f"{state.domain}_attr_{key.lower()}",
prometheus_client.Gauge, prometheus_client.Gauge,
f"{key} attribute of {state.domain} entity", f"{key} attribute of {state.domain} entity",
) self._labels(state),
).set(value)
try:
value = float(value)
metric.labels(**self._labels(state)).set(value)
except (ValueError, TypeError):
pass
def _metric[_MetricBaseT: MetricWrapperBase]( def _metric[_MetricBaseT: MetricWrapperBase](
self, self,
metric: str, metric_name: str,
factory: type[_MetricBaseT], factory: type[_MetricBaseT],
documentation: str, documentation: str,
extra_labels: list[str] | None = None, labels: dict[str, str],
) -> _MetricBaseT: ) -> _MetricBaseT:
labels = ["entity", "friendly_name", "domain"]
if extra_labels is not None:
labels.extend(extra_labels)
try: try:
return cast(_MetricBaseT, self._metrics[metric]) metric = cast(_MetricBaseT, self._metrics[metric_name])
except KeyError: except KeyError:
full_metric_name = self._sanitize_metric_name( full_metric_name = self._sanitize_metric_name(
f"{self.metrics_prefix}{metric}" f"{self.metrics_prefix}{metric_name}"
) )
self._metrics[metric] = factory( self._metrics[metric_name] = factory(
full_metric_name, full_metric_name,
documentation, documentation,
labels, labels.keys(),
registry=prometheus_client.REGISTRY, registry=prometheus_client.REGISTRY,
) )
return cast(_MetricBaseT, self._metrics[metric]) metric = cast(_MetricBaseT, self._metrics[metric_name])
self._metrics_by_entity_id[labels["entity"]].add(
MetricNameWithLabelValues(metric_name, tuple(labels.values()))
)
return metric.labels(**labels)
@staticmethod @staticmethod
def _sanitize_metric_name(metric: str) -> str: def _sanitize_metric_name(metric: str) -> str:
@ -356,67 +381,90 @@ class PrometheusMetrics:
return value return value
@staticmethod @staticmethod
def _labels(state: State) -> dict[str, Any]: def _labels(
return { state: State,
extra_labels: dict[str, str] | None = None,
) -> dict[str, Any]:
if extra_labels is None:
extra_labels = {}
labels = {
"entity": state.entity_id, "entity": state.entity_id,
"domain": state.domain, "domain": state.domain,
"friendly_name": state.attributes.get(ATTR_FRIENDLY_NAME), "friendly_name": state.attributes.get(ATTR_FRIENDLY_NAME),
} }
if not labels.keys().isdisjoint(extra_labels.keys()):
conflicting_keys = labels.keys() & extra_labels.keys()
raise ValueError(
f"extra_labels contains conflicting keys: {conflicting_keys}"
)
return labels | extra_labels
def _battery(self, state: State) -> None: def _battery(self, state: State) -> None:
if (battery_level := state.attributes.get(ATTR_BATTERY_LEVEL)) is not None: if (battery_level := state.attributes.get(ATTR_BATTERY_LEVEL)) is None:
metric = self._metric( return
"battery_level_percent",
prometheus_client.Gauge, try:
"Battery level as a percentage of its capacity", value = float(battery_level)
) except ValueError:
try: return
value = float(battery_level)
metric.labels(**self._labels(state)).set(value) self._metric(
except ValueError: "battery_level_percent",
pass prometheus_client.Gauge,
"Battery level as a percentage of its capacity",
self._labels(state),
).set(value)
def _handle_binary_sensor(self, state: State) -> None: def _handle_binary_sensor(self, state: State) -> None:
metric = self._metric( if (value := self.state_as_number(state)) is None:
return
self._metric(
"binary_sensor_state", "binary_sensor_state",
prometheus_client.Gauge, prometheus_client.Gauge,
"State of the binary sensor (0/1)", "State of the binary sensor (0/1)",
) self._labels(state),
if (value := self.state_as_number(state)) is not None: ).set(value)
metric.labels(**self._labels(state)).set(value)
def _handle_input_boolean(self, state: State) -> None: def _handle_input_boolean(self, state: State) -> None:
metric = self._metric( if (value := self.state_as_number(state)) is None:
return
self._metric(
"input_boolean_state", "input_boolean_state",
prometheus_client.Gauge, prometheus_client.Gauge,
"State of the input boolean (0/1)", "State of the input boolean (0/1)",
) self._labels(state),
if (value := self.state_as_number(state)) is not None: ).set(value)
metric.labels(**self._labels(state)).set(value)
def _numeric_handler(self, state: State, domain: str, title: str) -> None: def _numeric_handler(self, state: State, domain: str, title: str) -> None:
if (value := self.state_as_number(state)) is None:
return
if unit := self._unit_string(state.attributes.get(ATTR_UNIT_OF_MEASUREMENT)): if unit := self._unit_string(state.attributes.get(ATTR_UNIT_OF_MEASUREMENT)):
metric = self._metric( metric = self._metric(
f"{domain}_state_{unit}", f"{domain}_state_{unit}",
prometheus_client.Gauge, prometheus_client.Gauge,
f"State of the {title} measured in {unit}", f"State of the {title} measured in {unit}",
self._labels(state),
) )
else: else:
metric = self._metric( metric = self._metric(
f"{domain}_state", f"{domain}_state",
prometheus_client.Gauge, prometheus_client.Gauge,
f"State of the {title}", f"State of the {title}",
self._labels(state),
) )
if (value := self.state_as_number(state)) is not None: if (
if ( state.attributes.get(ATTR_UNIT_OF_MEASUREMENT)
state.attributes.get(ATTR_UNIT_OF_MEASUREMENT) == UnitOfTemperature.FAHRENHEIT
== UnitOfTemperature.FAHRENHEIT ):
): value = TemperatureConverter.convert(
value = TemperatureConverter.convert( value, UnitOfTemperature.FAHRENHEIT, UnitOfTemperature.CELSIUS
value, UnitOfTemperature.FAHRENHEIT, UnitOfTemperature.CELSIUS )
)
metric.labels(**self._labels(state)).set(value) metric.set(value)
def _handle_input_number(self, state: State) -> None: def _handle_input_number(self, state: State) -> None:
self._numeric_handler(state, "input_number", "input number") self._numeric_handler(state, "input_number", "input number")
@ -425,88 +473,99 @@ class PrometheusMetrics:
self._numeric_handler(state, "number", "number") self._numeric_handler(state, "number", "number")
def _handle_device_tracker(self, state: State) -> None: def _handle_device_tracker(self, state: State) -> None:
metric = self._metric( if (value := self.state_as_number(state)) is None:
return
self._metric(
"device_tracker_state", "device_tracker_state",
prometheus_client.Gauge, prometheus_client.Gauge,
"State of the device tracker (0/1)", "State of the device tracker (0/1)",
) self._labels(state),
if (value := self.state_as_number(state)) is not None: ).set(value)
metric.labels(**self._labels(state)).set(value)
def _handle_person(self, state: State) -> None: def _handle_person(self, state: State) -> None:
metric = self._metric( if (value := self.state_as_number(state)) is None:
"person_state", prometheus_client.Gauge, "State of the person (0/1)" return
)
if (value := self.state_as_number(state)) is not None: self._metric(
metric.labels(**self._labels(state)).set(value) "person_state",
prometheus_client.Gauge,
"State of the person (0/1)",
self._labels(state),
).set(value)
def _handle_cover(self, state: State) -> None: def _handle_cover(self, state: State) -> None:
metric = self._metric(
"cover_state",
prometheus_client.Gauge,
"State of the cover (0/1)",
["state"],
)
cover_states = [STATE_CLOSED, STATE_CLOSING, STATE_OPEN, STATE_OPENING] cover_states = [STATE_CLOSED, STATE_CLOSING, STATE_OPEN, STATE_OPENING]
for cover_state in cover_states: for cover_state in cover_states:
metric.labels(**dict(self._labels(state), state=cover_state)).set( metric = self._metric(
float(cover_state == state.state) "cover_state",
prometheus_client.Gauge,
"State of the cover (0/1)",
self._labels(state, {"state": cover_state}),
) )
metric.set(float(cover_state == state.state))
position = state.attributes.get(ATTR_CURRENT_POSITION) position = state.attributes.get(ATTR_CURRENT_POSITION)
if position is not None: if position is not None:
position_metric = self._metric( self._metric(
"cover_position", "cover_position",
prometheus_client.Gauge, prometheus_client.Gauge,
"Position of the cover (0-100)", "Position of the cover (0-100)",
) self._labels(state),
position_metric.labels(**self._labels(state)).set(float(position)) ).set(float(position))
tilt_position = state.attributes.get(ATTR_CURRENT_TILT_POSITION) tilt_position = state.attributes.get(ATTR_CURRENT_TILT_POSITION)
if tilt_position is not None: if tilt_position is not None:
tilt_position_metric = self._metric( self._metric(
"cover_tilt_position", "cover_tilt_position",
prometheus_client.Gauge, prometheus_client.Gauge,
"Tilt Position of the cover (0-100)", "Tilt Position of the cover (0-100)",
) self._labels(state),
tilt_position_metric.labels(**self._labels(state)).set(float(tilt_position)) ).set(float(tilt_position))
def _handle_light(self, state: State) -> None: def _handle_light(self, state: State) -> None:
metric = self._metric( if (value := self.state_as_number(state)) is None:
return
brightness = state.attributes.get(ATTR_BRIGHTNESS)
if state.state == STATE_ON and brightness is not None:
value = float(brightness) / 255.0
value = value * 100
self._metric(
"light_brightness_percent", "light_brightness_percent",
prometheus_client.Gauge, prometheus_client.Gauge,
"Light brightness percentage (0..100)", "Light brightness percentage (0..100)",
) self._labels(state),
).set(value)
if (value := self.state_as_number(state)) is not None:
brightness = state.attributes.get(ATTR_BRIGHTNESS)
if state.state == STATE_ON and brightness is not None:
value = float(brightness) / 255.0
value = value * 100
metric.labels(**self._labels(state)).set(value)
def _handle_lock(self, state: State) -> None: def _handle_lock(self, state: State) -> None:
metric = self._metric( if (value := self.state_as_number(state)) is None:
"lock_state", prometheus_client.Gauge, "State of the lock (0/1)" return
)
if (value := self.state_as_number(state)) is not None: self._metric(
metric.labels(**self._labels(state)).set(value) "lock_state",
prometheus_client.Gauge,
"State of the lock (0/1)",
self._labels(state),
).set(value)
def _handle_climate_temp( def _handle_climate_temp(
self, state: State, attr: str, metric_name: str, metric_description: str self, state: State, attr: str, metric_name: str, metric_description: str
) -> None: ) -> None:
if (temp := state.attributes.get(attr)) is not None: if (temp := state.attributes.get(attr)) is None:
if self._climate_units == UnitOfTemperature.FAHRENHEIT: return
temp = TemperatureConverter.convert(
temp, UnitOfTemperature.FAHRENHEIT, UnitOfTemperature.CELSIUS if self._climate_units == UnitOfTemperature.FAHRENHEIT:
) temp = TemperatureConverter.convert(
metric = self._metric( temp, UnitOfTemperature.FAHRENHEIT, UnitOfTemperature.CELSIUS
metric_name,
prometheus_client.Gauge,
metric_description,
) )
metric.labels(**self._labels(state)).set(temp) self._metric(
metric_name,
prometheus_client.Gauge,
metric_description,
self._labels(state),
).set(temp)
def _handle_climate(self, state: State) -> None: def _handle_climate(self, state: State) -> None:
self._handle_climate_temp( self._handle_climate_temp(
@ -535,90 +594,75 @@ class PrometheusMetrics:
) )
if current_action := state.attributes.get(ATTR_HVAC_ACTION): if current_action := state.attributes.get(ATTR_HVAC_ACTION):
metric = self._metric(
"climate_action",
prometheus_client.Gauge,
"HVAC action",
["action"],
)
for action in HVACAction: for action in HVACAction:
metric.labels(**dict(self._labels(state), action=action.value)).set( self._metric(
float(action == current_action) "climate_action",
) prometheus_client.Gauge,
"HVAC action",
self._labels(state, {"action": action.value}),
).set(float(action == current_action))
current_mode = state.state current_mode = state.state
available_modes = state.attributes.get(ATTR_HVAC_MODES) available_modes = state.attributes.get(ATTR_HVAC_MODES)
if current_mode and available_modes: if current_mode and available_modes:
metric = self._metric(
"climate_mode",
prometheus_client.Gauge,
"HVAC mode",
["mode"],
)
for mode in available_modes: for mode in available_modes:
metric.labels(**dict(self._labels(state), mode=mode)).set( self._metric(
float(mode == current_mode) "climate_mode",
) prometheus_client.Gauge,
"HVAC mode",
self._labels(state, {"mode": mode}),
).set(float(mode == current_mode))
preset_mode = state.attributes.get(ATTR_PRESET_MODE) preset_mode = state.attributes.get(ATTR_PRESET_MODE)
available_preset_modes = state.attributes.get(ATTR_PRESET_MODES) available_preset_modes = state.attributes.get(ATTR_PRESET_MODES)
if preset_mode and available_preset_modes: if preset_mode and available_preset_modes:
preset_metric = self._metric(
"climate_preset_mode",
prometheus_client.Gauge,
"Preset mode enum",
["mode"],
)
for mode in available_preset_modes: for mode in available_preset_modes:
preset_metric.labels(**dict(self._labels(state), mode=mode)).set( self._metric(
float(mode == preset_mode) "climate_preset_mode",
) prometheus_client.Gauge,
"Preset mode enum",
self._labels(state, {"mode": mode}),
).set(float(mode == preset_mode))
fan_mode = state.attributes.get(ATTR_FAN_MODE) fan_mode = state.attributes.get(ATTR_FAN_MODE)
available_fan_modes = state.attributes.get(ATTR_FAN_MODES) available_fan_modes = state.attributes.get(ATTR_FAN_MODES)
if fan_mode and available_fan_modes: if fan_mode and available_fan_modes:
fan_mode_metric = self._metric(
"climate_fan_mode",
prometheus_client.Gauge,
"Fan mode enum",
["mode"],
)
for mode in available_fan_modes: for mode in available_fan_modes:
fan_mode_metric.labels(**dict(self._labels(state), mode=mode)).set( self._metric(
float(mode == fan_mode) "climate_fan_mode",
) prometheus_client.Gauge,
"Fan mode enum",
self._labels(state, {"mode": mode}),
).set(float(mode == fan_mode))
def _handle_humidifier(self, state: State) -> None: def _handle_humidifier(self, state: State) -> None:
humidifier_target_humidity_percent = state.attributes.get(ATTR_HUMIDITY) humidifier_target_humidity_percent = state.attributes.get(ATTR_HUMIDITY)
if humidifier_target_humidity_percent: if humidifier_target_humidity_percent:
metric = self._metric( self._metric(
"humidifier_target_humidity_percent", "humidifier_target_humidity_percent",
prometheus_client.Gauge, prometheus_client.Gauge,
"Target Relative Humidity", "Target Relative Humidity",
) self._labels(state),
metric.labels(**self._labels(state)).set(humidifier_target_humidity_percent) ).set(humidifier_target_humidity_percent)
metric = self._metric(
"humidifier_state",
prometheus_client.Gauge,
"State of the humidifier (0/1)",
)
if (value := self.state_as_number(state)) is not None: if (value := self.state_as_number(state)) is not None:
metric.labels(**self._labels(state)).set(value) self._metric(
"humidifier_state",
prometheus_client.Gauge,
"State of the humidifier (0/1)",
self._labels(state),
).set(value)
current_mode = state.attributes.get(ATTR_MODE) current_mode = state.attributes.get(ATTR_MODE)
available_modes = state.attributes.get(ATTR_AVAILABLE_MODES) available_modes = state.attributes.get(ATTR_AVAILABLE_MODES)
if current_mode and available_modes: if current_mode and available_modes:
metric = self._metric(
"humidifier_mode",
prometheus_client.Gauge,
"Humidifier Mode",
["mode"],
)
for mode in available_modes: for mode in available_modes:
metric.labels(**dict(self._labels(state), mode=mode)).set( self._metric(
float(mode == current_mode) "humidifier_mode",
) prometheus_client.Gauge,
"Humidifier Mode",
self._labels(state, {"mode": mode}),
).set(float(mode == current_mode))
def _handle_sensor(self, state: State) -> None: def _handle_sensor(self, state: State) -> None:
unit = self._unit_string(state.attributes.get(ATTR_UNIT_OF_MEASUREMENT)) unit = self._unit_string(state.attributes.get(ATTR_UNIT_OF_MEASUREMENT))
@ -628,22 +672,24 @@ class PrometheusMetrics:
if metric is not None: if metric is not None:
break break
if metric is not None: if metric is not None and (value := self.state_as_number(state)) is not None:
documentation = "State of the sensor" documentation = "State of the sensor"
if unit: if unit:
documentation = f"Sensor data measured in {unit}" documentation = f"Sensor data measured in {unit}"
_metric = self._metric(metric, prometheus_client.Gauge, documentation) if (
state.attributes.get(ATTR_UNIT_OF_MEASUREMENT)
if (value := self.state_as_number(state)) is not None: == UnitOfTemperature.FAHRENHEIT
if ( ):
state.attributes.get(ATTR_UNIT_OF_MEASUREMENT) value = TemperatureConverter.convert(
== UnitOfTemperature.FAHRENHEIT value, UnitOfTemperature.FAHRENHEIT, UnitOfTemperature.CELSIUS
): )
value = TemperatureConverter.convert( self._metric(
value, UnitOfTemperature.FAHRENHEIT, UnitOfTemperature.CELSIUS metric,
) prometheus_client.Gauge,
_metric.labels(**self._labels(state)).set(value) documentation,
self._labels(state),
).set(value)
self._battery(state) self._battery(state)
@ -702,114 +748,107 @@ class PrometheusMetrics:
return units.get(unit, default) return units.get(unit, default)
def _handle_switch(self, state: State) -> None: def _handle_switch(self, state: State) -> None:
metric = self._metric(
"switch_state", prometheus_client.Gauge, "State of the switch (0/1)"
)
if (value := self.state_as_number(state)) is not None: if (value := self.state_as_number(state)) is not None:
metric.labels(**self._labels(state)).set(value) self._metric(
"switch_state",
prometheus_client.Gauge,
"State of the switch (0/1)",
self._labels(state),
).set(value)
self._handle_attributes(state) self._handle_attributes(state)
def _handle_fan(self, state: State) -> None: def _handle_fan(self, state: State) -> None:
metric = self._metric(
"fan_state", prometheus_client.Gauge, "State of the fan (0/1)"
)
if (value := self.state_as_number(state)) is not None: if (value := self.state_as_number(state)) is not None:
metric.labels(**self._labels(state)).set(value) self._metric(
"fan_state",
prometheus_client.Gauge,
"State of the fan (0/1)",
self._labels(state),
).set(value)
fan_speed_percent = state.attributes.get(ATTR_PERCENTAGE) fan_speed_percent = state.attributes.get(ATTR_PERCENTAGE)
if fan_speed_percent is not None: if fan_speed_percent is not None:
fan_speed_metric = self._metric( self._metric(
"fan_speed_percent", "fan_speed_percent",
prometheus_client.Gauge, prometheus_client.Gauge,
"Fan speed percent (0-100)", "Fan speed percent (0-100)",
) self._labels(state),
fan_speed_metric.labels(**self._labels(state)).set(float(fan_speed_percent)) ).set(float(fan_speed_percent))
fan_is_oscillating = state.attributes.get(ATTR_OSCILLATING) fan_is_oscillating = state.attributes.get(ATTR_OSCILLATING)
if fan_is_oscillating is not None: if fan_is_oscillating is not None:
fan_oscillating_metric = self._metric( self._metric(
"fan_is_oscillating", "fan_is_oscillating",
prometheus_client.Gauge, prometheus_client.Gauge,
"Whether the fan is oscillating (0/1)", "Whether the fan is oscillating (0/1)",
) self._labels(state),
fan_oscillating_metric.labels(**self._labels(state)).set( ).set(float(fan_is_oscillating))
float(fan_is_oscillating)
)
fan_preset_mode = state.attributes.get(ATTR_PRESET_MODE) fan_preset_mode = state.attributes.get(ATTR_PRESET_MODE)
available_modes = state.attributes.get(ATTR_PRESET_MODES) available_modes = state.attributes.get(ATTR_PRESET_MODES)
if fan_preset_mode and available_modes: if fan_preset_mode and available_modes:
fan_preset_metric = self._metric(
"fan_preset_mode",
prometheus_client.Gauge,
"Fan preset mode enum",
["mode"],
)
for mode in available_modes: for mode in available_modes:
fan_preset_metric.labels(**dict(self._labels(state), mode=mode)).set( self._metric(
float(mode == fan_preset_mode) "fan_preset_mode",
) prometheus_client.Gauge,
"Fan preset mode enum",
self._labels(state, {"mode": mode}),
).set(float(mode == fan_preset_mode))
fan_direction = state.attributes.get(ATTR_DIRECTION) fan_direction = state.attributes.get(ATTR_DIRECTION)
if fan_direction is not None: if fan_direction in {DIRECTION_FORWARD, DIRECTION_REVERSE}:
fan_direction_metric = self._metric( self._metric(
"fan_direction_reversed", "fan_direction_reversed",
prometheus_client.Gauge, prometheus_client.Gauge,
"Fan direction reversed (bool)", "Fan direction reversed (bool)",
) self._labels(state),
if fan_direction == DIRECTION_FORWARD: ).set(float(fan_direction == DIRECTION_REVERSE))
fan_direction_metric.labels(**self._labels(state)).set(0)
elif fan_direction == DIRECTION_REVERSE:
fan_direction_metric.labels(**self._labels(state)).set(1)
def _handle_zwave(self, state: State) -> None: def _handle_zwave(self, state: State) -> None:
self._battery(state) self._battery(state)
def _handle_automation(self, state: State) -> None: def _handle_automation(self, state: State) -> None:
metric = self._metric( self._metric(
"automation_triggered_count", "automation_triggered_count",
prometheus_client.Counter, prometheus_client.Counter,
"Count of times an automation has been triggered", "Count of times an automation has been triggered",
) self._labels(state),
).inc()
metric.labels(**self._labels(state)).inc()
def _handle_counter(self, state: State) -> None: def _handle_counter(self, state: State) -> None:
metric = self._metric( if (value := self.state_as_number(state)) is None:
return
self._metric(
"counter_value", "counter_value",
prometheus_client.Gauge, prometheus_client.Gauge,
"Value of counter entities", "Value of counter entities",
) self._labels(state),
if (value := self.state_as_number(state)) is not None: ).set(value)
metric.labels(**self._labels(state)).set(value)
def _handle_update(self, state: State) -> None: def _handle_update(self, state: State) -> None:
metric = self._metric( if (value := self.state_as_number(state)) is None:
return
self._metric(
"update_state", "update_state",
prometheus_client.Gauge, prometheus_client.Gauge,
"Update state, indicating if an update is available (0/1)", "Update state, indicating if an update is available (0/1)",
) self._labels(state),
if (value := self.state_as_number(state)) is not None: ).set(value)
metric.labels(**self._labels(state)).set(value)
def _handle_alarm_control_panel(self, state: State) -> None: def _handle_alarm_control_panel(self, state: State) -> None:
current_state = state.state current_state = state.state
if current_state: if current_state:
metric = self._metric(
"alarm_control_panel_state",
prometheus_client.Gauge,
"State of the alarm control panel (0/1)",
["state"],
)
for alarm_state in AlarmControlPanelState: for alarm_state in AlarmControlPanelState:
metric.labels(**dict(self._labels(state), state=alarm_state.value)).set( self._metric(
float(alarm_state.value == current_state) "alarm_control_panel_state",
) prometheus_client.Gauge,
"State of the alarm control panel (0/1)",
self._labels(state, {"state": alarm_state.value}),
).set(float(alarm_state.value == current_state))
class PrometheusView(HomeAssistantView): class PrometheusView(HomeAssistantView):