Add type ignore error codes [recorder] (#66780)

This commit is contained in:
Marc Mueller 2022-02-18 10:37:38 +01:00 committed by GitHub
parent c8ae0d3bbe
commit cb736eaeaf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 26 additions and 26 deletions

View File

@@ -77,7 +77,7 @@ DOUBLE_TYPE = (
)
class Events(Base): # type: ignore
class Events(Base): # type: ignore[misc,valid-type]
"""Event history data."""
__table_args__ = (
@@ -141,7 +141,7 @@ class Events(Base): # type: ignore
return None
class States(Base): # type: ignore
class States(Base): # type: ignore[misc,valid-type]
"""State change history."""
__table_args__ = (
@@ -276,13 +276,13 @@ class StatisticsBase:
@classmethod
def from_stats(cls, metadata_id: int, stats: StatisticData):
"""Create object from a statistics."""
return cls( # type: ignore
return cls( # type: ignore[call-arg,misc]
metadata_id=metadata_id,
**stats,
)
class Statistics(Base, StatisticsBase): # type: ignore
class Statistics(Base, StatisticsBase): # type: ignore[misc,valid-type]
"""Long term statistics."""
duration = timedelta(hours=1)
@@ -294,7 +294,7 @@ class Statistics(Base, StatisticsBase): # type: ignore
__tablename__ = TABLE_STATISTICS
class StatisticsShortTerm(Base, StatisticsBase): # type: ignore
class StatisticsShortTerm(Base, StatisticsBase): # type: ignore[misc,valid-type]
"""Short term statistics."""
duration = timedelta(minutes=5)
@@ -322,7 +322,7 @@ class StatisticMetaData(TypedDict):
unit_of_measurement: str | None
class StatisticsMeta(Base): # type: ignore
class StatisticsMeta(Base): # type: ignore[misc,valid-type]
"""Statistics meta data."""
__table_args__ = (
@@ -343,7 +343,7 @@ class StatisticsMeta(Base): # type: ignore
return StatisticsMeta(**meta)
class RecorderRuns(Base): # type: ignore
class RecorderRuns(Base): # type: ignore[misc,valid-type]
"""Representation of recorder run."""
__table_args__ = (Index("ix_recorder_runs_start_end", "start", "end"),)
@@ -393,7 +393,7 @@ class RecorderRuns(Base): # type: ignore
return self
class SchemaChanges(Base): # type: ignore
class SchemaChanges(Base): # type: ignore[misc,valid-type]
"""Representation of schema version changes."""
__tablename__ = TABLE_SCHEMA_CHANGES
@@ -411,7 +411,7 @@ class SchemaChanges(Base): # type: ignore
)
class StatisticsRuns(Base): # type: ignore
class StatisticsRuns(Base): # type: ignore[misc,valid-type]
"""Representation of statistics run."""
__tablename__ = TABLE_STATISTICS_RUNS
@@ -491,7 +491,7 @@ class LazyState(State):
self._last_updated = None
self._context = None
@property # type: ignore
@property # type: ignore[override]
def attributes(self):
"""State attributes."""
if not self._attributes:
@@ -508,7 +508,7 @@ class LazyState(State):
"""Set attributes."""
self._attributes = value
@property # type: ignore
@property # type: ignore[override]
def context(self):
"""State context."""
if not self._context:
@@ -520,7 +520,7 @@ class LazyState(State):
"""Set context."""
self._context = value
@property # type: ignore
@property # type: ignore[override]
def last_changed(self):
"""Last changed datetime."""
if not self._last_changed:
@@ -532,7 +532,7 @@ class LazyState(State):
"""Set last changed datetime."""
self._last_changed = value
@property # type: ignore
@property # type: ignore[override]
def last_updated(self):
"""Last updated datetime."""
if not self._last_updated:

View File

@@ -34,7 +34,7 @@ def purge_old_data(
purge_before.isoformat(sep=" ", timespec="seconds"),
)
with session_scope(session=instance.get_session()) as session: # type: ignore
with session_scope(session=instance.get_session()) as session: # type: ignore[misc]
# Purge a max of MAX_ROWS_TO_PURGE, based on the oldest states or events record
event_ids = _select_event_ids_to_purge(session, purge_before)
state_ids = _select_state_ids_to_purge(session, purge_before, event_ids)
@@ -267,7 +267,7 @@ def _purge_filtered_states(
"Selected %s state_ids to remove that should be filtered", len(state_ids)
)
_purge_state_ids(instance, session, set(state_ids))
_purge_event_ids(session, event_ids) # type: ignore # type of event_ids already narrowed to 'list[int]'
_purge_event_ids(session, event_ids) # type: ignore[arg-type] # type of event_ids already narrowed to 'list[int]'
def _purge_filtered_events(
@@ -295,7 +295,7 @@ def _purge_filtered_events(
@retryable_database_job("purge")
def purge_entity_data(instance: Recorder, entity_filter: Callable[[str], bool]) -> bool:
"""Purge states and events of specified entities."""
with session_scope(session=instance.get_session()) as session: # type: ignore
with session_scope(session=instance.get_session()) as session: # type: ignore[misc]
selected_entity_ids: list[str] = [
entity_id
for (entity_id,) in session.query(distinct(States.entity_id)).all()

View File

@@ -488,7 +488,7 @@ def compile_hourly_statistics(
)
if stats:
for metadata_id, group in groupby(stats, lambda stat: stat["metadata_id"]): # type: ignore
for metadata_id, group in groupby(stats, lambda stat: stat["metadata_id"]): # type: ignore[no-any-return]
(
metadata_id,
last_reset,
@@ -527,7 +527,7 @@ def compile_statistics(instance: Recorder, start: datetime) -> bool:
end = start + timedelta(minutes=5)
# Return if we already have 5-minute statistics for the requested period
with session_scope(session=instance.get_session()) as session: # type: ignore
with session_scope(session=instance.get_session()) as session: # type: ignore[misc]
if session.query(StatisticsRuns).filter_by(start=start).first():
_LOGGER.debug("Statistics already compiled for %s-%s", start, end)
return True
@@ -546,7 +546,7 @@ def compile_statistics(instance: Recorder, start: datetime) -> bool:
# Insert collected statistics in the database
with session_scope(
session=instance.get_session(), # type: ignore
session=instance.get_session(), # type: ignore[misc]
exception_filter=_filter_unique_constraint_integrity_error(instance),
) as session:
for stats in platform_stats:
@@ -700,7 +700,7 @@ def _configured_unit(unit: str, units: UnitSystem) -> str:
def clear_statistics(instance: Recorder, statistic_ids: list[str]) -> None:
"""Clear statistics for a list of statistic_ids."""
with session_scope(session=instance.get_session()) as session: # type: ignore
with session_scope(session=instance.get_session()) as session: # type: ignore[misc]
session.query(StatisticsMeta).filter(
StatisticsMeta.statistic_id.in_(statistic_ids)
).delete(synchronize_session=False)
@@ -710,7 +710,7 @@ def update_statistics_metadata(
instance: Recorder, statistic_id: str, unit_of_measurement: str | None
) -> None:
"""Update statistics metadata for a statistic_id."""
with session_scope(session=instance.get_session()) as session: # type: ignore
with session_scope(session=instance.get_session()) as session: # type: ignore[misc]
session.query(StatisticsMeta).filter(
StatisticsMeta.statistic_id == statistic_id
).update({StatisticsMeta.unit_of_measurement: unit_of_measurement})
@@ -1093,7 +1093,7 @@ def _sorted_statistics_to_dict(
def no_conversion(val: Any, _: Any) -> float | None:
"""Return x."""
return val # type: ignore
return val # type: ignore[no-any-return]
# Set all statistic IDs to empty lists in result set to maintain the order
if statistic_ids is not None:
@@ -1101,7 +1101,7 @@ def _sorted_statistics_to_dict(
result[stat_id] = []
# Identify metadata IDs for which no data was available at the requested start time
for meta_id, group in groupby(stats, lambda stat: stat.metadata_id): # type: ignore
for meta_id, group in groupby(stats, lambda stat: stat.metadata_id): # type: ignore[no-any-return]
first_start_time = process_timestamp(next(group).start)
if start_time and first_start_time > start_time:
need_stat_at_start_time.add(meta_id)
@@ -1115,12 +1115,12 @@ def _sorted_statistics_to_dict(
stats_at_start_time[stat.metadata_id] = (stat,)
# Append all statistic entries, and optionally do unit conversion
for meta_id, group in groupby(stats, lambda stat: stat.metadata_id): # type: ignore
for meta_id, group in groupby(stats, lambda stat: stat.metadata_id): # type: ignore[no-any-return]
unit = metadata[meta_id]["unit_of_measurement"]
statistic_id = metadata[meta_id]["statistic_id"]
convert: Callable[[Any, Any], float | None]
if convert_units:
convert = UNIT_CONVERSIONS.get(unit, lambda x, units: x) # type: ignore
convert = UNIT_CONVERSIONS.get(unit, lambda x, units: x) # type: ignore[arg-type,no-any-return]
else:
convert = no_conversion
ent_results = result[meta_id]
@@ -1249,7 +1249,7 @@ def add_external_statistics(
"""Process an add_statistics job."""
with session_scope(
session=instance.get_session(), # type: ignore
session=instance.get_session(), # type: ignore[misc]
exception_filter=_filter_unique_constraint_integrity_error(instance),
) as session:
metadata_id = _update_or_add_metadata(instance.hass, session, metadata)