Refactor recorder retryable_database_job decorator (#125306)

This commit is contained in:
Erik Montnemery 2024-09-05 13:09:27 +02:00 committed by GitHub
parent 65e16b4814
commit 86ae70780c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 49 additions and 26 deletions

View File

@ -106,6 +106,7 @@ from .util import (
execute_stmt_lambda_element, execute_stmt_lambda_element,
get_index_by_name, get_index_by_name,
retryable_database_job, retryable_database_job,
retryable_database_job_method,
session_scope, session_scope,
) )
@ -2229,7 +2230,7 @@ class BaseRunTimeMigration(ABC):
else: else:
self.migration_done(instance, session) self.migration_done(instance, session)
@retryable_database_job("migrate data", method=True) @retryable_database_job_method("migrate data")
def migrate_data(self, instance: Recorder) -> bool: def migrate_data(self, instance: Recorder) -> bool:
"""Migrate some data, returns True if migration is completed.""" """Migrate some data, returns True if migration is completed."""
status = self.migrate_data_impl(instance) status = self.migrate_data_impl(instance)

View File

@ -645,44 +645,66 @@ def _is_retryable_error(instance: Recorder, err: OperationalError) -> bool:
type _FuncType[**P, R] = Callable[Concatenate[Recorder, P], R] type _FuncType[**P, R] = Callable[Concatenate[Recorder, P], R]
type _MethType[Self, **P, R] = Callable[Concatenate[Self, Recorder, P], R]
type _FuncOrMethType[**_P, _R] = Callable[_P, _R] type _FuncOrMethType[**_P, _R] = Callable[_P, _R]
def retryable_database_job[**_P]( def retryable_database_job[**_P](
description: str, method: bool = False description: str,
) -> Callable[[_FuncOrMethType[_P, bool]], _FuncOrMethType[_P, bool]]: ) -> Callable[[_FuncType[_P, bool]], _FuncType[_P, bool]]:
"""Try to execute a database job. """Try to execute a database job.
The job should return True if it finished, and False if it needs to be rescheduled. The job should return True if it finished, and False if it needs to be rescheduled.
""" """
recorder_pos = 1 if method else 0
def decorator(job: _FuncOrMethType[_P, bool]) -> _FuncOrMethType[_P, bool]: def decorator(job: _FuncType[_P, bool]) -> _FuncType[_P, bool]:
@functools.wraps(job) return _wrap_func_or_meth(job, description, False)
def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> bool:
instance: Recorder = args[recorder_pos] # type: ignore[assignment]
try:
return job(*args, **kwargs)
except OperationalError as err:
if _is_retryable_error(instance, err):
assert isinstance(err.orig, BaseException) # noqa: PT017
_LOGGER.info(
"%s; %s not completed, retrying", err.orig.args[1], description
)
time.sleep(instance.db_retry_wait)
# Failed with retryable error
return False
_LOGGER.warning("Error executing %s: %s", description, err)
# Failed with permanent error
return True
return wrapper
return decorator return decorator
def retryable_database_job_method[_Self, **_P](
description: str,
) -> Callable[[_MethType[_Self, _P, bool]], _MethType[_Self, _P, bool]]:
"""Try to execute a database job.
The job should return True if it finished, and False if it needs to be rescheduled.
"""
def decorator(job: _MethType[_Self, _P, bool]) -> _MethType[_Self, _P, bool]:
return _wrap_func_or_meth(job, description, True)
return decorator
def _wrap_func_or_meth[**_P](
job: _FuncOrMethType[_P, bool], description: str, method: bool
) -> _FuncOrMethType[_P, bool]:
recorder_pos = 1 if method else 0
@functools.wraps(job)
def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> bool:
instance: Recorder = args[recorder_pos] # type: ignore[assignment]
try:
return job(*args, **kwargs)
except OperationalError as err:
if _is_retryable_error(instance, err):
assert isinstance(err.orig, BaseException) # noqa: PT017
_LOGGER.info(
"%s; %s not completed, retrying", err.orig.args[1], description
)
time.sleep(instance.db_retry_wait)
# Failed with retryable error
return False
_LOGGER.warning("Error executing %s: %s", description, err)
# Failed with permanent error
return True
return wrapper
def database_job_retry_wrapper[**_P]( def database_job_retry_wrapper[**_P](
description: str, attempts: int = 5 description: str, attempts: int = 5
) -> Callable[[_FuncType[_P, None]], _FuncType[_P, None]]: ) -> Callable[[_FuncType[_P, None]], _FuncType[_P, None]]: