diff --git a/.strict-typing b/.strict-typing index 7abbfbf7051..28980c17903 100644 --- a/.strict-typing +++ b/.strict-typing @@ -41,6 +41,7 @@ homeassistant.components.persistent_notification.* homeassistant.components.proximity.* homeassistant.components.recorder.purge homeassistant.components.recorder.repack +homeassistant.components.recorder.statistics homeassistant.components.remote.* homeassistant.components.scene.* homeassistant.components.sensor.* diff --git a/homeassistant/components/history/__init__.py b/homeassistant/components/history/__init__.py index ac089bbb3b3..9b009e0fcea 100644 --- a/homeassistant/components/history/__init__.py +++ b/homeassistant/components/history/__init__.py @@ -55,7 +55,7 @@ CONFIG_SCHEMA = vol.Schema( @deprecated_function("homeassistant.components.recorder.history.get_significant_states") def get_significant_states(hass, *args, **kwargs): - """Wrap _get_significant_states with a sql session.""" + """Wrap _get_significant_states with an sql session.""" return history.get_significant_states(hass, *args, **kwargs) diff --git a/homeassistant/components/recorder/__init__.py b/homeassistant/components/recorder/__init__.py index 4b7709555d0..d7358e96100 100644 --- a/homeassistant/components/recorder/__init__.py +++ b/homeassistant/components/recorder/__init__.py @@ -35,12 +35,18 @@ from homeassistant.helpers.entityfilter import ( INCLUDE_EXCLUDE_FILTER_SCHEMA_INNER, convert_include_exclude_filter, ) -from homeassistant.helpers.event import async_track_time_interval, track_time_change +from homeassistant.helpers.event import ( + async_track_time_change, + async_track_time_interval, +) +from homeassistant.helpers.integration_platform import ( + async_process_integration_platforms, +) from homeassistant.helpers.typing import ConfigType from homeassistant.loader import bind_hass import homeassistant.util.dt as dt_util -from . import history, migration, purge +from . import history, migration, purge, statistics from .const import CONF_DB_INTEGRITY_CHECK, DATA_INSTANCE, DOMAIN, SQLITE_URL_PREFIX from .models import Base, Events, RecorderRuns, States from .pool import RecorderPool @@ -56,6 +62,7 @@ from .util import ( _LOGGER = logging.getLogger(__name__) SERVICE_PURGE = "purge" +SERVICE_STATISTICS = "statistics" SERVICE_ENABLE = "enable" SERVICE_DISABLE = "disable" @@ -194,6 +201,7 @@ def run_information_with_session(session, point_in_time: datetime | None = None) async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: """Set up the recorder.""" + hass.data[DOMAIN] = {} conf = config[DOMAIN] entity_filter = convert_include_exclude_filter(conf) auto_purge = conf[CONF_AUTO_PURGE] @@ -221,10 +229,17 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: instance.start() _async_register_services(hass, instance) history.async_setup(hass) + statistics.async_setup(hass) + await async_process_integration_platforms(hass, DOMAIN, _process_recorder_platform) return await instance.async_db_ready +async def _process_recorder_platform(hass, domain, platform): + """Process a recorder platform.""" + hass.data[DOMAIN][domain] = platform + + @callback def _async_register_services(hass, instance): """Register recorder services.""" @@ -263,6 +278,12 @@ class PurgeTask(NamedTuple): apply_filter: bool +class StatisticsTask(NamedTuple): + """An object to insert into the recorder queue to run a statistics task.""" + + start: datetime.datetime + + class WaitTask: """An object to insert into the recorder queue to tell it set the _queue_watch event.""" @@ -389,6 +410,13 @@ class Recorder(threading.Thread): self.queue.put(PurgeTask(keep_days, repack, apply_filter)) + def do_adhoc_statistics(self, **kwargs): + """Trigger an adhoc statistics run.""" + start = kwargs.get("start") + if not start: + start = statistics.get_start_time() + self.queue.put(StatisticsTask(start)) + @callback def async_register(self, shutdown_task, hass_started): """Post connection initialize.""" @@ -451,7 +479,8 @@ class Recorder(threading.Thread): @callback def _async_recorder_ready(self): - """Mark recorder ready.""" + """Finish start and mark recorder ready.""" + self._async_setup_periodic_tasks() self.async_recorder_ready.set() @callback @@ -459,6 +488,24 @@ class Recorder(threading.Thread): """Trigger the purge.""" self.queue.put(PurgeTask(self.keep_days, repack=False, apply_filter=False)) + @callback + def async_hourly_statistics(self, now): + """Trigger the hourly statistics run.""" + start = statistics.get_start_time() + self.queue.put(StatisticsTask(start)) + + def _async_setup_periodic_tasks(self): + """Prepare periodic tasks.""" + if self.auto_purge: + # Purge every night at 4:12am + async_track_time_change( + self.hass, self.async_purge, hour=4, minute=12, second=0 + ) + # Compile hourly statistics every hour at *:12 + async_track_time_change( + self.hass, self.async_hourly_statistics, minute=12, second=0 + ) + def run(self): """Start processing events to save.""" shutdown_task = object() @@ -507,11 +554,6 @@ class Recorder(threading.Thread): self._shutdown() return - # Start periodic purge - if self.auto_purge: - # Purge every night at 4:12am - track_time_change(self.hass, self.async_purge, hour=4, minute=12, second=0) - _LOGGER.debug("Recorder processing the queue") self.hass.add_job(self._async_recorder_ready) self._run_event_loop() @@ -608,11 +650,21 @@ class Recorder(threading.Thread): # Schedule a new purge task if this one didn't finish self.queue.put(PurgeTask(keep_days, repack, apply_filter)) + def _run_statistics(self, start): + """Run statistics task.""" + if statistics.compile_statistics(self, start): + return + # Schedule a new statistics task if this one didn't finish + self.queue.put(StatisticsTask(start)) + def _process_one_event(self, event): """Process one event.""" if isinstance(event, PurgeTask): self._run_purge(event.keep_days, event.repack, event.apply_filter) return + if isinstance(event, StatisticsTask): + self._run_statistics(event.start) + return if isinstance(event, WaitTask): self._queue_watch.set() return diff --git a/homeassistant/components/recorder/history.py b/homeassistant/components/recorder/history.py index 63938b6774a..6c89fef2be3 100644 --- a/homeassistant/components/recorder/history.py +++ b/homeassistant/components/recorder/history.py @@ -52,7 +52,7 @@ QUERY_STATES = [ States.last_updated, ] -HISTORY_BAKERY = "history_bakery" +HISTORY_BAKERY = "recorder_history_bakery" def async_setup(hass): diff --git a/homeassistant/components/recorder/models.py b/homeassistant/components/recorder/models.py index 6f414a437c9..af22239713b 100644 --- a/homeassistant/components/recorder/models.py +++ b/homeassistant/components/recorder/models.py @@ -6,6 +6,7 @@ from sqlalchemy import ( Boolean, Column, DateTime, + Float, ForeignKey, Index, Integer, @@ -38,7 +39,15 @@ TABLE_STATES = "states" TABLE_RECORDER_RUNS = "recorder_runs" TABLE_SCHEMA_CHANGES = "schema_changes" -ALL_TABLES = [TABLE_STATES, TABLE_EVENTS, TABLE_RECORDER_RUNS, TABLE_SCHEMA_CHANGES] +TABLE_STATISTICS = "statistics" + +ALL_TABLES = [ + TABLE_STATES, + TABLE_EVENTS, + TABLE_RECORDER_RUNS, + TABLE_SCHEMA_CHANGES, + TABLE_STATISTICS, +] DATETIME_TYPE = DateTime(timezone=True).with_variant( mysql.DATETIME(timezone=True, fsp=6), "mysql" @@ -198,6 +207,39 @@ class States(Base): # type: ignore return None +class Statistics(Base): # type: ignore + """Statistics.""" + + __table_args__ = { + "mysql_default_charset": "utf8mb4", + "mysql_collate": "utf8mb4_unicode_ci", + } + __tablename__ = TABLE_STATISTICS + id = Column(Integer, primary_key=True) + created = Column(DATETIME_TYPE, default=dt_util.utcnow) + source = Column(String(32)) + statistic_id = Column(String(255)) + start = Column(DATETIME_TYPE, index=True) + mean = Column(Float()) + min = Column(Float()) + max = Column(Float()) + + __table_args__ = ( + # Used for fetching statistics for a certain entity at a specific time + Index("ix_statistics_statistic_id_start", "statistic_id", "start"), + ) + + @staticmethod + def from_stats(source, statistic_id, start, stats): + """Create object from a statistics.""" + return Statistics( + source=source, + statistic_id=statistic_id, + start=start, + **stats, + ) + + class RecorderRuns(Base): # type: ignore """Representation of recorder run.""" diff --git a/homeassistant/components/recorder/purge.py b/homeassistant/components/recorder/purge.py index 22202ad1bbf..62914c01de7 100644 --- a/homeassistant/components/recorder/purge.py +++ b/homeassistant/components/recorder/purge.py @@ -3,10 +3,8 @@ from __future__ import annotations from datetime import datetime, timedelta import logging -import time from typing import TYPE_CHECKING -from sqlalchemy.exc import OperationalError from sqlalchemy.orm.session import Session from sqlalchemy.sql.expression import distinct @@ -15,20 +13,15 @@ import homeassistant.util.dt as dt_util from .const import MAX_ROWS_TO_PURGE from .models import Events, RecorderRuns, States from .repack import repack_database -from .util import session_scope +from .util import retryable_database_job, session_scope if TYPE_CHECKING: from . import Recorder _LOGGER = logging.getLogger(__name__) -# Retry when one of the following MySQL errors occurred: -RETRYABLE_MYSQL_ERRORS = (1205, 1206, 1213) -# 1205: Lock wait timeout exceeded; try restarting transaction -# 1206: The total number of locks exceeds the lock table size -# 1213: Deadlock found when trying to get lock; try restarting transaction - +@retryable_database_job("purge") def purge_old_data( instance: Recorder, purge_days: int, repack: bool, apply_filter: bool = False ) -> bool: @@ -41,36 +34,25 @@ def purge_old_data( "Purging states and events before target %s", purge_before.isoformat(sep=" ", timespec="seconds"), ) - try: - with session_scope(session=instance.get_session()) as session: # type: ignore - # Purge a max of MAX_ROWS_TO_PURGE, based on the oldest states or events record - event_ids = _select_event_ids_to_purge(session, purge_before) - state_ids = _select_state_ids_to_purge(session, purge_before, event_ids) - if state_ids: - _purge_state_ids(session, state_ids) - if event_ids: - _purge_event_ids(session, event_ids) - # If states or events purging isn't processing the purge_before yet, - # return false, as we are not done yet. - _LOGGER.debug("Purging hasn't fully completed yet") - return False - if apply_filter and _purge_filtered_data(instance, session) is False: - _LOGGER.debug("Cleanup filtered data hasn't fully completed yet") - return False - _purge_old_recorder_runs(instance, session, purge_before) - if repack: - repack_database(instance) - except OperationalError as err: - if ( - instance.engine.dialect.name == "mysql" - and err.orig.args[0] in RETRYABLE_MYSQL_ERRORS - ): - _LOGGER.info("%s; purge not completed, retrying", err.orig.args[1]) - time.sleep(instance.db_retry_wait) + + with session_scope(session=instance.get_session()) as session: # type: ignore + # Purge a max of MAX_ROWS_TO_PURGE, based on the oldest states or events record + event_ids = _select_event_ids_to_purge(session, purge_before) + state_ids = _select_state_ids_to_purge(session, purge_before, event_ids) + if state_ids: + _purge_state_ids(session, state_ids) + if event_ids: + _purge_event_ids(session, event_ids) + # If states or events purging isn't processing the purge_before yet, + # return false, as we are not done yet. + _LOGGER.debug("Purging hasn't fully completed yet") return False - - _LOGGER.warning("Error purging history: %s", err) - + if apply_filter and _purge_filtered_data(instance, session) is False: + _LOGGER.debug("Cleanup filtered data hasn't fully completed yet") + return False + _purge_old_recorder_runs(instance, session, purge_before) + if repack: + repack_database(instance) return True diff --git a/homeassistant/components/recorder/statistics.py b/homeassistant/components/recorder/statistics.py new file mode 100644 index 00000000000..65bed4423c5 --- /dev/null +++ b/homeassistant/components/recorder/statistics.py @@ -0,0 +1,138 @@ +"""Statistics helper.""" +from __future__ import annotations + +from collections import defaultdict +from datetime import datetime, timedelta +from itertools import groupby +import logging +from typing import TYPE_CHECKING + +from sqlalchemy import bindparam +from sqlalchemy.ext import baked + +import homeassistant.util.dt as dt_util + +from .const import DOMAIN +from .models import Statistics, process_timestamp_to_utc_isoformat +from .util import execute, retryable_database_job, session_scope + +if TYPE_CHECKING: + from . import Recorder + +QUERY_STATISTICS = [ + Statistics.statistic_id, + Statistics.start, + Statistics.mean, + Statistics.min, + Statistics.max, +] + +STATISTICS_BAKERY = "recorder_statistics_bakery" + +_LOGGER = logging.getLogger(__name__) + + +def async_setup(hass): + """Set up the history hooks.""" + hass.data[STATISTICS_BAKERY] = baked.bakery() + + +def get_start_time() -> datetime.datetime: + """Return start time.""" + last_hour = dt_util.utcnow() - timedelta(hours=1) + start = last_hour.replace(minute=0, second=0, microsecond=0) + return start + + +@retryable_database_job("statistics") +def compile_statistics(instance: Recorder, start: datetime.datetime) -> bool: + """Compile statistics.""" + start = dt_util.as_utc(start) + end = start + timedelta(hours=1) + _LOGGER.debug( + "Compiling statistics for %s-%s", + start, + end, + ) + platform_stats = [] + for domain, platform in instance.hass.data[DOMAIN].items(): + if not hasattr(platform, "compile_statistics"): + continue + platform_stats.append(platform.compile_statistics(instance.hass, start, end)) + _LOGGER.debug( + "Statistics for %s during %s-%s: %s", domain, start, end, platform_stats[-1] + ) + + with session_scope(session=instance.get_session()) as session: # type: ignore + for stats in platform_stats: + for entity_id, stat in stats.items(): + session.add(Statistics.from_stats(DOMAIN, entity_id, start, stat)) + + return True + + +def statistics_during_period(hass, start_time, end_time=None, statistic_id=None): + """Return states changes during UTC period start_time - end_time.""" + with session_scope(hass=hass) as session: + baked_query = hass.data[STATISTICS_BAKERY]( + lambda session: session.query(*QUERY_STATISTICS) + ) + + baked_query += lambda q: q.filter(Statistics.start >= bindparam("start_time")) + + if end_time is not None: + baked_query += lambda q: q.filter(Statistics.start < bindparam("end_time")) + + if statistic_id is not None: + baked_query += lambda q: q.filter_by(statistic_id=bindparam("statistic_id")) + statistic_id = statistic_id.lower() + + baked_query += lambda q: q.order_by(Statistics.statistic_id, Statistics.start) + + stats = execute( + baked_query(session).params( + start_time=start_time, end_time=end_time, statistic_id=statistic_id + ) + ) + + statistic_ids = [statistic_id] if statistic_id is not None else None + + return _sorted_statistics_to_dict( + hass, session, stats, start_time, statistic_ids + ) + + +def _sorted_statistics_to_dict( + hass, + session, + stats, + start_time, + statistic_ids, +): + """Convert SQL results into JSON friendly data structure.""" + result = defaultdict(list) + # Set all statistic IDs to empty lists in result set to maintain the order + if statistic_ids is not None: + for stat_id in statistic_ids: + result[stat_id] = [] + + # Called in a tight loop so cache the function + # here + _process_timestamp_to_utc_isoformat = process_timestamp_to_utc_isoformat + + # Append all changes to it + for ent_id, group in groupby(stats, lambda state: state.statistic_id): + ent_results = result[ent_id] + ent_results.extend( + { + "statistic_id": db_state.statistic_id, + "start": _process_timestamp_to_utc_isoformat(db_state.start), + "mean": db_state.mean, + "min": db_state.min, + "max": db_state.max, + } + for db_state in group + ) + + # Filter out the empty lists if some states had 0 results. + return {key: val for key, val in result.items() if val} diff --git a/homeassistant/components/recorder/util.py b/homeassistant/components/recorder/util.py index 9f99dc2bf45..6231f493cc2 100644 --- a/homeassistant/components/recorder/util.py +++ b/homeassistant/components/recorder/util.py @@ -4,9 +4,11 @@ from __future__ import annotations from collections.abc import Generator from contextlib import contextmanager from datetime import timedelta +import functools import logging import os import time +from typing import TYPE_CHECKING from sqlalchemy.exc import OperationalError, SQLAlchemyError from sqlalchemy.orm.session import Session @@ -19,10 +21,14 @@ from .models import ( ALL_TABLES, TABLE_RECORDER_RUNS, TABLE_SCHEMA_CHANGES, + TABLE_STATISTICS, RecorderRuns, process_timestamp, ) +if TYPE_CHECKING: + from . import Recorder + _LOGGER = logging.getLogger(__name__) RETRIES = 3 @@ -34,6 +40,12 @@ SQLITE3_POSTFIXES = ["", "-wal", "-shm"] # should do a check on the sqlite3 database. MAX_RESTART_TIME = timedelta(minutes=10) +# Retry when one of the following MySQL errors occurred: +RETRYABLE_MYSQL_ERRORS = (1205, 1206, 1213) +# 1205: Lock wait timeout exceeded; try restarting transaction +# 1206: The total number of locks exceeds the lock table size +# 1213: Deadlock found when trying to get lock; try restarting transaction + @contextmanager def session_scope( @@ -167,6 +179,8 @@ def basic_sanity_check(cursor): """Check tables to make sure select does not fail.""" for table in ALL_TABLES: + if table == TABLE_STATISTICS: + continue if table in (TABLE_RECORDER_RUNS, TABLE_SCHEMA_CHANGES): cursor.execute(f"SELECT * FROM {table};") # nosec # not injection else: @@ -270,3 +284,36 @@ def end_incomplete_runs(session, start_time): "Ended unfinished session (id=%s from %s)", run.run_id, run.start ) session.add(run) + + +def retryable_database_job(description: str): + """Try to execute a database job. + + The job should return True if it finished, and False if it needs to be rescheduled. + """ + + def decorator(job: callable): + @functools.wraps(job) + def wrapper(instance: Recorder, *args, **kwargs): + try: + return job(instance, *args, **kwargs) + except OperationalError as err: + if ( + instance.engine.dialect.name == "mysql" + and err.orig.args[0] in RETRYABLE_MYSQL_ERRORS + ): + _LOGGER.info( + "%s; %s not completed, retrying", err.orig.args[1], description + ) + time.sleep(instance.db_retry_wait) + # Failed with retryable error + return False + + _LOGGER.warning("Error executing %s: %s", description, err) + + # Failed with permanent error + return True + + return wrapper + + return decorator diff --git a/homeassistant/components/sensor/manifest.json b/homeassistant/components/sensor/manifest.json index dc62ae3b031..163bc895975 100644 --- a/homeassistant/components/sensor/manifest.json +++ b/homeassistant/components/sensor/manifest.json @@ -3,5 +3,6 @@ "name": "Sensor", "documentation": "https://www.home-assistant.io/integrations/sensor", "codeowners": [], - "quality_scale": "internal" + "quality_scale": "internal", + "after_dependencies": ["recorder"] } diff --git a/homeassistant/components/sensor/recorder.py b/homeassistant/components/sensor/recorder.py new file mode 100644 index 00000000000..f79b830a7d7 --- /dev/null +++ b/homeassistant/components/sensor/recorder.py @@ -0,0 +1,81 @@ +"""Statistics helper for sensor.""" +from __future__ import annotations + +import datetime +import statistics + +from homeassistant.components.recorder import history +from homeassistant.components.sensor import ATTR_STATE_CLASS, STATE_CLASS_MEASUREMENT +from homeassistant.const import ATTR_DEVICE_CLASS +from homeassistant.core import HomeAssistant + +from . import DOMAIN + +DEVICE_CLASS_STATISTICS = {"temperature": {"mean", "min", "max"}} + + +def _get_entities(hass: HomeAssistant) -> list[tuple[str, str]]: + """Get (entity_id, device_class) of all sensors for which to compile statistics.""" + all_sensors = hass.states.all(DOMAIN) + entity_ids = [] + + for state in all_sensors: + device_class = state.attributes.get(ATTR_DEVICE_CLASS) + state_class = state.attributes.get(ATTR_STATE_CLASS) + if not state_class or state_class != STATE_CLASS_MEASUREMENT: + continue + if not device_class or device_class not in DEVICE_CLASS_STATISTICS: + continue + entity_ids.append((state.entity_id, device_class)) + return entity_ids + + +# Faster than try/except +# From https://stackoverflow.com/a/23639915 +def _is_number(s: str) -> bool: # pylint: disable=invalid-name + """Return True if string is a number.""" + return s.replace(".", "", 1).isdigit() + + +def compile_statistics( + hass: HomeAssistant, start: datetime.datetime, end: datetime.datetime +) -> dict: + """Compile statistics for all entities during start-end. + + Note: This will query the database and must not be run in the event loop + """ + result: dict = {} + + entities = _get_entities(hass) + + # Get history between start and end + history_list = history.get_significant_states( # type: ignore + hass, start, end, [i[0] for i in entities] + ) + + for entity_id, device_class in entities: + wanted_statistics = DEVICE_CLASS_STATISTICS[device_class] + + if entity_id not in history_list: + continue + + entity_history = history_list[entity_id] + fstates = [float(el.state) for el in entity_history if _is_number(el.state)] + + if not fstates: + continue + + result[entity_id] = {} + + # Make calculations + if "max" in wanted_statistics: + result[entity_id]["max"] = max(fstates) + if "min" in wanted_statistics: + result[entity_id]["min"] = min(fstates) + + # Note: The average calculation will be incorrect for unevenly spaced readings, + # this needs to be improved by weighting with time between measurements + if "mean" in wanted_statistics: + result[entity_id]["mean"] = statistics.fmean(fstates) + + return result diff --git a/mypy.ini b/mypy.ini index 1506a06e839..ee53f990d78 100644 --- a/mypy.ini +++ b/mypy.ini @@ -462,6 +462,17 @@ no_implicit_optional = true warn_return_any = true warn_unreachable = true +[mypy-homeassistant.components.recorder.statistics] +check_untyped_defs = true +disallow_incomplete_defs = true +disallow_subclassing_any = true +disallow_untyped_calls = true +disallow_untyped_decorators = true +disallow_untyped_defs = true +no_implicit_optional = true +warn_return_any = true +warn_unreachable = true + [mypy-homeassistant.components.remote.*] check_untyped_defs = true disallow_incomplete_defs = true diff --git a/tests/components/recorder/test_init.py b/tests/components/recorder/test_init.py index a045bb638ce..540764b2ed0 100644 --- a/tests/components/recorder/test_init.py +++ b/tests/components/recorder/test_init.py @@ -590,7 +590,7 @@ def run_tasks_at_time(hass, test_time): def test_auto_purge(hass_recorder): - """Test periodic purge alarm scheduling.""" + """Test periodic purge scheduling.""" hass = hass_recorder() original_tz = dt_util.DEFAULT_TIME_ZONE @@ -598,9 +598,10 @@ def test_auto_purge(hass_recorder): tz = dt_util.get_time_zone("Europe/Copenhagen") dt_util.set_default_time_zone(tz) - # Purging is schedule to happen at 4:12am every day. Exercise this behavior - # by firing alarms and advancing the clock around this time. Pick an arbitrary - # year in the future to avoid boundary conditions relative to the current date. + # Purging is scheduled to happen at 4:12am every day. Exercise this behavior by + # firing time changed events and advancing the clock around this time. Pick an + # arbitrary year in the future to avoid boundary conditions relative to the current + # date. # # The clock is started at 4:15am then advanced forward below now = dt_util.utcnow() @@ -637,6 +638,56 @@ def test_auto_purge(hass_recorder): dt_util.set_default_time_zone(original_tz) +def test_auto_statistics(hass_recorder): + """Test periodic statistics scheduling.""" + hass = hass_recorder() + + original_tz = dt_util.DEFAULT_TIME_ZONE + + tz = dt_util.get_time_zone("Europe/Copenhagen") + dt_util.set_default_time_zone(tz) + + # Statistics is scheduled to happen at *:12am every hour. Exercise this behavior by + # firing time changed events and advancing the clock around this time. Pick an + # arbitrary year in the future to avoid boundary conditions relative to the current + # date. + # + # The clock is started at 4:15am then advanced forward below + now = dt_util.utcnow() + test_time = datetime(now.year + 2, 1, 1, 4, 15, 0, tzinfo=tz) + run_tasks_at_time(hass, test_time) + + with patch( + "homeassistant.components.recorder.statistics.compile_statistics", + return_value=True, + ) as compile_statistics: + # Advance one hour, and the statistics task should run + test_time = test_time + timedelta(hours=1) + run_tasks_at_time(hass, test_time) + assert len(compile_statistics.mock_calls) == 1 + + compile_statistics.reset_mock() + + # Advance one hour, and the statistics task should run again + test_time = test_time + timedelta(hours=1) + run_tasks_at_time(hass, test_time) + assert len(compile_statistics.mock_calls) == 1 + + compile_statistics.reset_mock() + + # Advance less than one full hour. The task should not run. + test_time = test_time + timedelta(minutes=50) + run_tasks_at_time(hass, test_time) + assert len(compile_statistics.mock_calls) == 0 + + # Advance to the next hour, and the statistics task should run again + test_time = test_time + timedelta(hours=1) + run_tasks_at_time(hass, test_time) + assert len(compile_statistics.mock_calls) == 1 + + dt_util.set_default_time_zone(original_tz) + + def test_saving_sets_old_state(hass_recorder): """Test saving sets old state.""" hass = hass_recorder() diff --git a/tests/components/recorder/test_purge.py b/tests/components/recorder/test_purge.py index 23164bd73f5..cdbe6e3c338 100644 --- a/tests/components/recorder/test_purge.py +++ b/tests/components/recorder/test_purge.py @@ -104,7 +104,7 @@ async def test_purge_old_states_encounters_temporary_mysql_error( mysql_exception.orig = MagicMock(args=(1205, "retryable")) with patch( - "homeassistant.components.recorder.purge.time.sleep" + "homeassistant.components.recorder.util.time.sleep" ) as sleep_mock, patch( "homeassistant.components.recorder.purge._purge_old_recorder_runs", side_effect=[mysql_exception, None], @@ -147,7 +147,7 @@ async def test_purge_old_states_encounters_operational_error( await async_wait_recording_done_without_instance(hass) assert "retrying" not in caplog.text - assert "Error purging history" in caplog.text + assert "Error executing purge" in caplog.text async def test_purge_old_events( diff --git a/tests/components/recorder/test_statistics.py b/tests/components/recorder/test_statistics.py new file mode 100644 index 00000000000..ee05cb993b9 --- /dev/null +++ b/tests/components/recorder/test_statistics.py @@ -0,0 +1,88 @@ +"""The tests for sensor recorder platform.""" +# pylint: disable=protected-access,invalid-name +from datetime import timedelta +from unittest.mock import patch, sentinel + +from homeassistant.components.recorder import history +from homeassistant.components.recorder.const import DATA_INSTANCE +from homeassistant.components.recorder.models import process_timestamp_to_utc_isoformat +from homeassistant.components.recorder.statistics import statistics_during_period +from homeassistant.setup import setup_component +import homeassistant.util.dt as dt_util + +from tests.components.recorder.common import wait_recording_done + + +def test_compile_hourly_statistics(hass_recorder): + """Test compiling hourly statistics.""" + hass = hass_recorder() + recorder = hass.data[DATA_INSTANCE] + setup_component(hass, "sensor", {}) + zero, four, states = record_states(hass) + hist = history.get_significant_states(hass, zero, four) + assert dict(states) == dict(hist) + + recorder.do_adhoc_statistics(period="hourly", start=zero) + wait_recording_done(hass) + stats = statistics_during_period(hass, zero) + assert stats == { + "sensor.test1": [ + { + "statistic_id": "sensor.test1", + "start": process_timestamp_to_utc_isoformat(zero), + "mean": 15.0, + "min": 10.0, + "max": 20.0, + } + ] + } + + +def record_states(hass): + """Record some test states. + + We inject a bunch of state updates temperature sensors. + """ + mp = "media_player.test" + sns1 = "sensor.test1" + sns2 = "sensor.test2" + sns3 = "sensor.test3" + sns1_attr = {"device_class": "temperature", "state_class": "measurement"} + sns2_attr = {"device_class": "temperature"} + sns3_attr = {} + + def set_state(entity_id, state, **kwargs): + """Set the state.""" + hass.states.set(entity_id, state, **kwargs) + wait_recording_done(hass) + return hass.states.get(entity_id) + + zero = dt_util.utcnow() + one = zero + timedelta(minutes=1) + two = one + timedelta(minutes=15) + three = two + timedelta(minutes=30) + four = three + timedelta(minutes=15) + + states = {mp: [], sns1: [], sns2: [], sns3: []} + with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=one): + states[mp].append( + set_state(mp, "idle", attributes={"media_title": str(sentinel.mt1)}) + ) + states[mp].append( + set_state(mp, "YouTube", attributes={"media_title": str(sentinel.mt2)}) + ) + states[sns1].append(set_state(sns1, "10", attributes=sns1_attr)) + states[sns2].append(set_state(sns2, "10", attributes=sns2_attr)) + states[sns3].append(set_state(sns3, "10", attributes=sns3_attr)) + + with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=two): + states[sns1].append(set_state(sns1, "15", attributes=sns1_attr)) + states[sns2].append(set_state(sns2, "15", attributes=sns2_attr)) + states[sns3].append(set_state(sns3, "15", attributes=sns3_attr)) + + with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=three): + states[sns1].append(set_state(sns1, "20", attributes=sns1_attr)) + states[sns2].append(set_state(sns2, "20", attributes=sns2_attr)) + states[sns3].append(set_state(sns3, "20", attributes=sns3_attr)) + + return zero, four, states diff --git a/tests/components/sensor/test_recorder.py b/tests/components/sensor/test_recorder.py new file mode 100644 index 00000000000..a391161ee1e --- /dev/null +++ b/tests/components/sensor/test_recorder.py @@ -0,0 +1,224 @@ +"""The tests for sensor recorder platform.""" +# pylint: disable=protected-access,invalid-name +from datetime import timedelta +from unittest.mock import patch, sentinel + +import pytest + +from homeassistant.components.recorder import history +from homeassistant.components.recorder.const import DATA_INSTANCE +from homeassistant.components.recorder.models import process_timestamp_to_utc_isoformat +from homeassistant.components.recorder.statistics import statistics_during_period +from homeassistant.const import STATE_UNAVAILABLE +from homeassistant.setup import setup_component +import homeassistant.util.dt as dt_util + +from tests.common import get_test_home_assistant, init_recorder_component +from tests.components.recorder.common import wait_recording_done + + +@pytest.fixture +def hass_recorder(): + """Home Assistant fixture with in-memory recorder.""" + hass = get_test_home_assistant() + + def setup_recorder(config=None): + """Set up with params.""" + init_recorder_component(hass, config) + hass.start() + hass.block_till_done() + hass.data[DATA_INSTANCE].block_till_done() + return hass + + yield setup_recorder + hass.stop() + + +def test_compile_hourly_statistics(hass_recorder): + """Test compiling hourly statistics.""" + hass = hass_recorder() + recorder = hass.data[DATA_INSTANCE] + setup_component(hass, "sensor", {}) + zero, four, states = record_states(hass) + hist = history.get_significant_states(hass, zero, four) + assert dict(states) == dict(hist) + + recorder.do_adhoc_statistics(period="hourly", start=zero) + wait_recording_done(hass) + stats = statistics_during_period(hass, zero) + assert stats == { + "sensor.test1": [ + { + "statistic_id": "sensor.test1", + "start": process_timestamp_to_utc_isoformat(zero), + "mean": 15.0, + "min": 10.0, + "max": 20.0, + } + ] + } + + +def test_compile_hourly_statistics_unchanged(hass_recorder): + """Test compiling hourly statistics, with no changes during the hour.""" + hass = hass_recorder() + recorder = hass.data[DATA_INSTANCE] + setup_component(hass, "sensor", {}) + zero, four, states = record_states(hass) + hist = history.get_significant_states(hass, zero, four) + assert dict(states) == dict(hist) + + recorder.do_adhoc_statistics(period="hourly", start=four) + wait_recording_done(hass) + stats = statistics_during_period(hass, four) + assert stats == { + "sensor.test1": [ + { + "statistic_id": "sensor.test1", + "start": process_timestamp_to_utc_isoformat(four), + "mean": 20.0, + "min": 20.0, + "max": 20.0, + } + ] + } + + +def test_compile_hourly_statistics_partially_unavailable(hass_recorder): + """Test compiling hourly statistics, with the sensor being partially unavailable.""" + hass = hass_recorder() + recorder = hass.data[DATA_INSTANCE] + setup_component(hass, "sensor", {}) + zero, four, states = record_states_partially_unavailable(hass) + hist = history.get_significant_states(hass, zero, four) + assert dict(states) == dict(hist) + + recorder.do_adhoc_statistics(period="hourly", start=zero) + wait_recording_done(hass) + stats = statistics_during_period(hass, zero) + assert stats == { + "sensor.test1": [ + { + "statistic_id": "sensor.test1", + "start": process_timestamp_to_utc_isoformat(zero), + "mean": 17.5, + "min": 10.0, + "max": 25.0, + } + ] + } + + +def test_compile_hourly_statistics_unavailable(hass_recorder): + """Test compiling hourly statistics, with the sensor being unavailable.""" + hass = hass_recorder() + recorder = hass.data[DATA_INSTANCE] + setup_component(hass, "sensor", {}) + zero, four, states = record_states_partially_unavailable(hass) + hist = history.get_significant_states(hass, zero, four) + assert dict(states) == dict(hist) + + recorder.do_adhoc_statistics(period="hourly", start=four) + wait_recording_done(hass) + stats = statistics_during_period(hass, four) + assert stats == {} + + +def record_states(hass): + """Record some test states. + + We inject a bunch of state updates temperature sensors. + """ + mp = "media_player.test" + sns1 = "sensor.test1" + sns2 = "sensor.test2" + sns3 = "sensor.test3" + sns1_attr = {"device_class": "temperature", "state_class": "measurement"} + sns2_attr = {"device_class": "temperature"} + sns3_attr = {} + + def set_state(entity_id, state, **kwargs): + """Set the state.""" + hass.states.set(entity_id, state, **kwargs) + wait_recording_done(hass) + return hass.states.get(entity_id) + + zero = dt_util.utcnow() + one = zero + timedelta(minutes=1) + two = one + timedelta(minutes=15) + three = two + timedelta(minutes=30) + four = three + timedelta(minutes=15) + + states = {mp: [], sns1: [], sns2: [], sns3: []} + with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=one): + states[mp].append( + set_state(mp, "idle", attributes={"media_title": str(sentinel.mt1)}) + ) + states[mp].append( + set_state(mp, "YouTube", attributes={"media_title": str(sentinel.mt2)}) + ) + states[sns1].append(set_state(sns1, "10", attributes=sns1_attr)) + states[sns2].append(set_state(sns2, "10", attributes=sns2_attr)) + states[sns3].append(set_state(sns3, "10", attributes=sns3_attr)) + + with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=two): + states[sns1].append(set_state(sns1, "15", attributes=sns1_attr)) + states[sns2].append(set_state(sns2, "15", attributes=sns2_attr)) + states[sns3].append(set_state(sns3, "15", attributes=sns3_attr)) + + with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=three): + states[sns1].append(set_state(sns1, "20", attributes=sns1_attr)) + states[sns2].append(set_state(sns2, "20", attributes=sns2_attr)) + states[sns3].append(set_state(sns3, "20", attributes=sns3_attr)) + + return zero, four, states + + +def record_states_partially_unavailable(hass): + """Record some test states. + + We inject a bunch of state updates temperature sensors. + """ + mp = "media_player.test" + sns1 = "sensor.test1" + sns2 = "sensor.test2" + sns3 = "sensor.test3" + sns1_attr = {"device_class": "temperature", "state_class": "measurement"} + sns2_attr = {"device_class": "temperature"} + sns3_attr = {} + + def set_state(entity_id, state, **kwargs): + """Set the state.""" + hass.states.set(entity_id, state, **kwargs) + wait_recording_done(hass) + return hass.states.get(entity_id) + + zero = dt_util.utcnow() + one = zero + timedelta(minutes=1) + two = one + timedelta(minutes=15) + three = two + timedelta(minutes=30) + four = three + timedelta(minutes=15) + + states = {mp: [], sns1: [], sns2: [], sns3: []} + with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=one): + states[mp].append( + set_state(mp, "idle", attributes={"media_title": str(sentinel.mt1)}) + ) + states[mp].append( + set_state(mp, "YouTube", attributes={"media_title": str(sentinel.mt2)}) + ) + states[sns1].append(set_state(sns1, "10", attributes=sns1_attr)) + states[sns2].append(set_state(sns2, "10", attributes=sns2_attr)) + states[sns3].append(set_state(sns3, "10", attributes=sns3_attr)) + + with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=two): + states[sns1].append(set_state(sns1, "25", attributes=sns1_attr)) + states[sns2].append(set_state(sns2, "25", attributes=sns2_attr)) + states[sns3].append(set_state(sns3, "25", attributes=sns3_attr)) + + with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=three): + states[sns1].append(set_state(sns1, STATE_UNAVAILABLE, attributes=sns1_attr)) + states[sns2].append(set_state(sns2, STATE_UNAVAILABLE, attributes=sns2_attr)) + states[sns3].append(set_state(sns3, STATE_UNAVAILABLE, attributes=sns3_attr)) + + return zero, four, states