From 339a3270f6a21cf524923c115ca278f39baaf8bd Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Sun, 6 Jul 2025 18:16:25 -0500 Subject: [PATCH] adjust --- .../fixtures/scheduler_defer_fifo_simple.yaml | 109 ++++++++++++++ .../fixtures/scheduler_defer_stress.yaml | 38 +++++ .../test_scheduler_defer_fifo_simple.py | 117 +++++++++++++++ .../test_scheduler_defer_stress.py | 137 ++++++++++++++++++ 4 files changed, 401 insertions(+) create mode 100644 tests/integration/fixtures/scheduler_defer_fifo_simple.yaml create mode 100644 tests/integration/fixtures/scheduler_defer_stress.yaml create mode 100644 tests/integration/test_scheduler_defer_fifo_simple.py create mode 100644 tests/integration/test_scheduler_defer_stress.py diff --git a/tests/integration/fixtures/scheduler_defer_fifo_simple.yaml b/tests/integration/fixtures/scheduler_defer_fifo_simple.yaml new file mode 100644 index 0000000000..7384082ac2 --- /dev/null +++ b/tests/integration/fixtures/scheduler_defer_fifo_simple.yaml @@ -0,0 +1,109 @@ +esphome: + name: scheduler-defer-fifo-simple + +host: + +logger: + level: DEBUG + +api: + services: + - service: test_set_timeout + then: + - lambda: |- + // Test set_timeout with 0 delay (direct scheduler call) + static int set_timeout_order = 0; + static bool set_timeout_passed = true; + + // Reset for this test + set_timeout_order = 0; + set_timeout_passed = true; + + ESP_LOGD("defer_test", "Testing set_timeout(0) for FIFO order..."); + for (int i = 0; i < 10; i++) { + int expected = i; + App.scheduler.set_timeout((Component*)nullptr, nullptr, 0, [expected]() { + ESP_LOGD("defer_test", "set_timeout(0) item %d executed, order %d", expected, set_timeout_order); + if (set_timeout_order != expected) { + ESP_LOGE("defer_test", "FIFO violation in set_timeout: expected %d but got execution order %d", expected, set_timeout_order); + set_timeout_passed = false; + } + set_timeout_order++; + + if (set_timeout_order == 10) { + if (set_timeout_passed) { + ESP_LOGI("defer_test", "✓ Test PASSED - set_timeout(0) maintains FIFO order"); + id(test_result)->trigger("passed"); + } else { + ESP_LOGE("defer_test", "✗ Test FAILED - set_timeout(0) executed out of order"); + id(test_result)->trigger("failed"); + } + id(test_complete)->trigger("test_finished"); + } + }); + } + + ESP_LOGD("defer_test", "Deferred 10 items using set_timeout(0), waiting for execution..."); + + - service: test_defer + then: + - lambda: |- + // Test defer() method (component method) + static int defer_order = 0; + static bool defer_passed = true; + + // Reset for this test + defer_order = 0; + defer_passed = true; + + ESP_LOGD("defer_test", "Testing defer() for FIFO order..."); + + // Create a test component class that exposes defer() + class TestComponent : public Component { + public: + void test_defer() { + for (int i = 0; i < 10; i++) { + int expected = i; + this->defer([expected]() { + ESP_LOGD("defer_test", "defer() item %d executed, order %d", expected, defer_order); + if (defer_order != expected) { + ESP_LOGE("defer_test", "FIFO violation in defer: expected %d but got execution order %d", expected, defer_order); + defer_passed = false; + } + defer_order++; + + if (defer_order == 10) { + if (defer_passed) { + ESP_LOGI("defer_test", "✓ Test PASSED - defer() maintains FIFO order"); + id(test_result)->trigger("passed"); + } else { + ESP_LOGE("defer_test", "✗ Test FAILED - defer() executed out of order"); + id(test_result)->trigger("failed"); + } + id(test_complete)->trigger("test_finished"); + } + }); + } + } + }; + + // Use a static instance so it doesn't go out of scope + static TestComponent test_component; + test_component.test_defer(); + + ESP_LOGD("defer_test", "Deferred 10 items using defer(), waiting for execution..."); + +event: + - platform: template + name: "Test Complete" + id: test_complete + device_class: button + event_types: + - "test_finished" + - platform: template + name: "Test Result" + id: test_result + device_class: button + event_types: + - "passed" + - "failed" diff --git a/tests/integration/fixtures/scheduler_defer_stress.yaml b/tests/integration/fixtures/scheduler_defer_stress.yaml new file mode 100644 index 0000000000..0d9c1d1405 --- /dev/null +++ b/tests/integration/fixtures/scheduler_defer_stress.yaml @@ -0,0 +1,38 @@ +esphome: + name: scheduler-defer-stress-test + +external_components: + - source: + type: local + path: EXTERNAL_COMPONENT_PATH + components: [defer_stress_component] + +host: + +logger: + level: VERBOSE + +defer_stress_component: + id: defer_stress + +api: + services: + - service: run_stress_test + then: + - lambda: |- + id(defer_stress)->run_multi_thread_test(); + +event: + - platform: template + name: "Test Complete" + id: test_complete + device_class: button + event_types: + - "test_finished" + - platform: template + name: "Test Result" + id: test_result + device_class: button + event_types: + - "passed" + - "failed" diff --git a/tests/integration/test_scheduler_defer_fifo_simple.py b/tests/integration/test_scheduler_defer_fifo_simple.py new file mode 100644 index 0000000000..ce17afff33 --- /dev/null +++ b/tests/integration/test_scheduler_defer_fifo_simple.py @@ -0,0 +1,117 @@ +"""Simple test that defer() maintains FIFO order.""" + +import asyncio + +from aioesphomeapi import EntityState, Event, EventInfo, UserService +import pytest + +from .types import APIClientConnectedFactory, RunCompiledFunction + + +@pytest.mark.asyncio +async def test_defer_fifo_simple( + yaml_config: str, + run_compiled: RunCompiledFunction, + api_client_connected: APIClientConnectedFactory, +) -> None: + """Test that defer() maintains FIFO order with a simple test.""" + + async with run_compiled(yaml_config), api_client_connected() as client: + # Verify we can connect + device_info = await client.device_info() + assert device_info is not None + assert device_info.name == "scheduler-defer-fifo-simple" + + # List entities and services + entity_info, services = await asyncio.wait_for( + client.list_entities_services(), timeout=5.0 + ) + + # Find our test entities + test_complete_entity: EventInfo | None = None + test_result_entity: EventInfo | None = None + + for entity in entity_info: + if isinstance(entity, EventInfo): + if entity.object_id == "test_complete": + test_complete_entity = entity + elif entity.object_id == "test_result": + test_result_entity = entity + + assert test_complete_entity is not None, "test_complete event not found" + assert test_result_entity is not None, "test_result event not found" + + # Find our test services + test_set_timeout_service: UserService | None = None + test_defer_service: UserService | None = None + for service in services: + if service.name == "test_set_timeout": + test_set_timeout_service = service + elif service.name == "test_defer": + test_defer_service = service + + assert test_set_timeout_service is not None, ( + "test_set_timeout service not found" + ) + assert test_defer_service is not None, "test_defer service not found" + + # Get the event loop + loop = asyncio.get_running_loop() + + # Subscribe to states + # (events are delivered as EventStates through subscribe_states) + test_complete_future: asyncio.Future[bool] = loop.create_future() + test_result_future: asyncio.Future[bool] = loop.create_future() + + def on_state(state: EntityState) -> None: + if not isinstance(state, Event): + return + + if ( + state.key == test_complete_entity.key + and state.event_type == "test_finished" + and not test_complete_future.done() + ): + test_complete_future.set_result(True) + return + + if state.key == test_result_entity.key and not test_result_future.done(): + if state.event_type == "passed": + test_result_future.set_result(True) + elif state.event_type == "failed": + test_result_future.set_result(False) + + client.subscribe_states(on_state) + + # Test 1: Test set_timeout(0) + client.execute_service(test_set_timeout_service, {}) + + # Wait for first test completion + try: + await asyncio.wait_for(test_complete_future, timeout=5.0) + test1_passed = await asyncio.wait_for(test_result_future, timeout=1.0) + except asyncio.TimeoutError: + pytest.fail("Test set_timeout(0) did not complete within 5 seconds") + + assert test1_passed is True, ( + "set_timeout(0) FIFO test failed - items executed out of order" + ) + + # Reset futures for second test + test_complete_future = loop.create_future() + test_result_future = loop.create_future() + + # Test 2: Test defer() + client.execute_service(test_defer_service, {}) + + # Wait for second test completion + try: + await asyncio.wait_for(test_complete_future, timeout=5.0) + test2_passed = await asyncio.wait_for(test_result_future, timeout=1.0) + except asyncio.TimeoutError: + pytest.fail("Test defer() did not complete within 5 seconds") + + # Verify the test passed + assert test2_passed is True, ( + "defer() FIFO test failed - items executed out of order" + ) diff --git a/tests/integration/test_scheduler_defer_stress.py b/tests/integration/test_scheduler_defer_stress.py new file mode 100644 index 0000000000..844efb59f7 --- /dev/null +++ b/tests/integration/test_scheduler_defer_stress.py @@ -0,0 +1,137 @@ +"""Stress test for defer() thread safety with multiple threads.""" + +import asyncio +from pathlib import Path +import re + +from aioesphomeapi import UserService +import pytest + +from .types import APIClientConnectedFactory, RunCompiledFunction + + +@pytest.mark.asyncio +async def test_defer_stress( + yaml_config: str, + run_compiled: RunCompiledFunction, + api_client_connected: APIClientConnectedFactory, +) -> None: + """Test that defer() doesn't crash when called rapidly from multiple threads.""" + + # Get the absolute path to the external components directory + external_components_path = str( + Path(__file__).parent / "fixtures" / "external_components" + ) + + # Replace the placeholder in the YAML config with the actual path + yaml_config = yaml_config.replace( + "EXTERNAL_COMPONENT_PATH", external_components_path + ) + + # Create a future to signal test completion + loop = asyncio.get_event_loop() + test_complete_future: asyncio.Future[None] = loop.create_future() + + # Track executed defers and their order + executed_defers: set[int] = set() + thread_executions: dict[ + int, list[int] + ] = {} # thread_id -> list of indices in execution order + fifo_violations: list[str] = [] + + def on_log_line(line: str) -> None: + # Track all executed defers with thread and index info + match = re.search(r"Executed defer (\d+) \(thread (\d+), index (\d+)\)", line) + if not match: + return + + defer_id = int(match.group(1)) + thread_id = int(match.group(2)) + index = int(match.group(3)) + + executed_defers.add(defer_id) + + # Track execution order per thread + if thread_id not in thread_executions: + thread_executions[thread_id] = [] + + # Check FIFO ordering within thread + if thread_executions[thread_id] and thread_executions[thread_id][-1] >= index: + fifo_violations.append( + f"Thread {thread_id}: index {index} executed after " + f"{thread_executions[thread_id][-1]}" + ) + + thread_executions[thread_id].append(index) + + # Check if we've executed all 1000 defers (0-999) + if len(executed_defers) == 1000 and not test_complete_future.done(): + test_complete_future.set_result(None) + + async with ( + run_compiled(yaml_config, line_callback=on_log_line), + api_client_connected() as client, + ): + # Verify we can connect + device_info = await client.device_info() + assert device_info is not None + assert device_info.name == "scheduler-defer-stress-test" + + # List entities and services + entity_info, services = await asyncio.wait_for( + client.list_entities_services(), timeout=5.0 + ) + + # Find our test service + run_stress_test_service: UserService | None = None + for service in services: + if service.name == "run_stress_test": + run_stress_test_service = service + break + + assert run_stress_test_service is not None, "run_stress_test service not found" + + # Call the run_stress_test service to start the test + client.execute_service(run_stress_test_service, {}) + + # Wait for all defers to execute (should be quick) + try: + await asyncio.wait_for(test_complete_future, timeout=5.0) + except asyncio.TimeoutError: + # Report how many we got + pytest.fail( + f"Stress test timed out. Only {len(executed_defers)} of " + f"1000 defers executed. Missing IDs: " + f"{sorted(set(range(1000)) - executed_defers)[:10]}..." + ) + + # Verify all defers executed + assert len(executed_defers) == 1000, ( + f"Expected 1000 defers, got {len(executed_defers)}" + ) + + # Verify we have all IDs from 0-999 + expected_ids = set(range(1000)) + missing_ids = expected_ids - executed_defers + assert not missing_ids, f"Missing defer IDs: {sorted(missing_ids)}" + + # Verify FIFO ordering was maintained within each thread + assert not fifo_violations, "FIFO ordering violations detected:\n" + "\n".join( + fifo_violations[:10] + ) + + # Verify each thread executed all its defers in order + for thread_id, indices in thread_executions.items(): + assert len(indices) == 100, ( + f"Thread {thread_id} executed {len(indices)} defers, expected 100" + ) + # Indices should be 0-99 in ascending order + assert indices == list(range(100)), ( + f"Thread {thread_id} executed indices out of order: {indices[:10]}..." + ) + + # If we got here without crashing and with proper ordering, the test passed + assert True, ( + "Test completed successfully - all 1000 defers executed with " + "FIFO ordering preserved" + )