diff --git a/homeassistant/components/recorder/migration.py b/homeassistant/components/recorder/migration.py index 4be01327654..fe1d7fdf91c 100644 --- a/homeassistant/components/recorder/migration.py +++ b/homeassistant/components/recorder/migration.py @@ -1439,12 +1439,15 @@ def migrate_event_type_ids(instance: Recorder) -> bool: with session_scope(session=session_maker()) as session: if events := session.execute(find_event_type_to_migrate()).all(): event_types = {event_type for _, event_type in events} + if None in event_types: + # event_type should never be None but we need to be defensive + # so we don't fail the migration because of a bad event + event_types.remove(None) + event_types.add(_EMPTY_EVENT_TYPE) + event_type_to_id = event_type_manager.get_many(event_types, session) if missing_event_types := { - # We should never see see None for the event_Type in the events table - # but we need to be defensive so we don't fail the migration - # because of a bad event - _EMPTY_EVENT_TYPE if event_type is None else event_type + event_type for event_type, event_id in event_type_to_id.items() if event_id is None }: @@ -1470,7 +1473,9 @@ def migrate_event_type_ids(instance: Recorder) -> bool: { "event_id": event_id, "event_type": None, - "event_type_id": event_type_to_id[event_type], + "event_type_id": event_type_to_id[ + _EMPTY_EVENT_TYPE if event_type is None else event_type + ], } for event_id, event_type in events ], @@ -1502,14 +1507,17 @@ def migrate_entity_ids(instance: Recorder) -> bool: with session_scope(session=instance.get_session()) as session: if states := session.execute(find_entity_ids_to_migrate()).all(): entity_ids = {entity_id for _, entity_id in states} + if None in entity_ids: + # entity_id should never be None but we need to be defensive + # so we don't fail the migration because of a bad state + entity_ids.remove(None) + entity_ids.add(_EMPTY_ENTITY_ID) + entity_id_to_metadata_id = states_meta_manager.get_many( entity_ids, session, True ) if 
missing_entity_ids := { - # We should never see _EMPTY_ENTITY_ID in the states table - # but we need to be defensive so we don't fail the migration - # because of a bad state - _EMPTY_ENTITY_ID if entity_id is None else entity_id + entity_id for entity_id, metadata_id in entity_id_to_metadata_id.items() if metadata_id is None }: @@ -1537,7 +1545,9 @@ def migrate_entity_ids(instance: Recorder) -> bool: # the history queries still need to work while the # migration is in progress and we will do this in # post_migrate_entity_ids - "metadata_id": entity_id_to_metadata_id[entity_id], + "metadata_id": entity_id_to_metadata_id[ + _EMPTY_ENTITY_ID if entity_id is None else entity_id + ], } for state_id, entity_id in states ], diff --git a/tests/components/recorder/test_migrate.py b/tests/components/recorder/test_migrate.py index fe4f1e016f5..6e54513830d 100644 --- a/tests/components/recorder/test_migrate.py +++ b/tests/components/recorder/test_migrate.py @@ -957,7 +957,7 @@ async def test_migrate_entity_ids( instance = await async_setup_recorder_instance(hass) await async_wait_recording_done(hass) - def _insert_events(): + def _insert_states(): with session_scope(hass=hass) as session: session.add_all( ( @@ -979,7 +979,7 @@ async def test_migrate_entity_ids( ) ) - await instance.async_add_executor_job(_insert_events) + await instance.async_add_executor_job(_insert_states) await async_wait_recording_done(hass) # This is a threadsafe way to add a task to the recorder @@ -1065,3 +1065,149 @@ async def test_post_migrate_entity_ids( assert states_by_state["one_1"] is None assert states_by_state["two_2"] is None assert states_by_state["two_1"] is None + + +@pytest.mark.parametrize("enable_migrate_entity_ids", [True]) +async def test_migrate_null_entity_ids( + async_setup_recorder_instance: RecorderInstanceGenerator, hass: HomeAssistant +) -> None: + """Test we can migrate entity_ids to the StatesMeta table when the entity_id is NULL.""" + instance = await async_setup_recorder_instance(hass) + await 
async_wait_recording_done(hass) + + def _insert_states(): + with session_scope(hass=hass) as session: + session.add( + States( + entity_id="sensor.one", + state="one_1", + last_updated_ts=1.452529, + ), + ) + session.add_all( + States( + entity_id=None, + state="empty", + last_updated_ts=time + 1.452529, + ) + for time in range(1000) + ) + session.add( + States( + entity_id="sensor.one", + state="one_1", + last_updated_ts=2.452529, + ), + ) + + await instance.async_add_executor_job(_insert_states) + + await async_wait_recording_done(hass) + # This is a threadsafe way to add a task to the recorder + instance.queue_task(EntityIDMigrationTask()) + await async_recorder_block_till_done(hass) + await async_recorder_block_till_done(hass) + + def _fetch_migrated_states(): + with session_scope(hass=hass) as session: + states = ( + session.query( + States.state, + States.metadata_id, + States.last_updated_ts, + StatesMeta.entity_id, + ) + .outerjoin(StatesMeta, States.metadata_id == StatesMeta.metadata_id) + .all() + ) + assert len(states) == 1002 + result = {} + for state in states: + result.setdefault(state.entity_id, []).append( + { + "state_id": state.entity_id, + "last_updated_ts": state.last_updated_ts, + "state": state.state, + } + ) + return result + + states_by_entity_id = await instance.async_add_executor_job(_fetch_migrated_states) + assert len(states_by_entity_id[migration._EMPTY_ENTITY_ID]) == 1000 + assert len(states_by_entity_id["sensor.one"]) == 2 + + +@pytest.mark.parametrize("enable_migrate_event_type_ids", [True]) +async def test_migrate_null_event_type_ids( + async_setup_recorder_instance: RecorderInstanceGenerator, hass: HomeAssistant +) -> None: + """Test we can migrate event_types to the EventTypes table when the event_type is NULL.""" + instance = await async_setup_recorder_instance(hass) + await async_wait_recording_done(hass) + + def _insert_events(): + with session_scope(hass=hass) as session: + session.add( + Events( + event_type="event_type_one", 
+ origin_idx=0, + time_fired_ts=1.452529, + ), + ) + session.add_all( + Events( + event_type=None, + origin_idx=0, + time_fired_ts=time + 1.452529, + ) + for time in range(1000) + ) + session.add( + Events( + event_type="event_type_one", + origin_idx=0, + time_fired_ts=2.452529, + ), + ) + + await instance.async_add_executor_job(_insert_events) + + await async_wait_recording_done(hass) + # This is a threadsafe way to add a task to the recorder + + instance.queue_task(EventTypeIDMigrationTask()) + await async_recorder_block_till_done(hass) + await async_recorder_block_till_done(hass) + + def _fetch_migrated_events(): + with session_scope(hass=hass) as session: + events = ( + session.query(Events.event_id, Events.time_fired, EventTypes.event_type) + .filter( + Events.event_type_id.in_( + select_event_type_ids( + ( + "event_type_one", + migration._EMPTY_EVENT_TYPE, + ) + ) + ) + ) + .outerjoin(EventTypes, Events.event_type_id == EventTypes.event_type_id) + .all() + ) + assert len(events) == 1002 + result = {} + for event in events: + result.setdefault(event.event_type, []).append( + { + "event_id": event.event_id, + "time_fired": event.time_fired, + "event_type": event.event_type, + } + ) + return result + + events_by_type = await instance.async_add_executor_job(_fetch_migrated_events) + assert len(events_by_type["event_type_one"]) == 2 + assert len(events_by_type[migration._EMPTY_EVENT_TYPE]) == 1000