Fix selecting entity_ids and device_ids in events with MySQL and PostgreSQL with logbook (#73918)

* Fix selecting entity_ids and device_ids in events with MySQL and PostgreSQL

Fixes #73818

* add cover
This commit is contained in:
J. Nick Koston 2022-06-24 08:43:35 -05:00 committed by GitHub
parent 6cafcb016f
commit f29cc33fa0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 105 additions and 48 deletions

View File

@ -6,6 +6,7 @@ from datetime import datetime as dt
from sqlalchemy.sql.lambdas import StatementLambdaElement
from homeassistant.components.recorder.filters import Filters
from homeassistant.helpers.json import json_dumps
from .all import all_stmt
from .devices import devices_stmt
@ -45,34 +46,34 @@ def statement_for_request(
# entities and devices: logbook sends everything for the timeframe for the entities and devices
if entity_ids and device_ids:
json_quotable_entity_ids = list(entity_ids)
json_quotable_device_ids = list(device_ids)
json_quoted_entity_ids = [json_dumps(entity_id) for entity_id in entity_ids]
json_quoted_device_ids = [json_dumps(device_id) for device_id in device_ids]
return entities_devices_stmt(
start_day,
end_day,
event_types,
entity_ids,
json_quotable_entity_ids,
json_quotable_device_ids,
json_quoted_entity_ids,
json_quoted_device_ids,
)
# entities: logbook sends everything for the timeframe for the entities
if entity_ids:
json_quotable_entity_ids = list(entity_ids)
json_quoted_entity_ids = [json_dumps(entity_id) for entity_id in entity_ids]
return entities_stmt(
start_day,
end_day,
event_types,
entity_ids,
json_quotable_entity_ids,
json_quoted_entity_ids,
)
# devices: logbook sends everything for the timeframe for the devices
assert device_ids is not None
json_quotable_device_ids = list(device_ids)
json_quoted_device_ids = [json_dumps(device_id) for device_id in device_ids]
return devices_stmt(
start_day,
end_day,
event_types,
json_quotable_device_ids,
json_quoted_device_ids,
)

View File

@ -4,6 +4,7 @@ from __future__ import annotations
from collections.abc import Iterable
from datetime import datetime as dt
import sqlalchemy
from sqlalchemy import lambda_stmt, select
from sqlalchemy.orm import Query
from sqlalchemy.sql.elements import ClauseList
@ -93,4 +94,6 @@ def apply_event_device_id_matchers(
json_quotable_device_ids: Iterable[str],
) -> ClauseList:
"""Create matchers for the device_ids in the event_data."""
return DEVICE_ID_IN_EVENT.in_(json_quotable_device_ids)
return DEVICE_ID_IN_EVENT.is_not(None) & sqlalchemy.cast(
DEVICE_ID_IN_EVENT, sqlalchemy.Text()
).in_(json_quotable_device_ids)

View File

@ -36,12 +36,12 @@ def _select_entities_context_ids_sub_query(
end_day: dt,
event_types: tuple[str, ...],
entity_ids: list[str],
json_quotable_entity_ids: list[str],
json_quoted_entity_ids: list[str],
) -> CompoundSelect:
"""Generate a subquery to find context ids for multiple entities."""
union = union_all(
select_events_context_id_subquery(start_day, end_day, event_types).where(
apply_event_entity_id_matchers(json_quotable_entity_ids)
apply_event_entity_id_matchers(json_quoted_entity_ids)
),
apply_entities_hints(select(States.context_id))
.filter((States.last_updated > start_day) & (States.last_updated < end_day))
@ -56,7 +56,7 @@ def _apply_entities_context_union(
end_day: dt,
event_types: tuple[str, ...],
entity_ids: list[str],
json_quotable_entity_ids: list[str],
json_quoted_entity_ids: list[str],
) -> CompoundSelect:
"""Generate a CTE to find the entity and device context ids and a query to find linked row."""
entities_cte: CTE = _select_entities_context_ids_sub_query(
@ -64,7 +64,7 @@ def _apply_entities_context_union(
end_day,
event_types,
entity_ids,
json_quotable_entity_ids,
json_quoted_entity_ids,
).cte()
# We used to optimize this to exclude rows we already have in the union with
# a States.entity_id.not_in(entity_ids) but that made the
@ -91,19 +91,19 @@ def entities_stmt(
end_day: dt,
event_types: tuple[str, ...],
entity_ids: list[str],
json_quotable_entity_ids: list[str],
json_quoted_entity_ids: list[str],
) -> StatementLambdaElement:
"""Generate a logbook query for multiple entities."""
return lambda_stmt(
lambda: _apply_entities_context_union(
select_events_without_states(start_day, end_day, event_types).where(
apply_event_entity_id_matchers(json_quotable_entity_ids)
apply_event_entity_id_matchers(json_quoted_entity_ids)
),
start_day,
end_day,
event_types,
entity_ids,
json_quotable_entity_ids,
json_quoted_entity_ids,
).order_by(Events.time_fired)
)
@ -118,12 +118,19 @@ def states_query_for_entity_ids(
def apply_event_entity_id_matchers(
json_quotable_entity_ids: Iterable[str],
json_quoted_entity_ids: Iterable[str],
) -> sqlalchemy.or_:
"""Create matchers for the entity_id in the event_data."""
return ENTITY_ID_IN_EVENT.in_(
json_quotable_entity_ids
) | OLD_ENTITY_ID_IN_EVENT.in_(json_quotable_entity_ids)
return sqlalchemy.or_(
ENTITY_ID_IN_EVENT.is_not(None)
& sqlalchemy.cast(ENTITY_ID_IN_EVENT, sqlalchemy.Text()).in_(
json_quoted_entity_ids
),
OLD_ENTITY_ID_IN_EVENT.is_not(None)
& sqlalchemy.cast(OLD_ENTITY_ID_IN_EVENT, sqlalchemy.Text()).in_(
json_quoted_entity_ids
),
)
def apply_entities_hints(query: Query) -> Query:

View File

@ -33,14 +33,14 @@ def _select_entities_device_id_context_ids_sub_query(
end_day: dt,
event_types: tuple[str, ...],
entity_ids: list[str],
json_quotable_entity_ids: list[str],
json_quotable_device_ids: list[str],
json_quoted_entity_ids: list[str],
json_quoted_device_ids: list[str],
) -> CompoundSelect:
"""Generate a subquery to find context ids for multiple entities and multiple devices."""
union = union_all(
select_events_context_id_subquery(start_day, end_day, event_types).where(
_apply_event_entity_id_device_id_matchers(
json_quotable_entity_ids, json_quotable_device_ids
json_quoted_entity_ids, json_quoted_device_ids
)
),
apply_entities_hints(select(States.context_id))
@ -56,16 +56,16 @@ def _apply_entities_devices_context_union(
end_day: dt,
event_types: tuple[str, ...],
entity_ids: list[str],
json_quotable_entity_ids: list[str],
json_quotable_device_ids: list[str],
json_quoted_entity_ids: list[str],
json_quoted_device_ids: list[str],
) -> CompoundSelect:
devices_entities_cte: CTE = _select_entities_device_id_context_ids_sub_query(
start_day,
end_day,
event_types,
entity_ids,
json_quotable_entity_ids,
json_quotable_device_ids,
json_quoted_entity_ids,
json_quoted_device_ids,
).cte()
# We used to optimize this to exclude rows we already have in the union with
# a States.entity_id.not_in(entity_ids) but that made the
@ -92,32 +92,32 @@ def entities_devices_stmt(
end_day: dt,
event_types: tuple[str, ...],
entity_ids: list[str],
json_quotable_entity_ids: list[str],
json_quotable_device_ids: list[str],
json_quoted_entity_ids: list[str],
json_quoted_device_ids: list[str],
) -> StatementLambdaElement:
"""Generate a logbook query for multiple entities."""
stmt = lambda_stmt(
lambda: _apply_entities_devices_context_union(
select_events_without_states(start_day, end_day, event_types).where(
_apply_event_entity_id_device_id_matchers(
json_quotable_entity_ids, json_quotable_device_ids
json_quoted_entity_ids, json_quoted_device_ids
)
),
start_day,
end_day,
event_types,
entity_ids,
json_quotable_entity_ids,
json_quotable_device_ids,
json_quoted_entity_ids,
json_quoted_device_ids,
).order_by(Events.time_fired)
)
return stmt
def _apply_event_entity_id_device_id_matchers(
json_quotable_entity_ids: Iterable[str], json_quotable_device_ids: Iterable[str]
json_quoted_entity_ids: Iterable[str], json_quoted_device_ids: Iterable[str]
) -> sqlalchemy.or_:
"""Create matchers for the device_id and entity_id in the event_data."""
return apply_event_entity_id_matchers(
json_quotable_entity_ids
) | apply_event_device_id_matchers(json_quotable_device_ids)
json_quoted_entity_ids
) | apply_event_device_id_matchers(json_quoted_device_ids)

View File

@ -94,7 +94,7 @@ async def _async_mock_entity_with_logbook_platform(hass):
return entry
async def _async_mock_device_with_logbook_platform(hass):
async def _async_mock_devices_with_logbook_platform(hass):
"""Mock an integration that provides devices that are described by the logbook."""
entry = MockConfigEntry(domain="test", data={"first": True}, options=None)
entry.add_to_hass(hass)
@ -109,8 +109,18 @@ async def _async_mock_device_with_logbook_platform(hass):
model="model",
suggested_area="Game Room",
)
device2 = dev_reg.async_get_or_create(
config_entry_id=entry.entry_id,
connections={(device_registry.CONNECTION_NETWORK_MAC, "12:34:56:AB:CD:CC")},
identifiers={("bridgeid", "4567")},
sw_version="sw-version",
name="device name",
manufacturer="manufacturer",
model="model",
suggested_area="Living Room",
)
await _async_mock_logbook_platform(hass)
return device
return [device, device2]
async def test_get_events(hass, hass_ws_client, recorder_mock):
@ -392,10 +402,13 @@ async def test_get_events_with_device_ids(hass, hass_ws_client, recorder_mock):
]
)
device = await _async_mock_device_with_logbook_platform(hass)
devices = await _async_mock_devices_with_logbook_platform(hass)
device = devices[0]
device2 = devices[1]
hass.bus.async_fire(EVENT_HOMEASSISTANT_START)
hass.bus.async_fire("mock_event", {"device_id": device.id})
hass.bus.async_fire("mock_event", {"device_id": device2.id})
hass.states.async_set("light.kitchen", STATE_OFF)
await hass.async_block_till_done()
@ -423,7 +436,7 @@ async def test_get_events_with_device_ids(hass, hass_ws_client, recorder_mock):
"id": 1,
"type": "logbook/get_events",
"start_time": now.isoformat(),
"device_ids": [device.id],
"device_ids": [device.id, device2.id],
}
)
response = await client.receive_json()
@ -431,10 +444,13 @@ async def test_get_events_with_device_ids(hass, hass_ws_client, recorder_mock):
assert response["id"] == 1
results = response["result"]
assert len(results) == 1
assert len(results) == 2
assert results[0]["name"] == "device name"
assert results[0]["message"] == "is on fire"
assert isinstance(results[0]["when"], float)
assert results[1]["name"] == "device name"
assert results[1]["message"] == "is on fire"
assert isinstance(results[1]["when"], float)
await client.send_json(
{
@ -470,17 +486,20 @@ async def test_get_events_with_device_ids(hass, hass_ws_client, recorder_mock):
assert response["id"] == 3
results = response["result"]
assert len(results) == 4
assert len(results) == 5
assert results[0]["message"] == "started"
assert results[1]["name"] == "device name"
assert results[1]["message"] == "is on fire"
assert isinstance(results[1]["when"], float)
assert results[2]["entity_id"] == "light.kitchen"
assert results[2]["state"] == "on"
assert results[2]["name"] == "device name"
assert results[2]["message"] == "is on fire"
assert isinstance(results[2]["when"], float)
assert results[3]["entity_id"] == "light.kitchen"
assert results[3]["state"] == "off"
assert results[3]["state"] == "on"
assert isinstance(results[3]["when"], float)
assert results[4]["entity_id"] == "light.kitchen"
assert results[4]["state"] == "off"
assert isinstance(results[4]["when"], float)
@patch("homeassistant.components.logbook.websocket_api.EVENT_COALESCE_TIME", 0)
@ -1731,7 +1750,9 @@ async def test_subscribe_unsubscribe_logbook_stream_device(
for comp in ("homeassistant", "logbook", "automation", "script")
]
)
device = await _async_mock_device_with_logbook_platform(hass)
devices = await _async_mock_devices_with_logbook_platform(hass)
device = devices[0]
device2 = devices[1]
await hass.async_block_till_done()
init_count = sum(hass.bus.async_listeners().values())
@ -1743,7 +1764,7 @@ async def test_subscribe_unsubscribe_logbook_stream_device(
"id": 7,
"type": "logbook/event_stream",
"start_time": now.isoformat(),
"device_ids": [device.id],
"device_ids": [device.id, device2.id],
}
)
@ -1775,6 +1796,29 @@ async def test_subscribe_unsubscribe_logbook_stream_device(
{"domain": "test", "message": "is on fire", "name": "device name", "when": ANY}
]
for _ in range(3):
hass.bus.async_fire("mock_event", {"device_id": device.id})
hass.bus.async_fire("mock_event", {"device_id": device2.id})
await hass.async_block_till_done()
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert msg["event"]["events"] == [
{
"domain": "test",
"message": "is on fire",
"name": "device name",
"when": ANY,
},
{
"domain": "test",
"message": "is on fire",
"name": "device name",
"when": ANY,
},
]
await websocket_client.send_json(
{"id": 8, "type": "unsubscribe_events", "subscription": 7}
)
@ -1950,7 +1994,8 @@ async def test_live_stream_with_one_second_commit_interval(
for comp in ("homeassistant", "logbook", "automation", "script")
]
)
device = await _async_mock_device_with_logbook_platform(hass)
devices = await _async_mock_devices_with_logbook_platform(hass)
device = devices[0]
await hass.async_block_till_done()
init_count = sum(hass.bus.async_listeners().values())
@ -2143,7 +2188,8 @@ async def test_recorder_is_far_behind(hass, recorder_mock, hass_ws_client, caplo
]
)
await async_wait_recording_done(hass)
device = await _async_mock_device_with_logbook_platform(hass)
devices = await _async_mock_devices_with_logbook_platform(hass)
device = devices[0]
await async_wait_recording_done(hass)
# Block the recorder queue