Switch influxdb to use a SimpleQueue (#111798)

J. Nick Koston 2024-02-29 12:49:43 -10:00 committed by GitHub
parent 0b0036fb12
commit 3b93c21d9d
2 changed files with 44 additions and 28 deletions
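Background for the change: queue.Queue tracks unfinished tasks so consumers can call task_done() and join(), while queue.SimpleQueue is a lighter FIFO without that bookkeeping. The commit therefore replaces the join()-based flush with an explicit threading.Event sentinel that the worker sets once everything queued before it has been drained. A minimal standalone sketch of that pattern (worker, process and the module-level work queue are illustrative names, not the integration's code):

import queue
import threading

work: queue.SimpleQueue = queue.SimpleQueue()

def process(item: object) -> None:
    """Stand-in for the real event handling (illustrative only)."""
    print("processed", item)

def worker() -> None:
    while True:
        item = work.get()
        if item is None:                      # shutdown sentinel
            return
        if isinstance(item, threading.Event):
            item.set()                        # flush sentinel: earlier items are done
            continue
        process(item)

def block_till_done() -> None:
    """Return once every item queued before this call has been handled."""
    done = threading.Event()
    work.put(done)
    done.wait()

threading.Thread(target=worker, daemon=True).start()
work.put("event-1")
block_till_done()   # returns only after "event-1" was processed
work.put(None)      # stop the worker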


@ -512,7 +512,9 @@ class InfluxThread(threading.Thread):
def __init__(self, hass, influx, event_to_json, max_tries):
"""Initialize the listener."""
threading.Thread.__init__(self, name=DOMAIN)
- self.queue = queue.Queue()
+ self.queue: queue.SimpleQueue[
+ threading.Event | tuple[float, Event] | None
+ ] = queue.SimpleQueue()
self.influx = influx
self.event_to_json = event_to_json
self.max_tries = max_tries
@ -548,16 +550,17 @@ class InfluxThread(threading.Thread):
if item is None:
self.shutdown = True
- else:
+ elif type(item) is tuple: # noqa: E721
timestamp, event = item
age = time.monotonic() - timestamp
if age < queue_seconds:
- event_json = self.event_to_json(event)
- if event_json:
+ if event_json := self.event_to_json(event):
json.append(event_json)
else:
dropped += 1
+ elif isinstance(item, threading.Event):
+ item.set()
if dropped:
_LOGGER.warning(CATCHING_UP_MESSAGE, dropped)
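For context, the three item shapes this dispatch now has to handle correspond to three producers. A hedged sketch of what ends up on the queue (the dict stands in for a Home Assistant Event object; the real listener and shutdown code differ):

import queue
import threading
import time

q: queue.SimpleQueue = queue.SimpleQueue()

# 1. The state-change listener enqueues a (monotonic timestamp, event) pair;
#    get_events_json() unpacks it and counts it as dropped once it is older
#    than queue_seconds.
q.put((time.monotonic(), {"entity_id": "fake.entity_id", "state": "1"}))

# 2. block_till_done() enqueues a bare threading.Event; the worker calls .set()
#    on it once every earlier item has been drained.
q.put(threading.Event())

# 3. Shutdown enqueues None, which makes the worker flip self.shutdown and exit.
q.put(None)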
@ -590,12 +593,15 @@ class InfluxThread(threading.Thread):
def run(self):
"""Process incoming events."""
while not self.shutdown:
- count, json = self.get_events_json()
+ _, json = self.get_events_json()
if json:
self.write_to_influxdb(json)
- for _ in range(count):
- self.queue.task_done()
def block_till_done(self):
"""Block till all events processed."""
self.queue.join()
"""Block till all events processed.
Currently only used for testing.
"""
event = threading.Event()
self.queue.put(event)
event.wait()
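The run() loop drops its task_done() bookkeeping because only queue.Queue keeps an unfinished-task counter; that is also why block_till_done() can no longer call join(). For reference, standard-library behaviour (not integration code):

import queue

q = queue.Queue()
q.put("x")
q.get()
q.task_done()   # Queue counts unfinished tasks...
q.join()        # ...so join() can block until they are all marked done

sq = queue.SimpleQueue()
sq.put("x")
sq.get()
# SimpleQueue has no task_done() or join(); waiting for the backlog therefore
# needs an explicit signal, hence the threading.Event sentinel above.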


@ -22,6 +22,15 @@ BASE_V2_CONFIG = {
}
+ async def async_wait_for_queue_to_process(hass: HomeAssistant) -> None:
+ """Wait for the queue to be processed.
+ In the future we should refactor this away to not have
+ to access hass.data directly.
+ """
+ await hass.async_add_executor_job(hass.data[influxdb.DOMAIN].block_till_done)
@dataclass
class FilterTest:
"""Class for capturing a filter test."""
@ -407,7 +416,7 @@ async def test_event_listener(
hass.states.async_set("fake.entity_id", in_, attrs)
await hass.async_block_till_done()
- hass.data[influxdb.DOMAIN].block_till_done()
+ await async_wait_for_queue_to_process(hass)
write_api = get_write_api(mock_client)
assert write_api.call_count == 1
@ -454,7 +463,7 @@ async def test_event_listener_no_units(
]
hass.states.async_set("fake.entity_id", 1, attrs)
await hass.async_block_till_done()
- hass.data[influxdb.DOMAIN].block_till_done()
+ await async_wait_for_queue_to_process(hass)
write_api = get_write_api(mock_client)
assert write_api.call_count == 1
@ -497,7 +506,7 @@ async def test_event_listener_inf(
]
hass.states.async_set("fake.entity_id", 8, attrs)
await hass.async_block_till_done()
- hass.data[influxdb.DOMAIN].block_till_done()
+ await async_wait_for_queue_to_process(hass)
write_api = get_write_api(mock_client)
assert write_api.call_count == 1
@ -539,7 +548,7 @@ async def test_event_listener_states(
]
hass.states.async_set("fake.entity_id", state_state)
await hass.async_block_till_done()
- hass.data[influxdb.DOMAIN].block_till_done()
+ await async_wait_for_queue_to_process(hass)
write_api = get_write_api(mock_client)
if state_state == 1:
@ -564,7 +573,7 @@ async def execute_filter_test(hass: HomeAssistant, tests, write_api, get_mock_ca
]
hass.states.async_set(test.id, 1)
await hass.async_block_till_done()
- hass.data[influxdb.DOMAIN].block_till_done()
+ await async_wait_for_queue_to_process(hass)
if test.should_pass:
write_api.assert_called_once()
@ -927,7 +936,7 @@ async def test_event_listener_invalid_type(
hass.states.async_set("fake.entity_id", in_, attrs)
await hass.async_block_till_done()
- hass.data[influxdb.DOMAIN].block_till_done()
+ await async_wait_for_queue_to_process(hass)
write_api = get_write_api(mock_client)
assert write_api.call_count == 1
@ -970,7 +979,7 @@ async def test_event_listener_default_measurement(
]
hass.states.async_set("fake.ok", 1)
await hass.async_block_till_done()
- hass.data[influxdb.DOMAIN].block_till_done()
+ await async_wait_for_queue_to_process(hass)
write_api = get_write_api(mock_client)
assert write_api.call_count == 1
@ -1014,7 +1023,7 @@ async def test_event_listener_unit_of_measurement_field(
]
hass.states.async_set("fake.entity_id", "foo", attrs)
await hass.async_block_till_done()
- hass.data[influxdb.DOMAIN].block_till_done()
+ await async_wait_for_queue_to_process(hass)
write_api = get_write_api(mock_client)
assert write_api.call_count == 1
@ -1062,7 +1071,7 @@ async def test_event_listener_tags_attributes(
]
hass.states.async_set("fake.something", 1, attrs)
await hass.async_block_till_done()
- hass.data[influxdb.DOMAIN].block_till_done()
+ await async_wait_for_queue_to_process(hass)
write_api = get_write_api(mock_client)
assert write_api.call_count == 1
@ -1120,7 +1129,7 @@ async def test_event_listener_component_override_measurement(
]
hass.states.async_set(f"{comp['domain']}.{comp['id']}", 1)
await hass.async_block_till_done()
- hass.data[influxdb.DOMAIN].block_till_done()
+ await async_wait_for_queue_to_process(hass)
write_api = get_write_api(mock_client)
assert write_api.call_count == 1
@ -1186,7 +1195,7 @@ async def test_event_listener_component_measurement_attr(
]
hass.states.async_set(f"{comp['domain']}.{comp['id']}", 1, comp["attrs"])
await hass.async_block_till_done()
- hass.data[influxdb.DOMAIN].block_till_done()
+ await async_wait_for_queue_to_process(hass)
write_api = get_write_api(mock_client)
assert write_api.call_count == 1
@ -1271,7 +1280,7 @@ async def test_event_listener_ignore_attributes(
},
)
await hass.async_block_till_done()
- hass.data[influxdb.DOMAIN].block_till_done()
+ await async_wait_for_queue_to_process(hass)
write_api = get_write_api(mock_client)
assert write_api.call_count == 1
@ -1317,7 +1326,7 @@ async def test_event_listener_ignore_attributes_overlapping_entities(
]
hass.states.async_set("sensor.fake", 1, {"ignore": 1})
await hass.async_block_till_done()
- hass.data[influxdb.DOMAIN].block_till_done()
+ await async_wait_for_queue_to_process(hass)
write_api = get_write_api(mock_client)
assert write_api.call_count == 1
@ -1357,7 +1366,7 @@ async def test_event_listener_scheduled_write(
with patch.object(influxdb.time, "sleep") as mock_sleep:
hass.states.async_set("entity.entity_id", 1)
await hass.async_block_till_done()
- hass.data[influxdb.DOMAIN].block_till_done()
+ await async_wait_for_queue_to_process(hass)
assert mock_sleep.called
assert write_api.call_count == 2
@ -1366,7 +1375,7 @@ async def test_event_listener_scheduled_write(
with patch.object(influxdb.time, "sleep") as mock_sleep:
hass.states.async_set("entity.entity_id", "2")
await hass.async_block_till_done()
- hass.data[influxdb.DOMAIN].block_till_done()
+ await async_wait_for_queue_to_process(hass)
assert not mock_sleep.called
assert write_api.call_count == 3
@ -1406,7 +1415,7 @@ async def test_event_listener_backlog_full(
with patch("homeassistant.components.influxdb.time.monotonic", new=fast_monotonic):
hass.states.async_set("entity.id", 1)
await hass.async_block_till_done()
- hass.data[influxdb.DOMAIN].block_till_done()
+ await async_wait_for_queue_to_process(hass)
assert get_write_api(mock_client).call_count == 0
@ -1444,7 +1453,7 @@ async def test_event_listener_attribute_name_conflict(
]
hass.states.async_set("fake.something", 1, {"value": "value_str"})
await hass.async_block_till_done()
- hass.data[influxdb.DOMAIN].block_till_done()
+ await async_wait_for_queue_to_process(hass)
write_api = get_write_api(mock_client)
assert write_api.call_count == 1
@ -1566,7 +1575,8 @@ async def test_invalid_inputs_error(
with patch(f"{INFLUX_PATH}.time.sleep") as sleep:
hass.states.async_set("fake.something", 1)
await hass.async_block_till_done()
- hass.data[influxdb.DOMAIN].block_till_done()
+ await async_wait_for_queue_to_process(hass)
await hass.async_block_till_done()
write_api.assert_called_once()
assert (
@ -1670,7 +1680,7 @@ async def test_precision(
},
)
await hass.async_block_till_done()
- hass.data[influxdb.DOMAIN].block_till_done()
+ await async_wait_for_queue_to_process(hass)
write_api = get_write_api(mock_client)
assert write_api.call_count == 1