Fix migration when encountering a NULL entity_id/event_type (#90542)

* Fix migration when encountering a NULL entity_id/event_type

reported in #beta on discord

* simplify
This commit is contained in:
J. Nick Koston 2023-03-30 14:54:13 -10:00 committed by Paulus Schoutsen
parent 9478518937
commit e32d89215d
2 changed files with 168 additions and 12 deletions

View File

@ -1439,12 +1439,15 @@ def migrate_event_type_ids(instance: Recorder) -> bool:
with session_scope(session=session_maker()) as session: with session_scope(session=session_maker()) as session:
if events := session.execute(find_event_type_to_migrate()).all(): if events := session.execute(find_event_type_to_migrate()).all():
event_types = {event_type for _, event_type in events} event_types = {event_type for _, event_type in events}
if None in event_types:
# event_type should never be None but we need to be defensive
# so we don't fail the migration because of a bad state
event_types.remove(None)
event_types.add(_EMPTY_EVENT_TYPE)
event_type_to_id = event_type_manager.get_many(event_types, session) event_type_to_id = event_type_manager.get_many(event_types, session)
if missing_event_types := { if missing_event_types := {
# We should never see see None for the event_Type in the events table event_type
# but we need to be defensive so we don't fail the migration
# because of a bad event
_EMPTY_EVENT_TYPE if event_type is None else event_type
for event_type, event_id in event_type_to_id.items() for event_type, event_id in event_type_to_id.items()
if event_id is None if event_id is None
}: }:
@ -1470,7 +1473,9 @@ def migrate_event_type_ids(instance: Recorder) -> bool:
{ {
"event_id": event_id, "event_id": event_id,
"event_type": None, "event_type": None,
"event_type_id": event_type_to_id[event_type], "event_type_id": event_type_to_id[
_EMPTY_EVENT_TYPE if event_type is None else event_type
],
} }
for event_id, event_type in events for event_id, event_type in events
], ],
@ -1502,14 +1507,17 @@ def migrate_entity_ids(instance: Recorder) -> bool:
with session_scope(session=instance.get_session()) as session: with session_scope(session=instance.get_session()) as session:
if states := session.execute(find_entity_ids_to_migrate()).all(): if states := session.execute(find_entity_ids_to_migrate()).all():
entity_ids = {entity_id for _, entity_id in states} entity_ids = {entity_id for _, entity_id in states}
if None in entity_ids:
# entity_id should never be None but we need to be defensive
# so we don't fail the migration because of a bad state
entity_ids.remove(None)
entity_ids.add(_EMPTY_ENTITY_ID)
entity_id_to_metadata_id = states_meta_manager.get_many( entity_id_to_metadata_id = states_meta_manager.get_many(
entity_ids, session, True entity_ids, session, True
) )
if missing_entity_ids := { if missing_entity_ids := {
# We should never see _EMPTY_ENTITY_ID in the states table entity_id
# but we need to be defensive so we don't fail the migration
# because of a bad state
_EMPTY_ENTITY_ID if entity_id is None else entity_id
for entity_id, metadata_id in entity_id_to_metadata_id.items() for entity_id, metadata_id in entity_id_to_metadata_id.items()
if metadata_id is None if metadata_id is None
}: }:
@ -1537,7 +1545,9 @@ def migrate_entity_ids(instance: Recorder) -> bool:
# the history queries still need to work while the # the history queries still need to work while the
# migration is in progress and we will do this in # migration is in progress and we will do this in
# post_migrate_entity_ids # post_migrate_entity_ids
"metadata_id": entity_id_to_metadata_id[entity_id], "metadata_id": entity_id_to_metadata_id[
_EMPTY_ENTITY_ID if entity_id is None else entity_id
],
} }
for state_id, entity_id in states for state_id, entity_id in states
], ],

View File

