diff --git a/homeassistant/components/influxdb/__init__.py b/homeassistant/components/influxdb/__init__.py index 24c80dc1d54..cd7e6a7ed88 100644 --- a/homeassistant/components/influxdb/__init__.py +++ b/homeassistant/components/influxdb/__init__.py @@ -512,7 +512,9 @@ class InfluxThread(threading.Thread): def __init__(self, hass, influx, event_to_json, max_tries): """Initialize the listener.""" threading.Thread.__init__(self, name=DOMAIN) - self.queue = queue.Queue() + self.queue: queue.SimpleQueue[ + threading.Event | tuple[float, Event] | None + ] = queue.SimpleQueue() self.influx = influx self.event_to_json = event_to_json self.max_tries = max_tries @@ -548,16 +550,17 @@ class InfluxThread(threading.Thread): if item is None: self.shutdown = True - else: + elif type(item) is tuple: # noqa: E721 timestamp, event = item age = time.monotonic() - timestamp if age < queue_seconds: - event_json = self.event_to_json(event) - if event_json: + if event_json := self.event_to_json(event): json.append(event_json) else: dropped += 1 + elif isinstance(item, threading.Event): + item.set() if dropped: _LOGGER.warning(CATCHING_UP_MESSAGE, dropped) @@ -590,12 +593,15 @@ class InfluxThread(threading.Thread): def run(self): """Process incoming events.""" while not self.shutdown: - count, json = self.get_events_json() + _, json = self.get_events_json() if json: self.write_to_influxdb(json) - for _ in range(count): - self.queue.task_done() def block_till_done(self): - """Block till all events processed.""" - self.queue.join() + """Block till all events processed. + + Currently only used for testing. + """ + event = threading.Event() + self.queue.put(event) + event.wait() diff --git a/tests/components/influxdb/test_init.py b/tests/components/influxdb/test_init.py index b6d68714af5..aa73e12a611 100644 --- a/tests/components/influxdb/test_init.py +++ b/tests/components/influxdb/test_init.py @@ -22,6 +22,15 @@ BASE_V2_CONFIG = { } +async def async_wait_for_queue_to_process(hass: HomeAssistant) -> None: + """Wait for the queue to be processed. + + In the future we should refactor this away to not have + to access hass.data directly. + """ + await hass.async_add_executor_job(hass.data[influxdb.DOMAIN].block_till_done) + + @dataclass class FilterTest: """Class for capturing a filter test.""" @@ -407,7 +416,7 @@ async def test_event_listener( hass.states.async_set("fake.entity_id", in_, attrs) await hass.async_block_till_done() - hass.data[influxdb.DOMAIN].block_till_done() + await async_wait_for_queue_to_process(hass) write_api = get_write_api(mock_client) assert write_api.call_count == 1 @@ -454,7 +463,7 @@ async def test_event_listener_no_units( ] hass.states.async_set("fake.entity_id", 1, attrs) await hass.async_block_till_done() - hass.data[influxdb.DOMAIN].block_till_done() + await async_wait_for_queue_to_process(hass) write_api = get_write_api(mock_client) assert write_api.call_count == 1 @@ -497,7 +506,7 @@ async def test_event_listener_inf( ] hass.states.async_set("fake.entity_id", 8, attrs) await hass.async_block_till_done() - hass.data[influxdb.DOMAIN].block_till_done() + await async_wait_for_queue_to_process(hass) write_api = get_write_api(mock_client) assert write_api.call_count == 1 @@ -539,7 +548,7 @@ async def test_event_listener_states( ] hass.states.async_set("fake.entity_id", state_state) await hass.async_block_till_done() - hass.data[influxdb.DOMAIN].block_till_done() + await async_wait_for_queue_to_process(hass) write_api = get_write_api(mock_client) if state_state == 1: @@ -564,7 +573,7 @@ async def execute_filter_test(hass: HomeAssistant, tests, write_api, get_mock_ca ] hass.states.async_set(test.id, 1) await hass.async_block_till_done() - hass.data[influxdb.DOMAIN].block_till_done() + await async_wait_for_queue_to_process(hass) if test.should_pass: write_api.assert_called_once() @@ -927,7 +936,7 @@ async def test_event_listener_invalid_type( hass.states.async_set("fake.entity_id", in_, attrs) await hass.async_block_till_done() - hass.data[influxdb.DOMAIN].block_till_done() + await async_wait_for_queue_to_process(hass) write_api = get_write_api(mock_client) assert write_api.call_count == 1 @@ -970,7 +979,7 @@ async def test_event_listener_default_measurement( ] hass.states.async_set("fake.ok", 1) await hass.async_block_till_done() - hass.data[influxdb.DOMAIN].block_till_done() + await async_wait_for_queue_to_process(hass) write_api = get_write_api(mock_client) assert write_api.call_count == 1 @@ -1014,7 +1023,7 @@ async def test_event_listener_unit_of_measurement_field( ] hass.states.async_set("fake.entity_id", "foo", attrs) await hass.async_block_till_done() - hass.data[influxdb.DOMAIN].block_till_done() + await async_wait_for_queue_to_process(hass) write_api = get_write_api(mock_client) assert write_api.call_count == 1 @@ -1062,7 +1071,7 @@ async def test_event_listener_tags_attributes( ] hass.states.async_set("fake.something", 1, attrs) await hass.async_block_till_done() - hass.data[influxdb.DOMAIN].block_till_done() + await async_wait_for_queue_to_process(hass) write_api = get_write_api(mock_client) assert write_api.call_count == 1 @@ -1120,7 +1129,7 @@ async def test_event_listener_component_override_measurement( ] hass.states.async_set(f"{comp['domain']}.{comp['id']}", 1) await hass.async_block_till_done() - hass.data[influxdb.DOMAIN].block_till_done() + await async_wait_for_queue_to_process(hass) write_api = get_write_api(mock_client) assert write_api.call_count == 1 @@ -1186,7 +1195,7 @@ async def test_event_listener_component_measurement_attr( ] hass.states.async_set(f"{comp['domain']}.{comp['id']}", 1, comp["attrs"]) await hass.async_block_till_done() - hass.data[influxdb.DOMAIN].block_till_done() + await async_wait_for_queue_to_process(hass) write_api = get_write_api(mock_client) assert write_api.call_count == 1 @@ -1271,7 +1280,7 @@ async def test_event_listener_ignore_attributes( }, ) await hass.async_block_till_done() - hass.data[influxdb.DOMAIN].block_till_done() + await async_wait_for_queue_to_process(hass) write_api = get_write_api(mock_client) assert write_api.call_count == 1 @@ -1317,7 +1326,7 @@ async def test_event_listener_ignore_attributes_overlapping_entities( ] hass.states.async_set("sensor.fake", 1, {"ignore": 1}) await hass.async_block_till_done() - hass.data[influxdb.DOMAIN].block_till_done() + await async_wait_for_queue_to_process(hass) write_api = get_write_api(mock_client) assert write_api.call_count == 1 @@ -1357,7 +1366,7 @@ async def test_event_listener_scheduled_write( with patch.object(influxdb.time, "sleep") as mock_sleep: hass.states.async_set("entity.entity_id", 1) await hass.async_block_till_done() - hass.data[influxdb.DOMAIN].block_till_done() + await async_wait_for_queue_to_process(hass) assert mock_sleep.called assert write_api.call_count == 2 @@ -1366,7 +1375,7 @@ async def test_event_listener_scheduled_write( with patch.object(influxdb.time, "sleep") as mock_sleep: hass.states.async_set("entity.entity_id", "2") await hass.async_block_till_done() - hass.data[influxdb.DOMAIN].block_till_done() + await async_wait_for_queue_to_process(hass) assert not mock_sleep.called assert write_api.call_count == 3 @@ -1406,7 +1415,7 @@ async def test_event_listener_backlog_full( with patch("homeassistant.components.influxdb.time.monotonic", new=fast_monotonic): hass.states.async_set("entity.id", 1) await hass.async_block_till_done() - hass.data[influxdb.DOMAIN].block_till_done() + await async_wait_for_queue_to_process(hass) assert get_write_api(mock_client).call_count == 0 @@ -1444,7 +1453,7 @@ async def test_event_listener_attribute_name_conflict( ] hass.states.async_set("fake.something", 1, {"value": "value_str"}) await hass.async_block_till_done() - hass.data[influxdb.DOMAIN].block_till_done() + await async_wait_for_queue_to_process(hass) write_api = get_write_api(mock_client) assert write_api.call_count == 1 @@ -1566,7 +1575,8 @@ async def test_invalid_inputs_error( with patch(f"{INFLUX_PATH}.time.sleep") as sleep: hass.states.async_set("fake.something", 1) await hass.async_block_till_done() - hass.data[influxdb.DOMAIN].block_till_done() + await async_wait_for_queue_to_process(hass) + await hass.async_block_till_done() write_api.assert_called_once() assert ( @@ -1670,7 +1680,7 @@ async def test_precision( }, ) await hass.async_block_till_done() - hass.data[influxdb.DOMAIN].block_till_done() + await async_wait_for_queue_to_process(hass) write_api = get_write_api(mock_client) assert write_api.call_count == 1