Reduce size of migration transactions to accommodate slow/busy systems (#92312)

* Reduce size of migration transactions to accommodate slow/busy systems

Related issue: #91489

* Handle overloaded Raspberry Pis better
J. Nick Koston 2023-04-30 22:18:00 -05:00 committed by GitHub
parent 30dd8b9f3a
commit ede1f08c51
2 changed files with 12 additions and 12 deletions
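
The fix is the same in every hunk below: each bulk UPDATE that previously touched up to 250,000 rows (or, for the MySQL wipes, 10,000,000) now touches at most 100,000, and the surrounding loops simply re-run the statement until its rowcount drops to zero. A minimal sketch of that pattern, assuming SQLAlchemy 2.x and a hypothetical events_legacy table (not the recorder's real schema or connection setup):

from sqlalchemy import create_engine, text

# Illustrative sketch only: the DSN and table/column names are hypothetical.
engine = create_engine("mysql+pymysql://user:pass@localhost/ha")
CHUNK = 100_000  # small enough that each transaction stays short on a busy Pi

with engine.connect() as conn:
    while True:
        # MySQL allows UPDATE ... LIMIT, so each pass touches at most CHUNK
        # rows; committing between passes releases locks so other recorder
        # work can interleave.
        result = conn.execute(
            text(
                "UPDATE events_legacy SET time_fired=NULL "
                f"WHERE time_fired IS NOT NULL LIMIT {CHUNK}"
            )
        )
        conn.commit()
        if result.rowcount == 0:  # nothing left to wipe
            break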


@@ -1158,23 +1158,23 @@ def _wipe_old_string_time_columns(
     elif engine.dialect.name == SupportedDialect.MYSQL:
         #
         # Since this is only to save space we limit the number of rows we update
-        # to 10,000,000 per table since we do not want to block the database for too long
+        # to 100,000 per table since we do not want to block the database for too long
         # or run out of innodb_buffer_pool_size on MySQL. The old data will eventually
         # be cleaned up by the recorder purge if we do not do it now.
         #
-        session.execute(text("UPDATE events set time_fired=NULL LIMIT 10000000;"))
+        session.execute(text("UPDATE events set time_fired=NULL LIMIT 100000;"))
         session.commit()
         session.execute(
             text(
                 "UPDATE states set last_updated=NULL, last_changed=NULL "
-                " LIMIT 10000000;"
+                " LIMIT 100000;"
             )
         )
         session.commit()
     elif engine.dialect.name == SupportedDialect.POSTGRESQL:
         #
         # Since this is only to save space we limit the number of rows we update
-        # to 250,000 per table since we do not want to block the database for too long
+        # to 100,000 per table since we do not want to block the database for too long
         # or run out ram with postgresql. The old data will eventually
         # be cleaned up by the recorder purge if we do not do it now.
         #
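
The comment above ties the limit to innodb_buffer_pool_size. For context, this sketch (hypothetical DSN) reads that variable; on Raspberry Pi class machines it is often left at MySQL's 128 MiB default, which is why multi-million-row transactions are risky there:

from sqlalchemy import create_engine, text

# Sketch: read the InnoDB buffer pool size the comment above refers to.
engine = create_engine("mysql+pymysql://user:pass@localhost/ha")
with engine.connect() as conn:
    name, value = conn.execute(
        text("SHOW VARIABLES LIKE 'innodb_buffer_pool_size'")
    ).one()
    print(f"{name} = {int(value) / 2**20:.0f} MiB")  # MySQL default is 128 MiB
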
@@ -1182,7 +1182,7 @@ def _wipe_old_string_time_columns(
             text(
                 "UPDATE events set time_fired=NULL "
                 "where event_id in "
-                "(select event_id from events where time_fired_ts is NOT NULL LIMIT 250000);"
+                "(select event_id from events where time_fired_ts is NOT NULL LIMIT 100000);"
             )
         )
         session.commit()
@@ -1190,7 +1190,7 @@ def _wipe_old_string_time_columns(
             text(
                 "UPDATE states set last_updated=NULL, last_changed=NULL "
                 "where state_id in "
-                "(select state_id from states where last_updated_ts is NOT NULL LIMIT 250000);"
+                "(select state_id from states where last_updated_ts is NOT NULL LIMIT 100000);"
             )
         )
         session.commit()
@ -1236,7 +1236,7 @@ def _migrate_columns_to_timestamp(
"UNIX_TIMESTAMP(time_fired)" "UNIX_TIMESTAMP(time_fired)"
") " ") "
"where time_fired_ts is NULL " "where time_fired_ts is NULL "
"LIMIT 250000;" "LIMIT 100000;"
) )
) )
result = None result = None
@@ -1251,7 +1251,7 @@ def _migrate_columns_to_timestamp(
                         "last_changed_ts="
                         "UNIX_TIMESTAMP(last_changed) "
                         "where last_updated_ts is NULL "
-                        "LIMIT 250000;"
+                        "LIMIT 100000;"
                     )
                 )
     elif engine.dialect.name == SupportedDialect.POSTGRESQL:
@@ -1266,7 +1266,7 @@ def _migrate_columns_to_timestamp(
                         "time_fired_ts= "
                         "(case when time_fired is NULL then 0 else EXTRACT(EPOCH FROM time_fired::timestamptz) end) "
                         "WHERE event_id IN ( "
-                        "SELECT event_id FROM events where time_fired_ts is NULL LIMIT 250000 "
+                        "SELECT event_id FROM events where time_fired_ts is NULL LIMIT 100000 "
                         " );"
                     )
                 )
@@ -1279,7 +1279,7 @@ def _migrate_columns_to_timestamp(
                         "(case when last_updated is NULL then 0 else EXTRACT(EPOCH FROM last_updated::timestamptz) end), "
                         "last_changed_ts=EXTRACT(EPOCH FROM last_changed::timestamptz) "
                         "where state_id IN ( "
-                        "SELECT state_id FROM states where last_updated_ts is NULL LIMIT 250000 "
+                        "SELECT state_id FROM states where last_updated_ts is NULL LIMIT 100000 "
                         " );"
                     )
                 )
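
Note the shape difference between dialects: PostgreSQL has no LIMIT clause on UPDATE, so the hunks above bound each pass with a LIMITed subquery over the primary key instead. A sketch of that variant, under the same hypothetical schema as before:

from sqlalchemy import create_engine, text

# PostgreSQL-style batching: UPDATE has no LIMIT there, so each batch is
# chosen by a LIMITed primary-key subquery. Names are illustrative.
engine = create_engine("postgresql+psycopg2://user:pass@localhost/ha")

with engine.connect() as conn:
    while True:
        result = conn.execute(
            text(
                "UPDATE events_legacy SET time_fired=NULL "
                "WHERE event_id IN ("
                "SELECT event_id FROM events_legacy "
                "WHERE time_fired IS NOT NULL LIMIT 100000)"
            )
        )
        conn.commit()
        if result.rowcount == 0:
            break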


@@ -2333,7 +2333,7 @@ def cleanup_statistics_timestamp_migration(instance: Recorder) -> bool:
                     session.connection()
                     .execute(
                         text(
-                            f"UPDATE {table} set start=NULL, created=NULL, last_reset=NULL where start is not NULL LIMIT 250000;"
+                            f"UPDATE {table} set start=NULL, created=NULL, last_reset=NULL where start is not NULL LIMIT 100000;"
                         )
                     )
                     .rowcount
@@ -2349,7 +2349,7 @@ def cleanup_statistics_timestamp_migration(instance: Recorder) -> bool:
                     .execute(
                         text(
                             f"UPDATE {table} set start=NULL, created=NULL, last_reset=NULL "  # nosec
-                            f"where id in (select id from {table} where start is not NULL LIMIT 250000)"
+                            f"where id in (select id from {table} where start is not NULL LIMIT 100000)"
                         )
                     )
                     .rowcount
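
cleanup_statistics_timestamp_migration returns a bool for the same reason the limits shrank: with at most 100,000 rows per pass, the caller has to know whether another pass is needed. A sketch of that contract, with a hypothetical reschedule hook standing in for the recorder's task queue:

from sqlalchemy import text
from sqlalchemy.engine import Connection

def cleanup_pass(conn: Connection, table: str) -> bool:
    """Run one bounded cleanup pass; True means nothing was left to clean."""
    rowcount = conn.execute(
        text(
            f"UPDATE {table} SET start=NULL, created=NULL, last_reset=NULL "
            f"WHERE id IN (SELECT id FROM {table} "
            f"WHERE start IS NOT NULL LIMIT 100000)"
        )
    ).rowcount
    conn.commit()
    return rowcount == 0

# Hypothetical caller: rather than looping inline and monopolizing the
# worker, re-queue the task so other recorder jobs run between passes.
def run_cleanup(conn: Connection, table: str, reschedule) -> None:
    if not cleanup_pass(conn, table):
        reschedule(lambda: run_cleanup(conn, table, reschedule))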