@ -957,7 +957,7 @@ async def test_migrate_entity_ids(
instance = await async_setup_recorder_instance(hass) instance = await async_setup_recorder_instance(hass)
await async_wait_recording_done(hass) await async_wait_recording_done(hass)
def _insert_events(): def _insert_states():
with session_scope(hass=hass) as session: with session_scope(hass=hass) as session:
session.add_all( session.add_all(
( (
@ -979,7 +979,7 @@ async def test_migrate_entity_ids(
) )
) )
await instance.async_add_executor_job(_insert_events) await instance.async_add_executor_job(_insert_states)
await async_wait_recording_done(hass) await async_wait_recording_done(hass)
# This is a threadsafe way to add a task to the recorder # This is a threadsafe way to add a task to the recorder
@ -1065,3 +1065,149 @@ async def test_post_migrate_entity_ids(
assert states_by_state["one_1"] is None assert states_by_state["one_1"] is None
assert states_by_state["two_2"] is None assert states_by_state["two_2"] is None
assert states_by_state["two_1"] is None assert states_by_state["two_1"] is None
@pytest.mark.parametrize("enable_migrate_entity_ids", [True])
async def test_migrate_null_entity_ids(
async_setup_recorder_instance: RecorderInstanceGenerator, hass: HomeAssistant
) -> None:
"""Test we can migrate entity_ids to the StatesMeta table."""
instance = await async_setup_recorder_instance(hass)
await async_wait_recording_done(hass)
def _insert_states():
with session_scope(hass=hass) as session:
session.add(
States(
entity_id="sensor.one",
state="one_1",
last_updated_ts=1.452529,
),
)
session.add_all(
States(
entity_id=None,
state="empty",
last_updated_ts=time + 1.452529,
)
for time in range(1000)
)
session.add(
States(
entity_id="sensor.one",
state="one_1",
last_updated_ts=2.452529,
),
)
await instance.async_add_executor_job(_insert_states)
await async_wait_recording_done(hass)
# This is a threadsafe way to add a task to the recorder
instance.queue_task(EntityIDMigrationTask())
await async_recorder_block_till_done(hass)
await async_recorder_block_till_done(hass)
def _fetch_migrated_states():
with session_scope(hass=hass) as session:
states = (
session.query(
States.state,
States.metadata_id,
States.last_updated_ts,
StatesMeta.entity_id,
)
.outerjoin(StatesMeta, States.metadata_id == StatesMeta.metadata_id)
.all()
)
assert len(states) == 1002
result = {}
for state in states:
result.setdefault(state.entity_id, []).append(
{
"state_id": state.entity_id,
"last_updated_ts": state.last_updated_ts,
"state": state.state,
}
)
return result
states_by_entity_id = await instance.async_add_executor_job(_fetch_migrated_states)
assert len(states_by_entity_id[migration._EMPTY_ENTITY_ID]) == 1000
assert len(states_by_entity_id["sensor.one"]) == 2
@pytest.mark.parametrize("enable_migrate_event_type_ids", [True])
async def test_migrate_null_event_type_ids(
async_setup_recorder_instance: RecorderInstanceGenerator, hass: HomeAssistant
) -> None:
"""Test we can migrate event_types to the EventTypes table when the event_type is NULL."""
instance = await async_setup_recorder_instance(hass)
await async_wait_recording_done(hass)
def _insert_events():
with session_scope(hass=hass) as session:
session.add(
Events(
event_type="event_type_one",
origin_idx=0,
time_fired_ts=1.452529,
),
)
session.add_all(
Events(
event_type=None,
origin_idx=0,
time_fired_ts=time + 1.452529,
)
for time in range(1000)
)
session.add(
Events(
event_type="event_type_one",
origin_idx=0,
time_fired_ts=2.452529,
),
)
await instance.async_add_executor_job(_insert_events)
await async_wait_recording_done(hass)
# This is a threadsafe way to add a task to the recorder
instance.queue_task(EventTypeIDMigrationTask())
await async_recorder_block_till_done(hass)
await async_recorder_block_till_done(hass)
def _fetch_migrated_events():
with session_scope(hass=hass) as session:
events = (
session.query(Events.event_id, Events.time_fired, EventTypes.event_type)
.filter(
Events.event_type_id.in_(
select_event_type_ids(
(
"event_type_one",
migration._EMPTY_EVENT_TYPE,
)
)
)
)
.outerjoin(EventTypes, Events.event_type_id == EventTypes.event_type_id)
.all()
)
assert len(events) == 1002
result = {}
for event in events:
result.setdefault(event.event_type, []).append(
{
"event_id": event.event_id,
"time_fired": event.time_fired,
"event_type": event.event_type,
}
)
return result
events_by_type = await instance.async_add_executor_job(_fetch_migrated_events)
assert len(events_by_type["event_type_one"]) == 2
assert len(events_by_type[migration._EMPTY_EVENT_TYPE]) == 1000