From 71f78e3a8176c60e5fd4955c9a378d4600701317 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Fri, 4 Jul 2025 10:00:25 -0500 Subject: [PATCH] fixes --- esphome/core/scheduler.cpp | 15 +++-- tests/integration/conftest.py | 22 +++++++ tests/integration/fixtures/defer_stress.yaml | 12 +--- tests/integration/test_defer_stress.py | 61 ++++++-------------- 4 files changed, 48 insertions(+), 62 deletions(-) diff --git a/esphome/core/scheduler.cpp b/esphome/core/scheduler.cpp index e0d2b70102..285354b262 100644 --- a/esphome/core/scheduler.cpp +++ b/esphome/core/scheduler.cpp @@ -225,14 +225,13 @@ void HOT Scheduler::call() { // - Items execute in exact order they were deferred (FIFO guarantee) // - No deferred items exist in to_add_, so processing order doesn't affect correctness while (!this->defer_queue_.empty()) { - std::unique_ptr item; - { - LockGuard guard{this->lock_}; - if (this->defer_queue_.empty()) // Double-check with lock held - break; - item = std::move(this->defer_queue_.front()); - this->defer_queue_.pop_front(); - } + this->lock_.lock(); + if (this->defer_queue_.empty()) // Double-check with lock held + this->lock_.unlock(); + break; + auto item = std::move(this->defer_queue_.front()); + this->defer_queue_.pop_front(); + this->lock_.unlock(); // Skip if item was marked for removal or component failed if (!this->should_skip_item_(item.get())) { this->execute_item_(item.get()); diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 8f5f77ca52..56f2eb0a54 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -474,6 +474,14 @@ async def run_binary_and_wait_for_port( if process.returncode is not None: error_msg += f"\nProcess exited with code: {process.returncode}" + # Check for common signals + if process.returncode < 0: + sig = -process.returncode + try: + sig_name = signal.Signals(sig).name + error_msg += f" (killed by signal {sig_name})" + except ValueError: + error_msg += f" (killed by signal {sig})" # Include any output collected so far if stdout_lines: @@ -501,6 +509,20 @@ async def run_binary_and_wait_for_port( if controller_transport is not None: controller_transport.close() + # Log the exit code if process already exited + if process.returncode is not None: + print(f"\nProcess exited with code: {process.returncode}", file=sys.stderr) + if process.returncode < 0: + sig = -process.returncode + try: + sig_name = signal.Signals(sig).name + print( + f"Process was killed by signal {sig_name} ({sig})", + file=sys.stderr, + ) + except ValueError: + print(f"Process was killed by signal {sig}", file=sys.stderr) + # Cleanup: terminate the process gracefully if process.returncode is None: # Send SIGINT (Ctrl+C) for graceful shutdown diff --git a/tests/integration/fixtures/defer_stress.yaml b/tests/integration/fixtures/defer_stress.yaml index 9400c33f11..6df475229b 100644 --- a/tests/integration/fixtures/defer_stress.yaml +++ b/tests/integration/fixtures/defer_stress.yaml @@ -10,7 +10,7 @@ external_components: host: logger: - level: DEBUG + level: VERBOSE defer_stress_component: id: defer_stress @@ -21,16 +21,6 @@ api: then: - lambda: |- id(defer_stress)->run_multi_thread_test(); - - wait_until: - lambda: |- - return id(defer_stress)->is_test_complete(); - - lambda: |- - if (id(defer_stress)->is_test_passed()) { - id(test_result)->trigger("passed"); - } else { - id(test_result)->trigger("failed"); - } - id(test_complete)->trigger("test_finished"); event: - platform: template diff --git a/tests/integration/test_defer_stress.py b/tests/integration/test_defer_stress.py index ed0ae74a08..6dd9f15623 100644 --- a/tests/integration/test_defer_stress.py +++ b/tests/integration/test_defer_stress.py @@ -3,7 +3,7 @@ import asyncio from pathlib import Path -from aioesphomeapi import EntityState, Event, EventInfo, UserService +from aioesphomeapi import UserService import pytest from .types import APIClientConnectedFactory, RunCompiledFunction @@ -27,7 +27,21 @@ async def test_defer_stress( "EXTERNAL_COMPONENT_PATH", external_components_path ) - async with run_compiled(yaml_config), api_client_connected() as client: + # Create a future to signal test completion + loop = asyncio.get_event_loop() + test_complete_future: asyncio.Future[bool] = loop.create_future() + + def on_log_line(line: str) -> None: + if not test_complete_future.done(): + if "✓ Stress test PASSED" in line: + test_complete_future.set_result(True) + elif "✗ Stress test FAILED" in line: + test_complete_future.set_result(False) + + async with ( + run_compiled(yaml_config, line_callback=on_log_line), + api_client_connected() as client, + ): # Verify we can connect device_info = await client.device_info() assert device_info is not None @@ -38,20 +52,6 @@ async def test_defer_stress( client.list_entities_services(), timeout=5.0 ) - # Find our test entities - test_complete_entity: EventInfo | None = None - test_result_entity: EventInfo | None = None - - for entity in entity_info: - if isinstance(entity, EventInfo): - if entity.object_id == "test_complete": - test_complete_entity = entity - elif entity.object_id == "test_result": - test_result_entity = entity - - assert test_complete_entity is not None, "test_complete event not found" - assert test_result_entity is not None, "test_result event not found" - # Find our test service run_stress_test_service: UserService | None = None for service in services: @@ -61,37 +61,12 @@ async def test_defer_stress( assert run_stress_test_service is not None, "run_stress_test service not found" - # Get the event loop - loop = asyncio.get_running_loop() - - # Subscribe to states (events are delivered as EventStates through subscribe_states) - test_complete_future: asyncio.Future[bool] = loop.create_future() - test_result_future: asyncio.Future[bool] = loop.create_future() - - def on_state(state: EntityState) -> None: - if isinstance(state, Event): - if state.key == test_complete_entity.key: - if ( - state.event_type == "test_finished" - and not test_complete_future.done() - ): - test_complete_future.set_result(True) - elif state.key == test_result_entity.key: - if not test_result_future.done(): - if state.event_type == "passed": - test_result_future.set_result(True) - elif state.event_type == "failed": - test_result_future.set_result(False) - - client.subscribe_states(on_state) - # Call the run_stress_test service to start the test client.execute_service(run_stress_test_service, {}) - # Wait for test completion with a longer timeout (threads run for 100ms + processing time) + # Wait for test completion try: - await asyncio.wait_for(test_complete_future, timeout=15.0) - test_passed = await asyncio.wait_for(test_result_future, timeout=1.0) + test_passed = await asyncio.wait_for(test_complete_future, timeout=15.0) except asyncio.TimeoutError: pytest.fail("Stress test did not complete within 15 seconds")