Separate recorder database schema from other classes (#72977)

* Separate recorder database schema from other classes

* fix logbook imports

* migrate new tests

* few more

* last one

* fix merge

Co-authored-by: J. Nick Koston <nick@koston.org>
This commit is contained in:
Erik Montnemery 2022-06-07 14:41:43 +02:00 committed by GitHub
parent ab82f71b43
commit 5f2b4001f3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
59 changed files with 771 additions and 733 deletions

View File

@ -8,7 +8,11 @@ from sqlalchemy.orm import Query
from sqlalchemy.sql.elements import ClauseList
from sqlalchemy.sql.lambdas import StatementLambdaElement
from homeassistant.components.recorder.models import LAST_UPDATED_INDEX, Events, States
from homeassistant.components.recorder.db_schema import (
LAST_UPDATED_INDEX,
Events,
States,
)
from .common import (
apply_states_filters,

View File

@ -10,8 +10,7 @@ from sqlalchemy.sql.elements import ClauseList
from sqlalchemy.sql.expression import literal
from sqlalchemy.sql.selectable import Select
from homeassistant.components.recorder.filters import like_domain_matchers
from homeassistant.components.recorder.models import (
from homeassistant.components.recorder.db_schema import (
EVENTS_CONTEXT_ID_INDEX,
OLD_FORMAT_ATTRS_JSON,
OLD_STATE,
@ -22,6 +21,7 @@ from homeassistant.components.recorder.models import (
StateAttributes,
States,
)
from homeassistant.components.recorder.filters import like_domain_matchers
from ..const import ALWAYS_CONTINUOUS_DOMAINS, CONDITIONALLY_CONTINUOUS_DOMAINS

View File

@ -10,7 +10,7 @@ from sqlalchemy.sql.elements import ClauseList
from sqlalchemy.sql.lambdas import StatementLambdaElement
from sqlalchemy.sql.selectable import CTE, CompoundSelect
from homeassistant.components.recorder.models import (
from homeassistant.components.recorder.db_schema import (
DEVICE_ID_IN_EVENT,
EventData,
Events,

View File

@ -10,7 +10,7 @@ from sqlalchemy.orm import Query
from sqlalchemy.sql.lambdas import StatementLambdaElement
from sqlalchemy.sql.selectable import CTE, CompoundSelect
from homeassistant.components.recorder.models import (
from homeassistant.components.recorder.db_schema import (
ENTITY_ID_IN_EVENT,
ENTITY_ID_LAST_UPDATED_INDEX,
OLD_ENTITY_ID_IN_EVENT,

View File

@ -10,7 +10,7 @@ from sqlalchemy.orm import Query
from sqlalchemy.sql.lambdas import StatementLambdaElement
from sqlalchemy.sql.selectable import CTE, CompoundSelect
from homeassistant.components.recorder.models import EventData, Events, States
from homeassistant.components.recorder.db_schema import EventData, Events, States
from .common import (
apply_events_context_hints,

View File

@ -48,17 +48,19 @@ from .const import (
SQLITE_URL_PREFIX,
SupportedDialect,
)
from .executor import DBInterruptibleThreadPoolExecutor
from .models import (
from .db_schema import (
SCHEMA_VERSION,
Base,
EventData,
Events,
StateAttributes,
States,
StatisticsRuns,
)
from .executor import DBInterruptibleThreadPoolExecutor
from .models import (
StatisticData,
StatisticMetaData,
StatisticsRuns,
UnsupportedDialect,
process_timestamp,
)

View File

@ -0,0 +1,600 @@
"""Models for SQLAlchemy."""
from __future__ import annotations
from collections.abc import Callable
from datetime import datetime, timedelta
import json
import logging
from typing import Any, cast
import ciso8601
from fnvhash import fnv1a_32
from sqlalchemy import (
JSON,
BigInteger,
Boolean,
Column,
DateTime,
Float,
ForeignKey,
Identity,
Index,
Integer,
SmallInteger,
String,
Text,
distinct,
type_coerce,
)
from sqlalchemy.dialects import mysql, oracle, postgresql, sqlite
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.orm import aliased, declarative_base, relationship
from sqlalchemy.orm.session import Session
from homeassistant.const import (
MAX_LENGTH_EVENT_CONTEXT_ID,
MAX_LENGTH_EVENT_EVENT_TYPE,
MAX_LENGTH_EVENT_ORIGIN,
MAX_LENGTH_STATE_ENTITY_ID,
MAX_LENGTH_STATE_STATE,
)
from homeassistant.core import Context, Event, EventOrigin, State, split_entity_id
import homeassistant.util.dt as dt_util
from .const import ALL_DOMAIN_EXCLUDE_ATTRS, JSON_DUMP
from .models import StatisticData, StatisticMetaData, process_timestamp
# SQLAlchemy Schema
# pylint: disable=invalid-name
Base = declarative_base()
SCHEMA_VERSION = 29
_LOGGER = logging.getLogger(__name__)
TABLE_EVENTS = "events"
TABLE_EVENT_DATA = "event_data"
TABLE_STATES = "states"
TABLE_STATE_ATTRIBUTES = "state_attributes"
TABLE_RECORDER_RUNS = "recorder_runs"
TABLE_SCHEMA_CHANGES = "schema_changes"
TABLE_STATISTICS = "statistics"
TABLE_STATISTICS_META = "statistics_meta"
TABLE_STATISTICS_RUNS = "statistics_runs"
TABLE_STATISTICS_SHORT_TERM = "statistics_short_term"
ALL_TABLES = [
TABLE_STATES,
TABLE_STATE_ATTRIBUTES,
TABLE_EVENTS,
TABLE_EVENT_DATA,
TABLE_RECORDER_RUNS,
TABLE_SCHEMA_CHANGES,
TABLE_STATISTICS,
TABLE_STATISTICS_META,
TABLE_STATISTICS_RUNS,
TABLE_STATISTICS_SHORT_TERM,
]
TABLES_TO_CHECK = [
TABLE_STATES,
TABLE_EVENTS,
TABLE_RECORDER_RUNS,
TABLE_SCHEMA_CHANGES,
]
LAST_UPDATED_INDEX = "ix_states_last_updated"
ENTITY_ID_LAST_UPDATED_INDEX = "ix_states_entity_id_last_updated"
EVENTS_CONTEXT_ID_INDEX = "ix_events_context_id"
STATES_CONTEXT_ID_INDEX = "ix_states_context_id"
class FAST_PYSQLITE_DATETIME(sqlite.DATETIME): # type: ignore[misc]
"""Use ciso8601 to parse datetimes instead of sqlalchemy built-in regex."""
def result_processor(self, dialect, coltype): # type: ignore[no-untyped-def]
"""Offload the datetime parsing to ciso8601."""
return lambda value: None if value is None else ciso8601.parse_datetime(value)
JSON_VARIENT_CAST = Text().with_variant(
postgresql.JSON(none_as_null=True), "postgresql"
)
JSONB_VARIENT_CAST = Text().with_variant(
postgresql.JSONB(none_as_null=True), "postgresql"
)
DATETIME_TYPE = (
DateTime(timezone=True)
.with_variant(mysql.DATETIME(timezone=True, fsp=6), "mysql")
.with_variant(FAST_PYSQLITE_DATETIME(), "sqlite")
)
DOUBLE_TYPE = (
Float()
.with_variant(mysql.DOUBLE(asdecimal=False), "mysql")
.with_variant(oracle.DOUBLE_PRECISION(), "oracle")
.with_variant(postgresql.DOUBLE_PRECISION(), "postgresql")
)
class JSONLiteral(JSON): # type: ignore[misc]
"""Teach SA how to literalize json."""
def literal_processor(self, dialect: str) -> Callable[[Any], str]:
"""Processor to convert a value to JSON."""
def process(value: Any) -> str:
"""Dump json."""
return json.dumps(value)
return process
EVENT_ORIGIN_ORDER = [EventOrigin.local, EventOrigin.remote]
EVENT_ORIGIN_TO_IDX = {origin: idx for idx, origin in enumerate(EVENT_ORIGIN_ORDER)}
class Events(Base): # type: ignore[misc,valid-type]
"""Event history data."""
__table_args__ = (
# Used for fetching events at a specific time
# see logbook
Index("ix_events_event_type_time_fired", "event_type", "time_fired"),
{"mysql_default_charset": "utf8mb4", "mysql_collate": "utf8mb4_unicode_ci"},
)
__tablename__ = TABLE_EVENTS
event_id = Column(Integer, Identity(), primary_key=True)
event_type = Column(String(MAX_LENGTH_EVENT_EVENT_TYPE))
event_data = Column(Text().with_variant(mysql.LONGTEXT, "mysql"))
origin = Column(String(MAX_LENGTH_EVENT_ORIGIN)) # no longer used for new rows
origin_idx = Column(SmallInteger)
time_fired = Column(DATETIME_TYPE, index=True)
context_id = Column(String(MAX_LENGTH_EVENT_CONTEXT_ID), index=True)
context_user_id = Column(String(MAX_LENGTH_EVENT_CONTEXT_ID))
context_parent_id = Column(String(MAX_LENGTH_EVENT_CONTEXT_ID))
data_id = Column(Integer, ForeignKey("event_data.data_id"), index=True)
event_data_rel = relationship("EventData")
def __repr__(self) -> str:
"""Return string representation of instance for debugging."""
return (
f"<recorder.Events("
f"id={self.event_id}, type='{self.event_type}', "
f"origin_idx='{self.origin_idx}', time_fired='{self.time_fired}'"
f", data_id={self.data_id})>"
)
@staticmethod
def from_event(event: Event) -> Events:
"""Create an event database object from a native event."""
return Events(
event_type=event.event_type,
event_data=None,
origin_idx=EVENT_ORIGIN_TO_IDX.get(event.origin),
time_fired=event.time_fired,
context_id=event.context.id,
context_user_id=event.context.user_id,
context_parent_id=event.context.parent_id,
)
def to_native(self, validate_entity_id: bool = True) -> Event | None:
"""Convert to a native HA Event."""
context = Context(
id=self.context_id,
user_id=self.context_user_id,
parent_id=self.context_parent_id,
)
try:
return Event(
self.event_type,
json.loads(self.event_data) if self.event_data else {},
EventOrigin(self.origin)
if self.origin
else EVENT_ORIGIN_ORDER[self.origin_idx],
process_timestamp(self.time_fired),
context=context,
)
except ValueError:
# When json.loads fails
_LOGGER.exception("Error converting to event: %s", self)
return None
class EventData(Base): # type: ignore[misc,valid-type]
"""Event data history."""
__table_args__ = (
{"mysql_default_charset": "utf8mb4", "mysql_collate": "utf8mb4_unicode_ci"},
)
__tablename__ = TABLE_EVENT_DATA
data_id = Column(Integer, Identity(), primary_key=True)
hash = Column(BigInteger, index=True)
# Note that this is not named attributes to avoid confusion with the states table
shared_data = Column(Text().with_variant(mysql.LONGTEXT, "mysql"))
def __repr__(self) -> str:
"""Return string representation of instance for debugging."""
return (
f"<recorder.EventData("
f"id={self.data_id}, hash='{self.hash}', data='{self.shared_data}'"
f")>"
)
@staticmethod
def from_event(event: Event) -> EventData:
"""Create object from an event."""
shared_data = JSON_DUMP(event.data)
return EventData(
shared_data=shared_data, hash=EventData.hash_shared_data(shared_data)
)
@staticmethod
def shared_data_from_event(event: Event) -> str:
"""Create shared_attrs from an event."""
return JSON_DUMP(event.data)
@staticmethod
def hash_shared_data(shared_data: str) -> int:
"""Return the hash of json encoded shared data."""
return cast(int, fnv1a_32(shared_data.encode("utf-8")))
def to_native(self) -> dict[str, Any]:
"""Convert to an HA state object."""
try:
return cast(dict[str, Any], json.loads(self.shared_data))
except ValueError:
_LOGGER.exception("Error converting row to event data: %s", self)
return {}
class States(Base): # type: ignore[misc,valid-type]
"""State change history."""
__table_args__ = (
# Used for fetching the state of entities at a specific time
# (get_states in history.py)
Index(ENTITY_ID_LAST_UPDATED_INDEX, "entity_id", "last_updated"),
{"mysql_default_charset": "utf8mb4", "mysql_collate": "utf8mb4_unicode_ci"},
)
__tablename__ = TABLE_STATES
state_id = Column(Integer, Identity(), primary_key=True)
entity_id = Column(String(MAX_LENGTH_STATE_ENTITY_ID))
state = Column(String(MAX_LENGTH_STATE_STATE))
attributes = Column(
Text().with_variant(mysql.LONGTEXT, "mysql")
) # no longer used for new rows
event_id = Column( # no longer used for new rows
Integer, ForeignKey("events.event_id", ondelete="CASCADE"), index=True
)
last_changed = Column(DATETIME_TYPE)
last_updated = Column(DATETIME_TYPE, default=dt_util.utcnow, index=True)
old_state_id = Column(Integer, ForeignKey("states.state_id"), index=True)
attributes_id = Column(
Integer, ForeignKey("state_attributes.attributes_id"), index=True
)
context_id = Column(String(MAX_LENGTH_EVENT_CONTEXT_ID), index=True)
context_user_id = Column(String(MAX_LENGTH_EVENT_CONTEXT_ID))
context_parent_id = Column(String(MAX_LENGTH_EVENT_CONTEXT_ID))
origin_idx = Column(SmallInteger) # 0 is local, 1 is remote
old_state = relationship("States", remote_side=[state_id])
state_attributes = relationship("StateAttributes")
def __repr__(self) -> str:
"""Return string representation of instance for debugging."""
return (
f"<recorder.States("
f"id={self.state_id}, entity_id='{self.entity_id}', "
f"state='{self.state}', event_id='{self.event_id}', "
f"last_updated='{self.last_updated.isoformat(sep=' ', timespec='seconds')}', "
f"old_state_id={self.old_state_id}, attributes_id={self.attributes_id}"
f")>"
)
@staticmethod
def from_event(event: Event) -> States:
"""Create object from a state_changed event."""
entity_id = event.data["entity_id"]
state: State | None = event.data.get("new_state")
dbstate = States(
entity_id=entity_id,
attributes=None,
context_id=event.context.id,
context_user_id=event.context.user_id,
context_parent_id=event.context.parent_id,
origin_idx=EVENT_ORIGIN_TO_IDX.get(event.origin),
)
# None state means the state was removed from the state machine
if state is None:
dbstate.state = ""
dbstate.last_updated = event.time_fired
dbstate.last_changed = None
return dbstate
dbstate.state = state.state
dbstate.last_updated = state.last_updated
if state.last_updated == state.last_changed:
dbstate.last_changed = None
else:
dbstate.last_changed = state.last_changed
return dbstate
def to_native(self, validate_entity_id: bool = True) -> State | None:
"""Convert to an HA state object."""
context = Context(
id=self.context_id,
user_id=self.context_user_id,
parent_id=self.context_parent_id,
)
try:
attrs = json.loads(self.attributes) if self.attributes else {}
except ValueError:
# When json.loads fails
_LOGGER.exception("Error converting row to state: %s", self)
return None
if self.last_changed is None or self.last_changed == self.last_updated:
last_changed = last_updated = process_timestamp(self.last_updated)
else:
last_updated = process_timestamp(self.last_updated)
last_changed = process_timestamp(self.last_changed)
return State(
self.entity_id,
self.state,
# Join the state_attributes table on attributes_id to get the attributes
# for newer states
attrs,
last_changed,
last_updated,
context=context,
validate_entity_id=validate_entity_id,
)
class StateAttributes(Base): # type: ignore[misc,valid-type]
"""State attribute change history."""
__table_args__ = (
{"mysql_default_charset": "utf8mb4", "mysql_collate": "utf8mb4_unicode_ci"},
)
__tablename__ = TABLE_STATE_ATTRIBUTES
attributes_id = Column(Integer, Identity(), primary_key=True)
hash = Column(BigInteger, index=True)
# Note that this is not named attributes to avoid confusion with the states table
shared_attrs = Column(Text().with_variant(mysql.LONGTEXT, "mysql"))
def __repr__(self) -> str:
"""Return string representation of instance for debugging."""
return (
f"<recorder.StateAttributes("
f"id={self.attributes_id}, hash='{self.hash}', attributes='{self.shared_attrs}'"
f")>"
)
@staticmethod
def from_event(event: Event) -> StateAttributes:
"""Create object from a state_changed event."""
state: State | None = event.data.get("new_state")
# None state means the state was removed from the state machine
dbstate = StateAttributes(
shared_attrs="{}" if state is None else JSON_DUMP(state.attributes)
)
dbstate.hash = StateAttributes.hash_shared_attrs(dbstate.shared_attrs)
return dbstate
@staticmethod
def shared_attrs_from_event(
event: Event, exclude_attrs_by_domain: dict[str, set[str]]
) -> str:
"""Create shared_attrs from a state_changed event."""
state: State | None = event.data.get("new_state")
# None state means the state was removed from the state machine
if state is None:
return "{}"
domain = split_entity_id(state.entity_id)[0]
exclude_attrs = (
exclude_attrs_by_domain.get(domain, set()) | ALL_DOMAIN_EXCLUDE_ATTRS
)
return JSON_DUMP(
{k: v for k, v in state.attributes.items() if k not in exclude_attrs}
)
@staticmethod
def hash_shared_attrs(shared_attrs: str) -> int:
"""Return the hash of json encoded shared attributes."""
return cast(int, fnv1a_32(shared_attrs.encode("utf-8")))
def to_native(self) -> dict[str, Any]:
"""Convert to an HA state object."""
try:
return cast(dict[str, Any], json.loads(self.shared_attrs))
except ValueError:
# When json.loads fails
_LOGGER.exception("Error converting row to state attributes: %s", self)
return {}
class StatisticsBase:
"""Statistics base class."""
id = Column(Integer, Identity(), primary_key=True)
created = Column(DATETIME_TYPE, default=dt_util.utcnow)
@declared_attr # type: ignore[misc]
def metadata_id(self) -> Column:
"""Define the metadata_id column for sub classes."""
return Column(
Integer,
ForeignKey(f"{TABLE_STATISTICS_META}.id", ondelete="CASCADE"),
index=True,
)
start = Column(DATETIME_TYPE, index=True)
mean = Column(DOUBLE_TYPE)
min = Column(DOUBLE_TYPE)
max = Column(DOUBLE_TYPE)
last_reset = Column(DATETIME_TYPE)
state = Column(DOUBLE_TYPE)
sum = Column(DOUBLE_TYPE)
@classmethod
def from_stats(cls, metadata_id: int, stats: StatisticData) -> StatisticsBase:
"""Create object from a statistics."""
return cls( # type: ignore[call-arg,misc]
metadata_id=metadata_id,
**stats,
)
class Statistics(Base, StatisticsBase): # type: ignore[misc,valid-type]
"""Long term statistics."""
duration = timedelta(hours=1)
__table_args__ = (
# Used for fetching statistics for a certain entity at a specific time
Index("ix_statistics_statistic_id_start", "metadata_id", "start", unique=True),
)
__tablename__ = TABLE_STATISTICS
class StatisticsShortTerm(Base, StatisticsBase): # type: ignore[misc,valid-type]
"""Short term statistics."""
duration = timedelta(minutes=5)
__table_args__ = (
# Used for fetching statistics for a certain entity at a specific time
Index(
"ix_statistics_short_term_statistic_id_start",
"metadata_id",
"start",
unique=True,
),
)
__tablename__ = TABLE_STATISTICS_SHORT_TERM
class StatisticsMeta(Base): # type: ignore[misc,valid-type]
"""Statistics meta data."""
__table_args__ = (
{"mysql_default_charset": "utf8mb4", "mysql_collate": "utf8mb4_unicode_ci"},
)
__tablename__ = TABLE_STATISTICS_META
id = Column(Integer, Identity(), primary_key=True)
statistic_id = Column(String(255), index=True, unique=True)
source = Column(String(32))
unit_of_measurement = Column(String(255))
has_mean = Column(Boolean)
has_sum = Column(Boolean)
name = Column(String(255))
@staticmethod
def from_meta(meta: StatisticMetaData) -> StatisticsMeta:
"""Create object from meta data."""
return StatisticsMeta(**meta)
class RecorderRuns(Base): # type: ignore[misc,valid-type]
"""Representation of recorder run."""
__table_args__ = (Index("ix_recorder_runs_start_end", "start", "end"),)
__tablename__ = TABLE_RECORDER_RUNS
run_id = Column(Integer, Identity(), primary_key=True)
start = Column(DateTime(timezone=True), default=dt_util.utcnow)
end = Column(DateTime(timezone=True))
closed_incorrect = Column(Boolean, default=False)
created = Column(DateTime(timezone=True), default=dt_util.utcnow)
def __repr__(self) -> str:
"""Return string representation of instance for debugging."""
end = (
f"'{self.end.isoformat(sep=' ', timespec='seconds')}'" if self.end else None
)
return (
f"<recorder.RecorderRuns("
f"id={self.run_id}, start='{self.start.isoformat(sep=' ', timespec='seconds')}', "
f"end={end}, closed_incorrect={self.closed_incorrect}, "
f"created='{self.created.isoformat(sep=' ', timespec='seconds')}'"
f")>"
)
def entity_ids(self, point_in_time: datetime | None = None) -> list[str]:
"""Return the entity ids that existed in this run.
Specify point_in_time if you want to know which existed at that point
in time inside the run.
"""
session = Session.object_session(self)
assert session is not None, "RecorderRuns need to be persisted"
query = session.query(distinct(States.entity_id)).filter(
States.last_updated >= self.start
)
if point_in_time is not None:
query = query.filter(States.last_updated < point_in_time)
elif self.end is not None:
query = query.filter(States.last_updated < self.end)
return [row[0] for row in query]
def to_native(self, validate_entity_id: bool = True) -> RecorderRuns:
"""Return self, native format is this model."""
return self
class SchemaChanges(Base): # type: ignore[misc,valid-type]
"""Representation of schema version changes."""
__tablename__ = TABLE_SCHEMA_CHANGES
change_id = Column(Integer, Identity(), primary_key=True)
schema_version = Column(Integer)
changed = Column(DateTime(timezone=True), default=dt_util.utcnow)
def __repr__(self) -> str:
"""Return string representation of instance for debugging."""
return (
f"<recorder.SchemaChanges("
f"id={self.change_id}, schema_version={self.schema_version}, "
f"changed='{self.changed.isoformat(sep=' ', timespec='seconds')}'"
f")>"
)
class StatisticsRuns(Base): # type: ignore[misc,valid-type]
"""Representation of statistics run."""
__tablename__ = TABLE_STATISTICS_RUNS
run_id = Column(Integer, Identity(), primary_key=True)
start = Column(DateTime(timezone=True), index=True)
def __repr__(self) -> str:
"""Return string representation of instance for debugging."""
return (
f"<recorder.StatisticsRuns("
f"id={self.run_id}, start='{self.start.isoformat(sep=' ', timespec='seconds')}', "
f")>"
)
EVENT_DATA_JSON = type_coerce(
EventData.shared_data.cast(JSONB_VARIENT_CAST), JSONLiteral(none_as_null=True)
)
OLD_FORMAT_EVENT_DATA_JSON = type_coerce(
Events.event_data.cast(JSONB_VARIENT_CAST), JSONLiteral(none_as_null=True)
)
SHARED_ATTRS_JSON = type_coerce(
StateAttributes.shared_attrs.cast(JSON_VARIENT_CAST), JSON(none_as_null=True)
)
OLD_FORMAT_ATTRS_JSON = type_coerce(
States.attributes.cast(JSON_VARIENT_CAST), JSON(none_as_null=True)
)
ENTITY_ID_IN_EVENT: Column = EVENT_DATA_JSON["entity_id"]
OLD_ENTITY_ID_IN_EVENT: Column = OLD_FORMAT_EVENT_DATA_JSON["entity_id"]
DEVICE_ID_IN_EVENT: Column = EVENT_DATA_JSON["device_id"]
OLD_STATE = aliased(States, name="old_state")

View File

@ -12,7 +12,7 @@ from homeassistant.const import CONF_DOMAINS, CONF_ENTITIES, CONF_EXCLUDE, CONF_
from homeassistant.helpers.entityfilter import CONF_ENTITY_GLOBS
from homeassistant.helpers.typing import ConfigType
from .models import ENTITY_ID_IN_EVENT, OLD_ENTITY_ID_IN_EVENT, States
from .db_schema import ENTITY_ID_IN_EVENT, OLD_ENTITY_ID_IN_EVENT, States
DOMAIN = "history"
HISTORY_FILTERS = "history_filters"

View File

@ -25,12 +25,10 @@ from homeassistant.components.websocket_api.const import (
from homeassistant.core import HomeAssistant, State, split_entity_id
import homeassistant.util.dt as dt_util
from .db_schema import RecorderRuns, StateAttributes, States
from .filters import Filters
from .models import (
LazyState,
RecorderRuns,
StateAttributes,
States,
process_datetime_to_timestamp,
process_timestamp,
process_timestamp_to_utc_isoformat,

View File

@ -22,7 +22,7 @@ from sqlalchemy.sql.expression import true
from homeassistant.core import HomeAssistant
from .const import SupportedDialect
from .models import (
from .db_schema import (
SCHEMA_VERSION,
TABLE_STATES,
Base,
@ -31,8 +31,8 @@ from .models import (
StatisticsMeta,
StatisticsRuns,
StatisticsShortTerm,
process_timestamp,
)
from .models import process_timestamp
from .statistics import (
delete_statistics_duplicates,
delete_statistics_meta_duplicates,

View File

@ -1,36 +1,12 @@
"""Models for SQLAlchemy."""
"""Models for Recorder."""
from __future__ import annotations
from collections.abc import Callable
from datetime import datetime, timedelta
from datetime import datetime
import json
import logging
from typing import Any, TypedDict, cast, overload
from typing import Any, TypedDict, overload
import ciso8601
from fnvhash import fnv1a_32
from sqlalchemy import (
JSON,
BigInteger,
Boolean,
Column,
DateTime,
Float,
ForeignKey,
Identity,
Index,
Integer,
SmallInteger,
String,
Text,
distinct,
type_coerce,
)
from sqlalchemy.dialects import mysql, oracle, postgresql, sqlite
from sqlalchemy.engine.row import Row
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.orm import aliased, declarative_base, relationship
from sqlalchemy.orm.session import Session
from homeassistant.components.websocket_api.const import (
COMPRESSED_STATE_ATTRIBUTES,
@ -38,396 +14,22 @@ from homeassistant.components.websocket_api.const import (
COMPRESSED_STATE_LAST_UPDATED,
COMPRESSED_STATE_STATE,
)
from homeassistant.const import (
MAX_LENGTH_EVENT_CONTEXT_ID,
MAX_LENGTH_EVENT_EVENT_TYPE,
MAX_LENGTH_EVENT_ORIGIN,
MAX_LENGTH_STATE_ENTITY_ID,
MAX_LENGTH_STATE_STATE,
)
from homeassistant.core import Context, Event, EventOrigin, State, split_entity_id
from homeassistant.core import Context, State
import homeassistant.util.dt as dt_util
from .const import ALL_DOMAIN_EXCLUDE_ATTRS, JSON_DUMP
# SQLAlchemy Schema
# pylint: disable=invalid-name
Base = declarative_base()
SCHEMA_VERSION = 29
_LOGGER = logging.getLogger(__name__)
DB_TIMEZONE = "+00:00"
TABLE_EVENTS = "events"
TABLE_EVENT_DATA = "event_data"
TABLE_STATES = "states"
TABLE_STATE_ATTRIBUTES = "state_attributes"
TABLE_RECORDER_RUNS = "recorder_runs"
TABLE_SCHEMA_CHANGES = "schema_changes"
TABLE_STATISTICS = "statistics"
TABLE_STATISTICS_META = "statistics_meta"
TABLE_STATISTICS_RUNS = "statistics_runs"
TABLE_STATISTICS_SHORT_TERM = "statistics_short_term"
ALL_TABLES = [
TABLE_STATES,
TABLE_STATE_ATTRIBUTES,
TABLE_EVENTS,
TABLE_EVENT_DATA,
TABLE_RECORDER_RUNS,
TABLE_SCHEMA_CHANGES,
TABLE_STATISTICS,
TABLE_STATISTICS_META,
TABLE_STATISTICS_RUNS,
TABLE_STATISTICS_SHORT_TERM,
]
TABLES_TO_CHECK = [
TABLE_STATES,
TABLE_EVENTS,
TABLE_RECORDER_RUNS,
TABLE_SCHEMA_CHANGES,
]
LAST_UPDATED_INDEX = "ix_states_last_updated"
ENTITY_ID_LAST_UPDATED_INDEX = "ix_states_entity_id_last_updated"
EVENTS_CONTEXT_ID_INDEX = "ix_events_context_id"
STATES_CONTEXT_ID_INDEX = "ix_states_context_id"
EMPTY_JSON_OBJECT = "{}"
class FAST_PYSQLITE_DATETIME(sqlite.DATETIME): # type: ignore[misc]
"""Use ciso8601 to parse datetimes instead of sqlalchemy built-in regex."""
def result_processor(self, dialect, coltype): # type: ignore[no-untyped-def]
"""Offload the datetime parsing to ciso8601."""
return lambda value: None if value is None else ciso8601.parse_datetime(value)
JSON_VARIENT_CAST = Text().with_variant(
postgresql.JSON(none_as_null=True), "postgresql"
)
JSONB_VARIENT_CAST = Text().with_variant(
postgresql.JSONB(none_as_null=True), "postgresql"
)
DATETIME_TYPE = (
DateTime(timezone=True)
.with_variant(mysql.DATETIME(timezone=True, fsp=6), "mysql")
.with_variant(FAST_PYSQLITE_DATETIME(), "sqlite")
)
DOUBLE_TYPE = (
Float()
.with_variant(mysql.DOUBLE(asdecimal=False), "mysql")
.with_variant(oracle.DOUBLE_PRECISION(), "oracle")
.with_variant(postgresql.DOUBLE_PRECISION(), "postgresql")
)
class JSONLiteral(JSON): # type: ignore[misc]
"""Teach SA how to literalize json."""
def literal_processor(self, dialect: str) -> Callable[[Any], str]:
"""Processor to convert a value to JSON."""
def process(value: Any) -> str:
"""Dump json."""
return json.dumps(value)
return process
EVENT_ORIGIN_ORDER = [EventOrigin.local, EventOrigin.remote]
EVENT_ORIGIN_TO_IDX = {origin: idx for idx, origin in enumerate(EVENT_ORIGIN_ORDER)}
class UnsupportedDialect(Exception):
"""The dialect or its version is not supported."""
class Events(Base): # type: ignore[misc,valid-type]
"""Event history data."""
__table_args__ = (
# Used for fetching events at a specific time
# see logbook
Index("ix_events_event_type_time_fired", "event_type", "time_fired"),
{"mysql_default_charset": "utf8mb4", "mysql_collate": "utf8mb4_unicode_ci"},
)
__tablename__ = TABLE_EVENTS
event_id = Column(Integer, Identity(), primary_key=True)
event_type = Column(String(MAX_LENGTH_EVENT_EVENT_TYPE))
event_data = Column(Text().with_variant(mysql.LONGTEXT, "mysql"))
origin = Column(String(MAX_LENGTH_EVENT_ORIGIN)) # no longer used for new rows
origin_idx = Column(SmallInteger)
time_fired = Column(DATETIME_TYPE, index=True)
context_id = Column(String(MAX_LENGTH_EVENT_CONTEXT_ID), index=True)
context_user_id = Column(String(MAX_LENGTH_EVENT_CONTEXT_ID))
context_parent_id = Column(String(MAX_LENGTH_EVENT_CONTEXT_ID))
data_id = Column(Integer, ForeignKey("event_data.data_id"), index=True)
event_data_rel = relationship("EventData")
def __repr__(self) -> str:
"""Return string representation of instance for debugging."""
return (
f"<recorder.Events("
f"id={self.event_id}, type='{self.event_type}', "
f"origin_idx='{self.origin_idx}', time_fired='{self.time_fired}'"
f", data_id={self.data_id})>"
)
@staticmethod
def from_event(event: Event) -> Events:
"""Create an event database object from a native event."""
return Events(
event_type=event.event_type,
event_data=None,
origin_idx=EVENT_ORIGIN_TO_IDX.get(event.origin),
time_fired=event.time_fired,
context_id=event.context.id,
context_user_id=event.context.user_id,
context_parent_id=event.context.parent_id,
)
def to_native(self, validate_entity_id: bool = True) -> Event | None:
"""Convert to a native HA Event."""
context = Context(
id=self.context_id,
user_id=self.context_user_id,
parent_id=self.context_parent_id,
)
try:
return Event(
self.event_type,
json.loads(self.event_data) if self.event_data else {},
EventOrigin(self.origin)
if self.origin
else EVENT_ORIGIN_ORDER[self.origin_idx],
process_timestamp(self.time_fired),
context=context,
)
except ValueError:
# When json.loads fails
_LOGGER.exception("Error converting to event: %s", self)
return None
class EventData(Base): # type: ignore[misc,valid-type]
"""Event data history."""
__table_args__ = (
{"mysql_default_charset": "utf8mb4", "mysql_collate": "utf8mb4_unicode_ci"},
)
__tablename__ = TABLE_EVENT_DATA
data_id = Column(Integer, Identity(), primary_key=True)
hash = Column(BigInteger, index=True)
# Note that this is not named attributes to avoid confusion with the states table
shared_data = Column(Text().with_variant(mysql.LONGTEXT, "mysql"))
def __repr__(self) -> str:
"""Return string representation of instance for debugging."""
return (
f"<recorder.EventData("
f"id={self.data_id}, hash='{self.hash}', data='{self.shared_data}'"
f")>"
)
@staticmethod
def from_event(event: Event) -> EventData:
"""Create object from an event."""
shared_data = JSON_DUMP(event.data)
return EventData(
shared_data=shared_data, hash=EventData.hash_shared_data(shared_data)
)
@staticmethod
def shared_data_from_event(event: Event) -> str:
"""Create shared_attrs from an event."""
return JSON_DUMP(event.data)
@staticmethod
def hash_shared_data(shared_data: str) -> int:
"""Return the hash of json encoded shared data."""
return cast(int, fnv1a_32(shared_data.encode("utf-8")))
def to_native(self) -> dict[str, Any]:
"""Convert to an HA state object."""
try:
return cast(dict[str, Any], json.loads(self.shared_data))
except ValueError:
_LOGGER.exception("Error converting row to event data: %s", self)
return {}
class States(Base): # type: ignore[misc,valid-type]
"""State change history."""
__table_args__ = (
# Used for fetching the state of entities at a specific time
# (get_states in history.py)
Index(ENTITY_ID_LAST_UPDATED_INDEX, "entity_id", "last_updated"),
{"mysql_default_charset": "utf8mb4", "mysql_collate": "utf8mb4_unicode_ci"},
)
__tablename__ = TABLE_STATES
state_id = Column(Integer, Identity(), primary_key=True)
entity_id = Column(String(MAX_LENGTH_STATE_ENTITY_ID))
state = Column(String(MAX_LENGTH_STATE_STATE))
attributes = Column(
Text().with_variant(mysql.LONGTEXT, "mysql")
) # no longer used for new rows
event_id = Column( # no longer used for new rows
Integer, ForeignKey("events.event_id", ondelete="CASCADE"), index=True
)
last_changed = Column(DATETIME_TYPE)
last_updated = Column(DATETIME_TYPE, default=dt_util.utcnow, index=True)
old_state_id = Column(Integer, ForeignKey("states.state_id"), index=True)
attributes_id = Column(
Integer, ForeignKey("state_attributes.attributes_id"), index=True
)
context_id = Column(String(MAX_LENGTH_EVENT_CONTEXT_ID), index=True)
context_user_id = Column(String(MAX_LENGTH_EVENT_CONTEXT_ID))
context_parent_id = Column(String(MAX_LENGTH_EVENT_CONTEXT_ID))
origin_idx = Column(SmallInteger) # 0 is local, 1 is remote
old_state = relationship("States", remote_side=[state_id])
state_attributes = relationship("StateAttributes")
def __repr__(self) -> str:
"""Return string representation of instance for debugging."""
return (
f"<recorder.States("
f"id={self.state_id}, entity_id='{self.entity_id}', "
f"state='{self.state}', event_id='{self.event_id}', "
f"last_updated='{self.last_updated.isoformat(sep=' ', timespec='seconds')}', "
f"old_state_id={self.old_state_id}, attributes_id={self.attributes_id}"
f")>"
)
@staticmethod
def from_event(event: Event) -> States:
"""Create object from a state_changed event."""
entity_id = event.data["entity_id"]
state: State | None = event.data.get("new_state")
dbstate = States(
entity_id=entity_id,
attributes=None,
context_id=event.context.id,
context_user_id=event.context.user_id,
context_parent_id=event.context.parent_id,
origin_idx=EVENT_ORIGIN_TO_IDX.get(event.origin),
)
# None state means the state was removed from the state machine
if state is None:
dbstate.state = ""
dbstate.last_updated = event.time_fired
dbstate.last_changed = None
return dbstate
dbstate.state = state.state
dbstate.last_updated = state.last_updated
if state.last_updated == state.last_changed:
dbstate.last_changed = None
else:
dbstate.last_changed = state.last_changed
return dbstate
def to_native(self, validate_entity_id: bool = True) -> State | None:
"""Convert to an HA state object."""
context = Context(
id=self.context_id,
user_id=self.context_user_id,
parent_id=self.context_parent_id,
)
try:
attrs = json.loads(self.attributes) if self.attributes else {}
except ValueError:
# When json.loads fails
_LOGGER.exception("Error converting row to state: %s", self)
return None
if self.last_changed is None or self.last_changed == self.last_updated:
last_changed = last_updated = process_timestamp(self.last_updated)
else:
last_updated = process_timestamp(self.last_updated)
last_changed = process_timestamp(self.last_changed)
return State(
self.entity_id,
self.state,
# Join the state_attributes table on attributes_id to get the attributes
# for newer states
attrs,
last_changed,
last_updated,
context=context,
validate_entity_id=validate_entity_id,
)
class StateAttributes(Base): # type: ignore[misc,valid-type]
"""State attribute change history."""
__table_args__ = (
{"mysql_default_charset": "utf8mb4", "mysql_collate": "utf8mb4_unicode_ci"},
)
__tablename__ = TABLE_STATE_ATTRIBUTES
attributes_id = Column(Integer, Identity(), primary_key=True)
hash = Column(BigInteger, index=True)
# Note that this is not named attributes to avoid confusion with the states table
shared_attrs = Column(Text().with_variant(mysql.LONGTEXT, "mysql"))
def __repr__(self) -> str:
"""Return string representation of instance for debugging."""
return (
f"<recorder.StateAttributes("
f"id={self.attributes_id}, hash='{self.hash}', attributes='{self.shared_attrs}'"
f")>"
)
@staticmethod
def from_event(event: Event) -> StateAttributes:
"""Create object from a state_changed event."""
state: State | None = event.data.get("new_state")
# None state means the state was removed from the state machine
dbstate = StateAttributes(
shared_attrs="{}" if state is None else JSON_DUMP(state.attributes)
)
dbstate.hash = StateAttributes.hash_shared_attrs(dbstate.shared_attrs)
return dbstate
@staticmethod
def shared_attrs_from_event(
event: Event, exclude_attrs_by_domain: dict[str, set[str]]
) -> str:
"""Create shared_attrs from a state_changed event."""
state: State | None = event.data.get("new_state")
# None state means the state was removed from the state machine
if state is None:
return "{}"
domain = split_entity_id(state.entity_id)[0]
exclude_attrs = (
exclude_attrs_by_domain.get(domain, set()) | ALL_DOMAIN_EXCLUDE_ATTRS
)
return JSON_DUMP(
{k: v for k, v in state.attributes.items() if k not in exclude_attrs}
)
@staticmethod
def hash_shared_attrs(shared_attrs: str) -> int:
"""Return the hash of json encoded shared attributes."""
return cast(int, fnv1a_32(shared_attrs.encode("utf-8")))
def to_native(self) -> dict[str, Any]:
"""Convert to an HA state object."""
try:
return cast(dict[str, Any], json.loads(self.shared_attrs))
except ValueError:
# When json.loads fails
_LOGGER.exception("Error converting row to state attributes: %s", self)
return {}
class StatisticResult(TypedDict):
"""Statistic result data class.
@ -455,67 +57,6 @@ class StatisticData(StatisticDataBase, total=False):
sum: float
class StatisticsBase:
"""Statistics base class."""
id = Column(Integer, Identity(), primary_key=True)
created = Column(DATETIME_TYPE, default=dt_util.utcnow)
@declared_attr # type: ignore[misc]
def metadata_id(self) -> Column:
"""Define the metadata_id column for sub classes."""
return Column(
Integer,
ForeignKey(f"{TABLE_STATISTICS_META}.id", ondelete="CASCADE"),
index=True,
)
start = Column(DATETIME_TYPE, index=True)
mean = Column(DOUBLE_TYPE)
min = Column(DOUBLE_TYPE)
max = Column(DOUBLE_TYPE)
last_reset = Column(DATETIME_TYPE)
state = Column(DOUBLE_TYPE)
sum = Column(DOUBLE_TYPE)
@classmethod
def from_stats(cls, metadata_id: int, stats: StatisticData) -> StatisticsBase:
"""Create object from a statistics."""
return cls( # type: ignore[call-arg,misc]
metadata_id=metadata_id,
**stats,
)
class Statistics(Base, StatisticsBase): # type: ignore[misc,valid-type]
"""Long term statistics."""
duration = timedelta(hours=1)
__table_args__ = (
# Used for fetching statistics for a certain entity at a specific time
Index("ix_statistics_statistic_id_start", "metadata_id", "start", unique=True),
)
__tablename__ = TABLE_STATISTICS
class StatisticsShortTerm(Base, StatisticsBase): # type: ignore[misc,valid-type]
"""Short term statistics."""
duration = timedelta(minutes=5)
__table_args__ = (
# Used for fetching statistics for a certain entity at a specific time
Index(
"ix_statistics_short_term_statistic_id_start",
"metadata_id",
"start",
unique=True,
),
)
__tablename__ = TABLE_STATISTICS_SHORT_TERM
class StatisticMetaData(TypedDict):
"""Statistic meta data class."""
@ -527,131 +68,6 @@ class StatisticMetaData(TypedDict):
unit_of_measurement: str | None
class StatisticsMeta(Base): # type: ignore[misc,valid-type]
"""Statistics meta data."""
__table_args__ = (
{"mysql_default_charset": "utf8mb4", "mysql_collate": "utf8mb4_unicode_ci"},
)
__tablename__ = TABLE_STATISTICS_META
id = Column(Integer, Identity(), primary_key=True)
statistic_id = Column(String(255), index=True, unique=True)
source = Column(String(32))
unit_of_measurement = Column(String(255))
has_mean = Column(Boolean)
has_sum = Column(Boolean)
name = Column(String(255))
@staticmethod
def from_meta(meta: StatisticMetaData) -> StatisticsMeta:
"""Create object from meta data."""
return StatisticsMeta(**meta)
class RecorderRuns(Base): # type: ignore[misc,valid-type]
"""Representation of recorder run."""
__table_args__ = (Index("ix_recorder_runs_start_end", "start", "end"),)
__tablename__ = TABLE_RECORDER_RUNS
run_id = Column(Integer, Identity(), primary_key=True)
start = Column(DateTime(timezone=True), default=dt_util.utcnow)
end = Column(DateTime(timezone=True))
closed_incorrect = Column(Boolean, default=False)
created = Column(DateTime(timezone=True), default=dt_util.utcnow)
def __repr__(self) -> str:
"""Return string representation of instance for debugging."""
end = (
f"'{self.end.isoformat(sep=' ', timespec='seconds')}'" if self.end else None
)
return (
f"<recorder.RecorderRuns("
f"id={self.run_id}, start='{self.start.isoformat(sep=' ', timespec='seconds')}', "
f"end={end}, closed_incorrect={self.closed_incorrect}, "
f"created='{self.created.isoformat(sep=' ', timespec='seconds')}'"
f")>"
)
def entity_ids(self, point_in_time: datetime | None = None) -> list[str]:
"""Return the entity ids that existed in this run.
Specify point_in_time if you want to know which existed at that point
in time inside the run.
"""
session = Session.object_session(self)
assert session is not None, "RecorderRuns need to be persisted"
query = session.query(distinct(States.entity_id)).filter(
States.last_updated >= self.start
)
if point_in_time is not None:
query = query.filter(States.last_updated < point_in_time)
elif self.end is not None:
query = query.filter(States.last_updated < self.end)
return [row[0] for row in query]
def to_native(self, validate_entity_id: bool = True) -> RecorderRuns:
"""Return self, native format is this model."""
return self
class SchemaChanges(Base): # type: ignore[misc,valid-type]
"""Representation of schema version changes."""
__tablename__ = TABLE_SCHEMA_CHANGES
change_id = Column(Integer, Identity(), primary_key=True)
schema_version = Column(Integer)
changed = Column(DateTime(timezone=True), default=dt_util.utcnow)
def __repr__(self) -> str:
"""Return string representation of instance for debugging."""
return (
f"<recorder.SchemaChanges("
f"id={self.change_id}, schema_version={self.schema_version}, "
f"changed='{self.changed.isoformat(sep=' ', timespec='seconds')}'"
f")>"
)
class StatisticsRuns(Base): # type: ignore[misc,valid-type]
"""Representation of statistics run."""
__tablename__ = TABLE_STATISTICS_RUNS
run_id = Column(Integer, Identity(), primary_key=True)
start = Column(DateTime(timezone=True), index=True)
def __repr__(self) -> str:
"""Return string representation of instance for debugging."""
return (
f"<recorder.StatisticsRuns("
f"id={self.run_id}, start='{self.start.isoformat(sep=' ', timespec='seconds')}', "
f")>"
)
EVENT_DATA_JSON = type_coerce(
EventData.shared_data.cast(JSONB_VARIENT_CAST), JSONLiteral(none_as_null=True)
)
OLD_FORMAT_EVENT_DATA_JSON = type_coerce(
Events.event_data.cast(JSONB_VARIENT_CAST), JSONLiteral(none_as_null=True)
)
SHARED_ATTRS_JSON = type_coerce(
StateAttributes.shared_attrs.cast(JSON_VARIENT_CAST), JSON(none_as_null=True)
)
OLD_FORMAT_ATTRS_JSON = type_coerce(
States.attributes.cast(JSON_VARIENT_CAST), JSON(none_as_null=True)
)
ENTITY_ID_IN_EVENT: Column = EVENT_DATA_JSON["entity_id"]
OLD_ENTITY_ID_IN_EVENT: Column = OLD_FORMAT_EVENT_DATA_JSON["entity_id"]
DEVICE_ID_IN_EVENT: Column = EVENT_DATA_JSON["device_id"]
OLD_STATE = aliased(States, name="old_state")
@overload
def process_timestamp(ts: None) -> None:
...

View File

@ -14,7 +14,7 @@ from sqlalchemy.sql.expression import distinct
from homeassistant.const import EVENT_STATE_CHANGED
from .const import MAX_ROWS_TO_PURGE, SupportedDialect
from .models import Events, StateAttributes, States
from .db_schema import Events, StateAttributes, States
from .queries import (
attributes_ids_exist_in_states,
attributes_ids_exist_in_states_sqlite,

View File

@ -9,7 +9,7 @@ from sqlalchemy.sql.lambdas import StatementLambdaElement
from sqlalchemy.sql.selectable import Select
from .const import MAX_ROWS_TO_PURGE
from .models import (
from .db_schema import (
EventData,
Events,
RecorderRuns,

View File

@ -7,7 +7,7 @@ from typing import TYPE_CHECKING
from sqlalchemy import text
from .const import SupportedDialect
from .models import ALL_TABLES
from .db_schema import ALL_TABLES
if TYPE_CHECKING:
from . import Recorder

View File

@ -9,7 +9,8 @@ from sqlalchemy.orm.session import Session
import homeassistant.util.dt as dt_util
from .models import RecorderRuns, process_timestamp
from .db_schema import RecorderRuns
from .models import process_timestamp
def _find_recorder_run_for_start_time(

View File

@ -42,14 +42,11 @@ from homeassistant.util.unit_system import UnitSystem
import homeassistant.util.volume as volume_util
from .const import DATA_INSTANCE, DOMAIN, MAX_ROWS_TO_PURGE, SupportedDialect
from .db_schema import Statistics, StatisticsMeta, StatisticsRuns, StatisticsShortTerm
from .models import (
StatisticData,
StatisticMetaData,
StatisticResult,
Statistics,
StatisticsMeta,
StatisticsRuns,
StatisticsShortTerm,
process_timestamp,
process_timestamp_to_utc_isoformat,
)

View File

@ -29,14 +29,13 @@ from homeassistant.core import HomeAssistant
import homeassistant.util.dt as dt_util
from .const import DATA_INSTANCE, SQLITE_URL_PREFIX, SupportedDialect
from .models import (
from .db_schema import (
TABLE_RECORDER_RUNS,
TABLE_SCHEMA_CHANGES,
TABLES_TO_CHECK,
RecorderRuns,
UnsupportedDialect,
process_timestamp,
)
from .models import UnsupportedDialect, process_timestamp
if TYPE_CHECKING:
from . import Recorder

View File

@ -11,7 +11,7 @@ from homeassistant.components.automation import (
ATTR_MODE,
CONF_ID,
)
from homeassistant.components.recorder.models import StateAttributes, States
from homeassistant.components.recorder.db_schema import StateAttributes, States
from homeassistant.components.recorder.util import session_scope
from homeassistant.const import ATTR_ENTITY_ID, ATTR_FRIENDLY_NAME
from homeassistant.core import State

View File

@ -4,7 +4,7 @@ from __future__ import annotations
from datetime import timedelta
from homeassistant.components import camera
from homeassistant.components.recorder.models import StateAttributes, States
from homeassistant.components.recorder.db_schema import StateAttributes, States
from homeassistant.components.recorder.util import session_scope
from homeassistant.const import (
ATTR_ATTRIBUTION,

View File

@ -15,7 +15,7 @@ from homeassistant.components.climate.const import (
ATTR_SWING_MODES,
ATTR_TARGET_TEMP_STEP,
)
from homeassistant.components.recorder.models import StateAttributes, States
from homeassistant.components.recorder.db_schema import StateAttributes, States
from homeassistant.components.recorder.util import session_scope
from homeassistant.const import ATTR_FRIENDLY_NAME
from homeassistant.core import State

View File

@ -5,7 +5,7 @@ from datetime import timedelta
from homeassistant.components import fan
from homeassistant.components.fan import ATTR_PRESET_MODES
from homeassistant.components.recorder.models import StateAttributes, States
from homeassistant.components.recorder.db_schema import StateAttributes, States
from homeassistant.components.recorder.util import session_scope
from homeassistant.const import ATTR_FRIENDLY_NAME
from homeassistant.core import State

View File

@ -5,7 +5,7 @@ from datetime import timedelta
from homeassistant.components import group
from homeassistant.components.group import ATTR_AUTO, ATTR_ENTITY_ID, ATTR_ORDER
from homeassistant.components.recorder.models import StateAttributes, States
from homeassistant.components.recorder.db_schema import StateAttributes, States
from homeassistant.components.recorder.util import session_scope
from homeassistant.const import ATTR_FRIENDLY_NAME, STATE_ON
from homeassistant.core import State

View File

@ -9,7 +9,7 @@ from homeassistant.components.humidifier import (
ATTR_MAX_HUMIDITY,
ATTR_MIN_HUMIDITY,
)
from homeassistant.components.recorder.models import StateAttributes, States
from homeassistant.components.recorder.db_schema import StateAttributes, States
from homeassistant.components.recorder.util import session_scope
from homeassistant.const import ATTR_FRIENDLY_NAME
from homeassistant.core import State

View File

@ -4,7 +4,7 @@ from __future__ import annotations
from datetime import timedelta
from homeassistant.components.input_boolean import DOMAIN
from homeassistant.components.recorder.models import StateAttributes, States
from homeassistant.components.recorder.db_schema import StateAttributes, States
from homeassistant.components.recorder.util import session_scope
from homeassistant.const import ATTR_EDITABLE
from homeassistant.core import HomeAssistant, State

View File

@ -4,7 +4,7 @@ from __future__ import annotations
from datetime import timedelta
from homeassistant.components.input_button import DOMAIN
from homeassistant.components.recorder.models import StateAttributes, States
from homeassistant.components.recorder.db_schema import StateAttributes, States
from homeassistant.components.recorder.util import session_scope
from homeassistant.const import ATTR_EDITABLE
from homeassistant.core import HomeAssistant, State

View File

@ -4,7 +4,7 @@ from __future__ import annotations
from datetime import timedelta
from homeassistant.components.input_datetime import CONF_HAS_DATE, CONF_HAS_TIME, DOMAIN
from homeassistant.components.recorder.models import StateAttributes, States
from homeassistant.components.recorder.db_schema import StateAttributes, States
from homeassistant.components.recorder.util import session_scope
from homeassistant.const import ATTR_EDITABLE
from homeassistant.core import HomeAssistant, State

View File

@ -10,7 +10,7 @@ from homeassistant.components.input_number import (
ATTR_STEP,
DOMAIN,
)
from homeassistant.components.recorder.models import StateAttributes, States
from homeassistant.components.recorder.db_schema import StateAttributes, States
from homeassistant.components.recorder.util import session_scope
from homeassistant.const import ATTR_EDITABLE
from homeassistant.core import HomeAssistant, State

View File

@ -4,7 +4,7 @@ from __future__ import annotations
from datetime import timedelta
from homeassistant.components.input_select import ATTR_OPTIONS, DOMAIN
from homeassistant.components.recorder.models import StateAttributes, States
from homeassistant.components.recorder.db_schema import StateAttributes, States
from homeassistant.components.recorder.util import session_scope
from homeassistant.const import ATTR_EDITABLE
from homeassistant.core import HomeAssistant, State

View File

@ -11,7 +11,7 @@ from homeassistant.components.input_text import (
DOMAIN,
MODE_TEXT,
)
from homeassistant.components.recorder.models import StateAttributes, States
from homeassistant.components.recorder.db_schema import StateAttributes, States
from homeassistant.components.recorder.util import session_scope
from homeassistant.const import ATTR_EDITABLE
from homeassistant.core import HomeAssistant, State

View File

@ -10,7 +10,7 @@ from homeassistant.components.light import (
ATTR_MIN_MIREDS,
ATTR_SUPPORTED_COLOR_MODES,
)
from homeassistant.components.recorder.models import StateAttributes, States
from homeassistant.components.recorder.db_schema import StateAttributes, States
from homeassistant.components.recorder.util import session_scope
from homeassistant.const import ATTR_FRIENDLY_NAME
from homeassistant.core import State

View File

@ -11,7 +11,7 @@ from homeassistant.components.media_player.const import (
ATTR_MEDIA_POSITION_UPDATED_AT,
ATTR_SOUND_MODE_LIST,
)
from homeassistant.components.recorder.models import StateAttributes, States
from homeassistant.components.recorder.db_schema import StateAttributes, States
from homeassistant.components.recorder.util import session_scope
from homeassistant.const import ATTR_ENTITY_PICTURE, ATTR_FRIENDLY_NAME
from homeassistant.core import State

View File

@ -5,7 +5,7 @@ from datetime import timedelta
from homeassistant.components import number
from homeassistant.components.number import ATTR_MAX, ATTR_MIN, ATTR_MODE, ATTR_STEP
from homeassistant.components.recorder.models import StateAttributes, States
from homeassistant.components.recorder.db_schema import StateAttributes, States
from homeassistant.components.recorder.util import session_scope
from homeassistant.const import ATTR_FRIENDLY_NAME
from homeassistant.core import State

View File

@ -14,13 +14,13 @@ from homeassistant import core as ha
from homeassistant.components import recorder
from homeassistant.components.recorder import get_instance, statistics
from homeassistant.components.recorder.core import Recorder
from homeassistant.components.recorder.models import RecorderRuns
from homeassistant.components.recorder.db_schema import RecorderRuns
from homeassistant.components.recorder.tasks import RecorderTask, StatisticsTask
from homeassistant.core import HomeAssistant
from homeassistant.util import dt as dt_util
from tests.common import async_fire_time_changed, fire_time_changed
from tests.components.recorder import models_schema_0
from tests.components.recorder import db_schema_0
DEFAULT_PURGE_TASKS = 3
@ -122,7 +122,7 @@ def create_engine_test(*args, **kwargs):
This simulates an existing db with the old schema.
"""
engine = create_engine(*args, **kwargs)
models_schema_0.Base.metadata.create_all(engine)
db_schema_0.Base.metadata.create_all(engine)
return engine

View File

@ -5,12 +5,12 @@ from sqlalchemy import select
from sqlalchemy.engine.row import Row
from homeassistant.components.recorder import get_instance
from homeassistant.components.recorder.db_schema import EventData, States
from homeassistant.components.recorder.filters import (
Filters,
extract_include_exclude_filter_conf,
sqlalchemy_filter_from_include_exclude_conf,
)
from homeassistant.components.recorder.models import EventData, States
from homeassistant.components.recorder.util import session_scope
from homeassistant.const import ATTR_ENTITY_ID, STATE_ON
from homeassistant.core import HomeAssistant

View File

@ -12,14 +12,13 @@ from sqlalchemy import text
from homeassistant.components import recorder
from homeassistant.components.recorder import history
from homeassistant.components.recorder.models import (
from homeassistant.components.recorder.db_schema import (
Events,
LazyState,
RecorderRuns,
StateAttributes,
States,
process_timestamp,
)
from homeassistant.components.recorder.models import LazyState, process_timestamp
from homeassistant.components.recorder.util import session_scope
import homeassistant.core as ha
from homeassistant.core import HomeAssistant, State

View File

@ -25,7 +25,7 @@ from homeassistant.components.recorder import (
get_instance,
)
from homeassistant.components.recorder.const import DATA_INSTANCE, KEEPALIVE_TIME
from homeassistant.components.recorder.models import (
from homeassistant.components.recorder.db_schema import (
SCHEMA_VERSION,
EventData,
Events,
@ -33,8 +33,8 @@ from homeassistant.components.recorder.models import (
StateAttributes,
States,
StatisticsRuns,
process_timestamp,
)
from homeassistant.components.recorder.models import process_timestamp
from homeassistant.components.recorder.services import (
SERVICE_DISABLE,
SERVICE_ENABLE,

View File

@ -20,9 +20,9 @@ from sqlalchemy.pool import StaticPool
from homeassistant.bootstrap import async_setup_component
from homeassistant.components import persistent_notification as pn, recorder
from homeassistant.components.recorder import migration, models
from homeassistant.components.recorder import db_schema, migration
from homeassistant.components.recorder.const import DATA_INSTANCE
from homeassistant.components.recorder.models import (
from homeassistant.components.recorder.db_schema import (
SCHEMA_VERSION,
RecorderRuns,
States,
@ -66,7 +66,7 @@ async def test_schema_update_calls(hass):
update.assert_has_calls(
[
call(hass, engine, session_maker, version + 1, 0)
for version in range(0, models.SCHEMA_VERSION)
for version in range(0, db_schema.SCHEMA_VERSION)
]
)
@ -267,14 +267,16 @@ async def test_schema_migrate(hass, start_version):
This simulates an existing db with the old schema.
"""
module = f"tests.components.recorder.models_schema_{str(start_version)}"
module = f"tests.components.recorder.db_schema_{str(start_version)}"
importlib.import_module(module)
old_models = sys.modules[module]
engine = create_engine(*args, **kwargs)
old_models.Base.metadata.create_all(engine)
if start_version > 0:
with Session(engine) as session:
session.add(recorder.models.SchemaChanges(schema_version=start_version))
session.add(
recorder.db_schema.SchemaChanges(schema_version=start_version)
)
session.commit()
return engine
@ -299,8 +301,8 @@ async def test_schema_migrate(hass, start_version):
# the recorder will silently create a new database.
with session_scope(hass=hass) as session:
res = (
session.query(models.SchemaChanges)
.order_by(models.SchemaChanges.change_id.desc())
session.query(db_schema.SchemaChanges)
.order_by(db_schema.SchemaChanges.change_id.desc())
.first()
)
migration_version = res.schema_version
@ -325,7 +327,7 @@ async def test_schema_migrate(hass, start_version):
await hass.async_block_till_done()
await hass.async_add_executor_job(migration_done.wait)
await async_wait_recording_done(hass)
assert migration_version == models.SCHEMA_VERSION
assert migration_version == db_schema.SCHEMA_VERSION
assert setup_run.called
assert recorder.util.async_migration_in_progress(hass) is not True
@ -381,7 +383,7 @@ def test_forgiving_add_column():
def test_forgiving_add_index():
"""Test that add index will continue if index exists."""
engine = create_engine("sqlite://", poolclass=StaticPool)
models.Base.metadata.create_all(engine)
db_schema.Base.metadata.create_all(engine)
with Session(engine) as session:
instance = Mock()
instance.get_session = Mock(return_value=session)

View File

@ -7,14 +7,16 @@ import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker
from homeassistant.components.recorder.models import (
from homeassistant.components.recorder.db_schema import (
Base,
EventData,
Events,
LazyState,
RecorderRuns,
StateAttributes,
States,
)
from homeassistant.components.recorder.models import (
LazyState,
process_datetime_to_timestamp,
process_timestamp,
process_timestamp_to_utc_isoformat,

View File

@ -10,7 +10,7 @@ from sqlalchemy.orm.session import Session
from homeassistant.components import recorder
from homeassistant.components.recorder.const import MAX_ROWS_TO_PURGE, SupportedDialect
from homeassistant.components.recorder.models import (
from homeassistant.components.recorder.db_schema import (
EventData,
Events,
RecorderRuns,

View File

@ -3,7 +3,8 @@
from datetime import timedelta
from homeassistant.components import recorder
from homeassistant.components.recorder.models import RecorderRuns, process_timestamp
from homeassistant.components.recorder.db_schema import RecorderRuns
from homeassistant.components.recorder.models import process_timestamp
from homeassistant.util import dt as dt_util

View File

@ -13,10 +13,8 @@ from sqlalchemy.orm import Session
from homeassistant.components import recorder
from homeassistant.components.recorder import history, statistics
from homeassistant.components.recorder.const import DATA_INSTANCE, SQLITE_URL_PREFIX
from homeassistant.components.recorder.models import (
StatisticsShortTerm,
process_timestamp_to_utc_isoformat,
)
from homeassistant.components.recorder.db_schema import StatisticsShortTerm
from homeassistant.components.recorder.models import process_timestamp_to_utc_isoformat
from homeassistant.components.recorder.statistics import (
async_add_external_statistics,
delete_statistics_duplicates,
@ -390,7 +388,7 @@ def test_rename_entity_collision(hass_recorder, caplog):
}
with session_scope(hass=hass) as session:
session.add(recorder.models.StatisticsMeta.from_meta(metadata_1))
session.add(recorder.db_schema.StatisticsMeta.from_meta(metadata_1))
# Rename entity sensor.test1 to sensor.test99
@callback
@ -941,7 +939,7 @@ def test_duplicate_statistics_handle_integrity_error(hass_recorder, caplog):
assert insert_statistics_mock.call_count == 3
with session_scope(hass=hass) as session:
tmp = session.query(recorder.models.Statistics).all()
tmp = session.query(recorder.db_schema.Statistics).all()
assert len(tmp) == 2
assert "Blocked attempt to insert duplicated statistic rows" in caplog.text
@ -952,15 +950,19 @@ def _create_engine_28(*args, **kwargs):
This simulates an existing db with the old schema.
"""
module = "tests.components.recorder.models_schema_28"
module = "tests.components.recorder.db_schema_28"
importlib.import_module(module)
old_models = sys.modules[module]
old_db_schema = sys.modules[module]
engine = create_engine(*args, **kwargs)
old_models.Base.metadata.create_all(engine)
old_db_schema.Base.metadata.create_all(engine)
with Session(engine) as session:
session.add(recorder.models.StatisticsRuns(start=statistics.get_start_time()))
session.add(
recorder.models.SchemaChanges(schema_version=old_models.SCHEMA_VERSION)
recorder.db_schema.StatisticsRuns(start=statistics.get_start_time())
)
session.add(
recorder.db_schema.SchemaChanges(
schema_version=old_db_schema.SCHEMA_VERSION
)
)
session.commit()
return engine
@ -971,9 +973,9 @@ def test_delete_metadata_duplicates(caplog, tmpdir):
test_db_file = tmpdir.mkdir("sqlite").join("test_run_info.db")
dburl = f"{SQLITE_URL_PREFIX}//{test_db_file}"
module = "tests.components.recorder.models_schema_28"
module = "tests.components.recorder.db_schema_28"
importlib.import_module(module)
old_models = sys.modules[module]
old_db_schema = sys.modules[module]
external_energy_metadata_1 = {
"has_mean": False,
@ -1001,8 +1003,8 @@ def test_delete_metadata_duplicates(caplog, tmpdir):
}
# Create some duplicated statistics_meta with schema version 28
with patch.object(recorder, "models", old_models), patch.object(
recorder.migration, "SCHEMA_VERSION", old_models.SCHEMA_VERSION
with patch.object(recorder, "db_schema", old_db_schema), patch.object(
recorder.migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION
), patch(
"homeassistant.components.recorder.core.create_engine", new=_create_engine_28
):
@ -1013,15 +1015,17 @@ def test_delete_metadata_duplicates(caplog, tmpdir):
with session_scope(hass=hass) as session:
session.add(
recorder.models.StatisticsMeta.from_meta(external_energy_metadata_1)
recorder.db_schema.StatisticsMeta.from_meta(external_energy_metadata_1)
)
session.add(
recorder.models.StatisticsMeta.from_meta(external_energy_metadata_2)
recorder.db_schema.StatisticsMeta.from_meta(external_energy_metadata_2)
)
session.add(
recorder.db_schema.StatisticsMeta.from_meta(external_co2_metadata)
)
session.add(recorder.models.StatisticsMeta.from_meta(external_co2_metadata))
with session_scope(hass=hass) as session:
tmp = session.query(recorder.models.StatisticsMeta).all()
tmp = session.query(recorder.db_schema.StatisticsMeta).all()
assert len(tmp) == 3
assert tmp[0].id == 1
assert tmp[0].statistic_id == "test:total_energy_import_tariff_1"
@ -1042,7 +1046,7 @@ def test_delete_metadata_duplicates(caplog, tmpdir):
assert "Deleted 1 duplicated statistics_meta rows" in caplog.text
with session_scope(hass=hass) as session:
tmp = session.query(recorder.models.StatisticsMeta).all()
tmp = session.query(recorder.db_schema.StatisticsMeta).all()
assert len(tmp) == 2
assert tmp[0].id == 2
assert tmp[0].statistic_id == "test:total_energy_import_tariff_1"
@ -1058,9 +1062,9 @@ def test_delete_metadata_duplicates_many(caplog, tmpdir):
test_db_file = tmpdir.mkdir("sqlite").join("test_run_info.db")
dburl = f"{SQLITE_URL_PREFIX}//{test_db_file}"
module = "tests.components.recorder.models_schema_28"
module = "tests.components.recorder.db_schema_28"
importlib.import_module(module)
old_models = sys.modules[module]
old_db_schema = sys.modules[module]
external_energy_metadata_1 = {
"has_mean": False,
@ -1088,8 +1092,8 @@ def test_delete_metadata_duplicates_many(caplog, tmpdir):
}
# Create some duplicated statistics with schema version 28
with patch.object(recorder, "models", old_models), patch.object(
recorder.migration, "SCHEMA_VERSION", old_models.SCHEMA_VERSION
with patch.object(recorder, "db_schema", old_db_schema), patch.object(
recorder.migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION
), patch(
"homeassistant.components.recorder.core.create_engine", new=_create_engine_28
):
@ -1100,20 +1104,26 @@ def test_delete_metadata_duplicates_many(caplog, tmpdir):
with session_scope(hass=hass) as session:
session.add(
recorder.models.StatisticsMeta.from_meta(external_energy_metadata_1)
recorder.db_schema.StatisticsMeta.from_meta(external_energy_metadata_1)
)
for _ in range(3000):
session.add(
recorder.models.StatisticsMeta.from_meta(external_energy_metadata_1)
recorder.db_schema.StatisticsMeta.from_meta(
external_energy_metadata_1
)
)
session.add(
recorder.models.StatisticsMeta.from_meta(external_energy_metadata_2)
recorder.db_schema.StatisticsMeta.from_meta(external_energy_metadata_2)
)
session.add(
recorder.models.StatisticsMeta.from_meta(external_energy_metadata_2)
recorder.db_schema.StatisticsMeta.from_meta(external_energy_metadata_2)
)
session.add(
recorder.db_schema.StatisticsMeta.from_meta(external_co2_metadata)
)
session.add(
recorder.db_schema.StatisticsMeta.from_meta(external_co2_metadata)
)
session.add(recorder.models.StatisticsMeta.from_meta(external_co2_metadata))
session.add(recorder.models.StatisticsMeta.from_meta(external_co2_metadata))
hass.stop()
dt_util.DEFAULT_TIME_ZONE = ORIG_TZ
@ -1127,7 +1137,7 @@ def test_delete_metadata_duplicates_many(caplog, tmpdir):
assert "Deleted 3002 duplicated statistics_meta rows" in caplog.text
with session_scope(hass=hass) as session:
tmp = session.query(recorder.models.StatisticsMeta).all()
tmp = session.query(recorder.db_schema.StatisticsMeta).all()
assert len(tmp) == 3
assert tmp[0].id == 3001
assert tmp[0].statistic_id == "test:total_energy_import_tariff_1"

View File

@ -25,7 +25,7 @@ from tests.components.recorder.common import wait_recording_done
ORIG_TZ = dt_util.DEFAULT_TIME_ZONE
CREATE_ENGINE_TARGET = "homeassistant.components.recorder.core.create_engine"
SCHEMA_MODULE = "tests.components.recorder.models_schema_23_with_newer_columns"
SCHEMA_MODULE = "tests.components.recorder.db_schema_23_with_newer_columns"
def _create_engine_test(*args, **kwargs):
@ -34,13 +34,17 @@ def _create_engine_test(*args, **kwargs):
This simulates an existing db with the old schema.
"""
importlib.import_module(SCHEMA_MODULE)
old_models = sys.modules[SCHEMA_MODULE]
old_db_schema = sys.modules[SCHEMA_MODULE]
engine = create_engine(*args, **kwargs)
old_models.Base.metadata.create_all(engine)
old_db_schema.Base.metadata.create_all(engine)
with Session(engine) as session:
session.add(recorder.models.StatisticsRuns(start=statistics.get_start_time()))
session.add(
recorder.models.SchemaChanges(schema_version=old_models.SCHEMA_VERSION)
recorder.db_schema.StatisticsRuns(start=statistics.get_start_time())
)
session.add(
recorder.db_schema.SchemaChanges(
schema_version=old_db_schema.SCHEMA_VERSION
)
)
session.commit()
return engine
@ -52,7 +56,7 @@ def test_delete_duplicates(caplog, tmpdir):
dburl = f"{SQLITE_URL_PREFIX}//{test_db_file}"
importlib.import_module(SCHEMA_MODULE)
old_models = sys.modules[SCHEMA_MODULE]
old_db_schema = sys.modules[SCHEMA_MODULE]
period1 = dt_util.as_utc(dt_util.parse_datetime("2021-09-01 00:00:00"))
period2 = dt_util.as_utc(dt_util.parse_datetime("2021-09-30 23:00:00"))
@ -171,8 +175,8 @@ def test_delete_duplicates(caplog, tmpdir):
}
# Create some duplicated statistics with schema version 23
with patch.object(recorder, "models", old_models), patch.object(
recorder.migration, "SCHEMA_VERSION", old_models.SCHEMA_VERSION
with patch.object(recorder, "db_schema", old_db_schema), patch.object(
recorder.migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION
), patch(CREATE_ENGINE_TARGET, new=_create_engine_test):
hass = get_test_home_assistant()
setup_component(hass, "recorder", {"recorder": {"db_url": dburl}})
@ -181,19 +185,21 @@ def test_delete_duplicates(caplog, tmpdir):
with session_scope(hass=hass) as session:
session.add(
recorder.models.StatisticsMeta.from_meta(external_energy_metadata_1)
recorder.db_schema.StatisticsMeta.from_meta(external_energy_metadata_1)
)
session.add(
recorder.models.StatisticsMeta.from_meta(external_energy_metadata_2)
recorder.db_schema.StatisticsMeta.from_meta(external_energy_metadata_2)
)
session.add(
recorder.db_schema.StatisticsMeta.from_meta(external_co2_metadata)
)
session.add(recorder.models.StatisticsMeta.from_meta(external_co2_metadata))
with session_scope(hass=hass) as session:
for stat in external_energy_statistics_1:
session.add(recorder.models.Statistics.from_stats(1, stat))
session.add(recorder.db_schema.Statistics.from_stats(1, stat))
for stat in external_energy_statistics_2:
session.add(recorder.models.Statistics.from_stats(2, stat))
session.add(recorder.db_schema.Statistics.from_stats(2, stat))
for stat in external_co2_statistics:
session.add(recorder.models.Statistics.from_stats(3, stat))
session.add(recorder.db_schema.Statistics.from_stats(3, stat))
hass.stop()
dt_util.DEFAULT_TIME_ZONE = ORIG_TZ
@ -218,7 +224,7 @@ def test_delete_duplicates_many(caplog, tmpdir):
dburl = f"{SQLITE_URL_PREFIX}//{test_db_file}"
importlib.import_module(SCHEMA_MODULE)
old_models = sys.modules[SCHEMA_MODULE]
old_db_schema = sys.modules[SCHEMA_MODULE]
period1 = dt_util.as_utc(dt_util.parse_datetime("2021-09-01 00:00:00"))
period2 = dt_util.as_utc(dt_util.parse_datetime("2021-09-30 23:00:00"))
@ -337,8 +343,8 @@ def test_delete_duplicates_many(caplog, tmpdir):
}
# Create some duplicated statistics with schema version 23
with patch.object(recorder, "models", old_models), patch.object(
recorder.migration, "SCHEMA_VERSION", old_models.SCHEMA_VERSION
with patch.object(recorder, "db_schema", old_db_schema), patch.object(
recorder.migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION
), patch(CREATE_ENGINE_TARGET, new=_create_engine_test):
hass = get_test_home_assistant()
setup_component(hass, "recorder", {"recorder": {"db_url": dburl}})
@ -347,25 +353,27 @@ def test_delete_duplicates_many(caplog, tmpdir):
with session_scope(hass=hass) as session:
session.add(
recorder.models.StatisticsMeta.from_meta(external_energy_metadata_1)
recorder.db_schema.StatisticsMeta.from_meta(external_energy_metadata_1)
)
session.add(
recorder.models.StatisticsMeta.from_meta(external_energy_metadata_2)
recorder.db_schema.StatisticsMeta.from_meta(external_energy_metadata_2)
)
session.add(
recorder.db_schema.StatisticsMeta.from_meta(external_co2_metadata)
)
session.add(recorder.models.StatisticsMeta.from_meta(external_co2_metadata))
with session_scope(hass=hass) as session:
for stat in external_energy_statistics_1:
session.add(recorder.models.Statistics.from_stats(1, stat))
session.add(recorder.db_schema.Statistics.from_stats(1, stat))
for _ in range(3000):
session.add(
recorder.models.Statistics.from_stats(
recorder.db_schema.Statistics.from_stats(
1, external_energy_statistics_1[-1]
)
)
for stat in external_energy_statistics_2:
session.add(recorder.models.Statistics.from_stats(2, stat))
session.add(recorder.db_schema.Statistics.from_stats(2, stat))
for stat in external_co2_statistics:
session.add(recorder.models.Statistics.from_stats(3, stat))
session.add(recorder.db_schema.Statistics.from_stats(3, stat))
hass.stop()
dt_util.DEFAULT_TIME_ZONE = ORIG_TZ
@ -391,7 +399,7 @@ def test_delete_duplicates_non_identical(caplog, tmpdir):
dburl = f"{SQLITE_URL_PREFIX}//{test_db_file}"
importlib.import_module(SCHEMA_MODULE)
old_models = sys.modules[SCHEMA_MODULE]
old_db_schema = sys.modules[SCHEMA_MODULE]
period1 = dt_util.as_utc(dt_util.parse_datetime("2021-09-01 00:00:00"))
period2 = dt_util.as_utc(dt_util.parse_datetime("2021-09-30 23:00:00"))
@ -480,8 +488,8 @@ def test_delete_duplicates_non_identical(caplog, tmpdir):
}
# Create some duplicated statistics with schema version 23
with patch.object(recorder, "models", old_models), patch.object(
recorder.migration, "SCHEMA_VERSION", old_models.SCHEMA_VERSION
with patch.object(recorder, "db_schema", old_db_schema), patch.object(
recorder.migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION
), patch(CREATE_ENGINE_TARGET, new=_create_engine_test):
hass = get_test_home_assistant()
setup_component(hass, "recorder", {"recorder": {"db_url": dburl}})
@ -490,16 +498,16 @@ def test_delete_duplicates_non_identical(caplog, tmpdir):
with session_scope(hass=hass) as session:
session.add(
recorder.models.StatisticsMeta.from_meta(external_energy_metadata_1)
recorder.db_schema.StatisticsMeta.from_meta(external_energy_metadata_1)
)
session.add(
recorder.models.StatisticsMeta.from_meta(external_energy_metadata_2)
recorder.db_schema.StatisticsMeta.from_meta(external_energy_metadata_2)
)
with session_scope(hass=hass) as session:
for stat in external_energy_statistics_1:
session.add(recorder.models.Statistics.from_stats(1, stat))
session.add(recorder.db_schema.Statistics.from_stats(1, stat))
for stat in external_energy_statistics_2:
session.add(recorder.models.Statistics.from_stats(2, stat))
session.add(recorder.db_schema.Statistics.from_stats(2, stat))
hass.stop()
dt_util.DEFAULT_TIME_ZONE = ORIG_TZ
@ -560,7 +568,7 @@ def test_delete_duplicates_short_term(caplog, tmpdir):
dburl = f"{SQLITE_URL_PREFIX}//{test_db_file}"
importlib.import_module(SCHEMA_MODULE)
old_models = sys.modules[SCHEMA_MODULE]
old_db_schema = sys.modules[SCHEMA_MODULE]
period4 = dt_util.as_utc(dt_util.parse_datetime("2021-10-31 23:00:00"))
@ -580,8 +588,8 @@ def test_delete_duplicates_short_term(caplog, tmpdir):
}
# Create some duplicated statistics with schema version 23
with patch.object(recorder, "models", old_models), patch.object(
recorder.migration, "SCHEMA_VERSION", old_models.SCHEMA_VERSION
with patch.object(recorder, "db_schema", old_db_schema), patch.object(
recorder.migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION
), patch(CREATE_ENGINE_TARGET, new=_create_engine_test):
hass = get_test_home_assistant()
setup_component(hass, "recorder", {"recorder": {"db_url": dburl}})
@ -590,14 +598,14 @@ def test_delete_duplicates_short_term(caplog, tmpdir):
with session_scope(hass=hass) as session:
session.add(
recorder.models.StatisticsMeta.from_meta(external_energy_metadata_1)
recorder.db_schema.StatisticsMeta.from_meta(external_energy_metadata_1)
)
with session_scope(hass=hass) as session:
session.add(
recorder.models.StatisticsShortTerm.from_stats(1, statistic_row)
recorder.db_schema.StatisticsShortTerm.from_stats(1, statistic_row)
)
session.add(
recorder.models.StatisticsShortTerm.from_stats(1, statistic_row)
recorder.db_schema.StatisticsShortTerm.from_stats(1, statistic_row)
)
hass.stop()

View File

@ -14,7 +14,8 @@ from sqlalchemy.sql.lambdas import StatementLambdaElement
from homeassistant.components import recorder
from homeassistant.components.recorder import history, util
from homeassistant.components.recorder.const import DATA_INSTANCE, SQLITE_URL_PREFIX
from homeassistant.components.recorder.models import RecorderRuns, UnsupportedDialect
from homeassistant.components.recorder.db_schema import RecorderRuns
from homeassistant.components.recorder.models import UnsupportedDialect
from homeassistant.components.recorder.util import (
end_incomplete_runs,
is_second_sunday,

View File

@ -4,7 +4,7 @@ from __future__ import annotations
import pytest
from homeassistant.components import script
from homeassistant.components.recorder.models import StateAttributes, States
from homeassistant.components.recorder.db_schema import StateAttributes, States
from homeassistant.components.recorder.util import session_scope
from homeassistant.components.script import (
ATTR_CUR,

View File

@ -4,7 +4,7 @@ from __future__ import annotations
from datetime import timedelta
from homeassistant.components import select
from homeassistant.components.recorder.models import StateAttributes, States
from homeassistant.components.recorder.db_schema import StateAttributes, States
from homeassistant.components.recorder.util import session_scope
from homeassistant.components.select import ATTR_OPTIONS
from homeassistant.const import ATTR_FRIENDLY_NAME

View File

@ -11,10 +11,8 @@ from pytest import approx
from homeassistant import loader
from homeassistant.components.recorder import history
from homeassistant.components.recorder.const import DATA_INSTANCE
from homeassistant.components.recorder.models import (
StatisticsMeta,
process_timestamp_to_utc_isoformat,
)
from homeassistant.components.recorder.db_schema import StatisticsMeta
from homeassistant.components.recorder.models import process_timestamp_to_utc_isoformat
from homeassistant.components.recorder.statistics import (
get_metadata,
list_statistic_ids,
@ -2287,7 +2285,7 @@ def test_compile_statistics_hourly_daily_monthly_summary(hass_recorder, caplog):
year=2021, month=9, day=1, hour=5, minute=0, second=0, microsecond=0
)
with patch(
"homeassistant.components.recorder.models.dt_util.utcnow", return_value=zero
"homeassistant.components.recorder.db_schema.dt_util.utcnow", return_value=zero
):
hass = hass_recorder()
# Remove this after dropping the use of the hass_recorder fixture

View File

@ -4,7 +4,7 @@ from __future__ import annotations
from datetime import timedelta
from homeassistant.components import siren
from homeassistant.components.recorder.models import StateAttributes, States
from homeassistant.components.recorder.db_schema import StateAttributes, States
from homeassistant.components.recorder.util import session_scope
from homeassistant.components.siren import ATTR_AVAILABLE_TONES
from homeassistant.const import ATTR_FRIENDLY_NAME

View File

@ -3,7 +3,7 @@ from __future__ import annotations
from datetime import timedelta
from homeassistant.components.recorder.models import StateAttributes, States
from homeassistant.components.recorder.db_schema import StateAttributes, States
from homeassistant.components.recorder.util import session_scope
from homeassistant.components.sun import (
DOMAIN,

View File

@ -3,7 +3,7 @@ from __future__ import annotations
from datetime import timedelta
from homeassistant.components.recorder.models import StateAttributes, States
from homeassistant.components.recorder.db_schema import StateAttributes, States
from homeassistant.components.recorder.util import session_scope
from homeassistant.components.update.const import (
ATTR_IN_PROGRESS,

View File

@ -4,7 +4,7 @@ from __future__ import annotations
from datetime import timedelta
from homeassistant.components import vacuum
from homeassistant.components.recorder.models import StateAttributes, States
from homeassistant.components.recorder.db_schema import StateAttributes, States
from homeassistant.components.recorder.util import session_scope
from homeassistant.components.vacuum import ATTR_FAN_SPEED_LIST
from homeassistant.const import ATTR_FRIENDLY_NAME

View File

@ -4,7 +4,7 @@ from __future__ import annotations
from datetime import timedelta
from homeassistant.components import water_heater
from homeassistant.components.recorder.models import StateAttributes, States
from homeassistant.components.recorder.db_schema import StateAttributes, States
from homeassistant.components.recorder.util import session_scope
from homeassistant.components.water_heater import (
ATTR_MAX_TEMP,

View File

@ -3,7 +3,7 @@ from __future__ import annotations
from datetime import timedelta
from homeassistant.components.recorder.models import StateAttributes, States
from homeassistant.components.recorder.db_schema import StateAttributes, States
from homeassistant.components.recorder.util import session_scope
from homeassistant.components.weather import ATTR_FORECAST, DOMAIN
from homeassistant.core import HomeAssistant, State