Type recorder retry decorator (#70993)

This commit is contained in:
Marc Mueller 2022-04-28 21:04:41 +02:00 committed by GitHub
parent 7fefd4bc67
commit cf90e34776
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 22 additions and 7 deletions

View File

@ -83,6 +83,8 @@ from .models import (
Events, Events,
StateAttributes, StateAttributes,
States, States,
StatisticData,
StatisticMetaData,
StatisticsRuns, StatisticsRuns,
process_timestamp, process_timestamp,
) )
@ -460,8 +462,8 @@ class StatisticsTask(RecorderTask):
class ExternalStatisticsTask(RecorderTask): class ExternalStatisticsTask(RecorderTask):
"""An object to insert into the recorder queue to run an external statistics task.""" """An object to insert into the recorder queue to run an external statistics task."""
metadata: dict metadata: StatisticMetaData
statistics: Iterable[dict] statistics: Iterable[StatisticData]
def run(self, instance: Recorder) -> None: def run(self, instance: Recorder) -> None:
"""Run statistics task.""" """Run statistics task."""
@ -916,7 +918,9 @@ class Recorder(threading.Thread):
self.queue.put(UpdateStatisticsMetadataTask(statistic_id, unit_of_measurement)) self.queue.put(UpdateStatisticsMetadataTask(statistic_id, unit_of_measurement))
@callback @callback
def async_external_statistics(self, metadata: dict, stats: Iterable[dict]) -> None: def async_external_statistics(
self, metadata: StatisticMetaData, stats: Iterable[StatisticData]
) -> None:
"""Schedule external statistics.""" """Schedule external statistics."""
self.queue.put(ExternalStatisticsTask(metadata, stats)) self.queue.put(ExternalStatisticsTask(metadata, stats))

View File

@ -8,7 +8,7 @@ import functools
import logging import logging
import os import os
import time import time
from typing import TYPE_CHECKING, Any from typing import TYPE_CHECKING, Any, TypeVar
from awesomeversion import ( from awesomeversion import (
AwesomeVersion, AwesomeVersion,
@ -20,6 +20,7 @@ from sqlalchemy.engine.cursor import CursorFetchStrategy
from sqlalchemy.exc import OperationalError, SQLAlchemyError from sqlalchemy.exc import OperationalError, SQLAlchemyError
from sqlalchemy.orm.query import Query from sqlalchemy.orm.query import Query
from sqlalchemy.orm.session import Session from sqlalchemy.orm.session import Session
from typing_extensions import Concatenate, ParamSpec
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
import homeassistant.util.dt as dt_util import homeassistant.util.dt as dt_util
@ -40,6 +41,9 @@ from .models import (
if TYPE_CHECKING: if TYPE_CHECKING:
from . import Recorder from . import Recorder
_RecorderT = TypeVar("_RecorderT", bound="Recorder")
_P = ParamSpec("_P")
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
RETRIES = 3 RETRIES = 3
@ -430,15 +434,22 @@ def end_incomplete_runs(session: Session, start_time: datetime) -> None:
session.add(run) session.add(run)
def retryable_database_job(description: str) -> Callable: def retryable_database_job(
description: str,
) -> Callable[
[Callable[Concatenate[_RecorderT, _P], bool]],
Callable[Concatenate[_RecorderT, _P], bool],
]:
"""Try to execute a database job. """Try to execute a database job.
The job should return True if it finished, and False if it needs to be rescheduled. The job should return True if it finished, and False if it needs to be rescheduled.
""" """
def decorator(job: Callable[[Any], bool]) -> Callable: def decorator(
job: Callable[Concatenate[_RecorderT, _P], bool]
) -> Callable[Concatenate[_RecorderT, _P], bool]:
@functools.wraps(job) @functools.wraps(job)
def wrapper(instance: Recorder, *args: Any, **kwargs: Any) -> bool: def wrapper(instance: _RecorderT, *args: _P.args, **kwargs: _P.kwargs) -> bool:
try: try:
return job(instance, *args, **kwargs) return job(instance, *args, **kwargs)
except OperationalError as err: except OperationalError as err: