diff --git a/tests/integration/fixtures/defer_fifo_simple.yaml b/tests/integration/fixtures/defer_fifo_simple.yaml deleted file mode 100644 index db24ebf601..0000000000 --- a/tests/integration/fixtures/defer_fifo_simple.yaml +++ /dev/null @@ -1,109 +0,0 @@ -esphome: - name: defer-fifo-simple - -host: - -logger: - level: DEBUG - -api: - services: - - service: test_set_timeout - then: - - lambda: |- - // Test set_timeout with 0 delay (direct scheduler call) - static int set_timeout_order = 0; - static bool set_timeout_passed = true; - - // Reset for this test - set_timeout_order = 0; - set_timeout_passed = true; - - ESP_LOGD("defer_test", "Testing set_timeout(0) for FIFO order..."); - for (int i = 0; i < 10; i++) { - int expected = i; - App.scheduler.set_timeout((Component*)nullptr, nullptr, 0, [expected]() { - ESP_LOGD("defer_test", "set_timeout(0) item %d executed, order %d", expected, set_timeout_order); - if (set_timeout_order != expected) { - ESP_LOGE("defer_test", "FIFO violation in set_timeout: expected %d but got execution order %d", expected, set_timeout_order); - set_timeout_passed = false; - } - set_timeout_order++; - - if (set_timeout_order == 10) { - if (set_timeout_passed) { - ESP_LOGI("defer_test", "✓ Test PASSED - set_timeout(0) maintains FIFO order"); - id(test_result)->trigger("passed"); - } else { - ESP_LOGE("defer_test", "✗ Test FAILED - set_timeout(0) executed out of order"); - id(test_result)->trigger("failed"); - } - id(test_complete)->trigger("test_finished"); - } - }); - } - - ESP_LOGD("defer_test", "Deferred 10 items using set_timeout(0), waiting for execution..."); - - - service: test_defer - then: - - lambda: |- - // Test defer() method (component method) - static int defer_order = 0; - static bool defer_passed = true; - - // Reset for this test - defer_order = 0; - defer_passed = true; - - ESP_LOGD("defer_test", "Testing defer() for FIFO order..."); - - // Create a test component class that exposes defer() - class TestComponent : public Component { - public: - void test_defer() { - for (int i = 0; i < 10; i++) { - int expected = i; - this->defer([expected]() { - ESP_LOGD("defer_test", "defer() item %d executed, order %d", expected, defer_order); - if (defer_order != expected) { - ESP_LOGE("defer_test", "FIFO violation in defer: expected %d but got execution order %d", expected, defer_order); - defer_passed = false; - } - defer_order++; - - if (defer_order == 10) { - if (defer_passed) { - ESP_LOGI("defer_test", "✓ Test PASSED - defer() maintains FIFO order"); - id(test_result)->trigger("passed"); - } else { - ESP_LOGE("defer_test", "✗ Test FAILED - defer() executed out of order"); - id(test_result)->trigger("failed"); - } - id(test_complete)->trigger("test_finished"); - } - }); - } - } - }; - - // Use a static instance so it doesn't go out of scope - static TestComponent test_component; - test_component.test_defer(); - - ESP_LOGD("defer_test", "Deferred 10 items using defer(), waiting for execution..."); - -event: - - platform: template - name: "Test Complete" - id: test_complete - device_class: button - event_types: - - "test_finished" - - platform: template - name: "Test Result" - id: test_result - device_class: button - event_types: - - "passed" - - "failed" diff --git a/tests/integration/fixtures/defer_stress.yaml b/tests/integration/fixtures/defer_stress.yaml deleted file mode 100644 index 6df475229b..0000000000 --- a/tests/integration/fixtures/defer_stress.yaml +++ /dev/null @@ -1,38 +0,0 @@ -esphome: - name: defer-stress-test - -external_components: - - source: - type: local - path: EXTERNAL_COMPONENT_PATH - components: [defer_stress_component] - -host: - -logger: - level: VERBOSE - -defer_stress_component: - id: defer_stress - -api: - services: - - service: run_stress_test - then: - - lambda: |- - id(defer_stress)->run_multi_thread_test(); - -event: - - platform: template - name: "Test Complete" - id: test_complete - device_class: button - event_types: - - "test_finished" - - platform: template - name: "Test Result" - id: test_result - device_class: button - event_types: - - "passed" - - "failed" diff --git a/tests/integration/test_defer_fifo_simple.py b/tests/integration/test_defer_fifo_simple.py deleted file mode 100644 index 5a62a45786..0000000000 --- a/tests/integration/test_defer_fifo_simple.py +++ /dev/null @@ -1,117 +0,0 @@ -"""Simple test that defer() maintains FIFO order.""" - -import asyncio - -from aioesphomeapi import EntityState, Event, EventInfo, UserService -import pytest - -from .types import APIClientConnectedFactory, RunCompiledFunction - - -@pytest.mark.asyncio -async def test_defer_fifo_simple( - yaml_config: str, - run_compiled: RunCompiledFunction, - api_client_connected: APIClientConnectedFactory, -) -> None: - """Test that defer() maintains FIFO order with a simple test.""" - - async with run_compiled(yaml_config), api_client_connected() as client: - # Verify we can connect - device_info = await client.device_info() - assert device_info is not None - assert device_info.name == "defer-fifo-simple" - - # List entities and services - entity_info, services = await asyncio.wait_for( - client.list_entities_services(), timeout=5.0 - ) - - # Find our test entities - test_complete_entity: EventInfo | None = None - test_result_entity: EventInfo | None = None - - for entity in entity_info: - if isinstance(entity, EventInfo): - if entity.object_id == "test_complete": - test_complete_entity = entity - elif entity.object_id == "test_result": - test_result_entity = entity - - assert test_complete_entity is not None, "test_complete event not found" - assert test_result_entity is not None, "test_result event not found" - - # Find our test services - test_set_timeout_service: UserService | None = None - test_defer_service: UserService | None = None - for service in services: - if service.name == "test_set_timeout": - test_set_timeout_service = service - elif service.name == "test_defer": - test_defer_service = service - - assert test_set_timeout_service is not None, ( - "test_set_timeout service not found" - ) - assert test_defer_service is not None, "test_defer service not found" - - # Get the event loop - loop = asyncio.get_running_loop() - - # Subscribe to states - # (events are delivered as EventStates through subscribe_states) - test_complete_future: asyncio.Future[bool] = loop.create_future() - test_result_future: asyncio.Future[bool] = loop.create_future() - - def on_state(state: EntityState) -> None: - if not isinstance(state, Event): - return - - if ( - state.key == test_complete_entity.key - and state.event_type == "test_finished" - and not test_complete_future.done() - ): - test_complete_future.set_result(True) - return - - if state.key == test_result_entity.key and not test_result_future.done(): - if state.event_type == "passed": - test_result_future.set_result(True) - elif state.event_type == "failed": - test_result_future.set_result(False) - - client.subscribe_states(on_state) - - # Test 1: Test set_timeout(0) - client.execute_service(test_set_timeout_service, {}) - - # Wait for first test completion - try: - await asyncio.wait_for(test_complete_future, timeout=5.0) - test1_passed = await asyncio.wait_for(test_result_future, timeout=1.0) - except asyncio.TimeoutError: - pytest.fail("Test set_timeout(0) did not complete within 5 seconds") - - assert test1_passed is True, ( - "set_timeout(0) FIFO test failed - items executed out of order" - ) - - # Reset futures for second test - test_complete_future = loop.create_future() - test_result_future = loop.create_future() - - # Test 2: Test defer() - client.execute_service(test_defer_service, {}) - - # Wait for second test completion - try: - await asyncio.wait_for(test_complete_future, timeout=5.0) - test2_passed = await asyncio.wait_for(test_result_future, timeout=1.0) - except asyncio.TimeoutError: - pytest.fail("Test defer() did not complete within 5 seconds") - - # Verify the test passed - assert test2_passed is True, ( - "defer() FIFO test failed - items executed out of order" - ) diff --git a/tests/integration/test_defer_stress.py b/tests/integration/test_defer_stress.py deleted file mode 100644 index f63ec8d25f..0000000000 --- a/tests/integration/test_defer_stress.py +++ /dev/null @@ -1,137 +0,0 @@ -"""Stress test for defer() thread safety with multiple threads.""" - -import asyncio -from pathlib import Path -import re - -from aioesphomeapi import UserService -import pytest - -from .types import APIClientConnectedFactory, RunCompiledFunction - - -@pytest.mark.asyncio -async def test_defer_stress( - yaml_config: str, - run_compiled: RunCompiledFunction, - api_client_connected: APIClientConnectedFactory, -) -> None: - """Test that defer() doesn't crash when called rapidly from multiple threads.""" - - # Get the absolute path to the external components directory - external_components_path = str( - Path(__file__).parent / "fixtures" / "external_components" - ) - - # Replace the placeholder in the YAML config with the actual path - yaml_config = yaml_config.replace( - "EXTERNAL_COMPONENT_PATH", external_components_path - ) - - # Create a future to signal test completion - loop = asyncio.get_event_loop() - test_complete_future: asyncio.Future[None] = loop.create_future() - - # Track executed defers and their order - executed_defers: set[int] = set() - thread_executions: dict[ - int, list[int] - ] = {} # thread_id -> list of indices in execution order - fifo_violations: list[str] = [] - - def on_log_line(line: str) -> None: - # Track all executed defers with thread and index info - match = re.search(r"Executed defer (\d+) \(thread (\d+), index (\d+)\)", line) - if not match: - return - - defer_id = int(match.group(1)) - thread_id = int(match.group(2)) - index = int(match.group(3)) - - executed_defers.add(defer_id) - - # Track execution order per thread - if thread_id not in thread_executions: - thread_executions[thread_id] = [] - - # Check FIFO ordering within thread - if thread_executions[thread_id] and thread_executions[thread_id][-1] >= index: - fifo_violations.append( - f"Thread {thread_id}: index {index} executed after " - f"{thread_executions[thread_id][-1]}" - ) - - thread_executions[thread_id].append(index) - - # Check if we've executed all 1000 defers (0-999) - if len(executed_defers) == 1000 and not test_complete_future.done(): - test_complete_future.set_result(None) - - async with ( - run_compiled(yaml_config, line_callback=on_log_line), - api_client_connected() as client, - ): - # Verify we can connect - device_info = await client.device_info() - assert device_info is not None - assert device_info.name == "defer-stress-test" - - # List entities and services - entity_info, services = await asyncio.wait_for( - client.list_entities_services(), timeout=5.0 - ) - - # Find our test service - run_stress_test_service: UserService | None = None - for service in services: - if service.name == "run_stress_test": - run_stress_test_service = service - break - - assert run_stress_test_service is not None, "run_stress_test service not found" - - # Call the run_stress_test service to start the test - client.execute_service(run_stress_test_service, {}) - - # Wait for all defers to execute (should be quick) - try: - await asyncio.wait_for(test_complete_future, timeout=5.0) - except asyncio.TimeoutError: - # Report how many we got - pytest.fail( - f"Stress test timed out. Only {len(executed_defers)} of " - f"1000 defers executed. Missing IDs: " - f"{sorted(set(range(1000)) - executed_defers)[:10]}..." - ) - - # Verify all defers executed - assert len(executed_defers) == 1000, ( - f"Expected 1000 defers, got {len(executed_defers)}" - ) - - # Verify we have all IDs from 0-999 - expected_ids = set(range(1000)) - missing_ids = expected_ids - executed_defers - assert not missing_ids, f"Missing defer IDs: {sorted(missing_ids)}" - - # Verify FIFO ordering was maintained within each thread - assert not fifo_violations, "FIFO ordering violations detected:\n" + "\n".join( - fifo_violations[:10] - ) - - # Verify each thread executed all its defers in order - for thread_id, indices in thread_executions.items(): - assert len(indices) == 100, ( - f"Thread {thread_id} executed {len(indices)} defers, expected 100" - ) - # Indices should be 0-99 in ascending order - assert indices == list(range(100)), ( - f"Thread {thread_id} executed indices out of order: {indices[:10]}..." - ) - - # If we got here without crashing and with proper ordering, the test passed - assert True, ( - "Test completed successfully - all 1000 defers executed with " - "FIFO ordering preserved" - )