This commit is contained in:
J. Nick Koston 2025-07-04 10:00:25 -05:00
parent f7ca26eef8
commit 71f78e3a81
No known key found for this signature in database
4 changed files with 48 additions and 62 deletions

View File

@ -225,14 +225,13 @@ void HOT Scheduler::call() {
// - Items execute in exact order they were deferred (FIFO guarantee) // - Items execute in exact order they were deferred (FIFO guarantee)
// - No deferred items exist in to_add_, so processing order doesn't affect correctness // - No deferred items exist in to_add_, so processing order doesn't affect correctness
while (!this->defer_queue_.empty()) { while (!this->defer_queue_.empty()) {
std::unique_ptr<SchedulerItem> item; this->lock_.lock();
{
LockGuard guard{this->lock_};
if (this->defer_queue_.empty()) // Double-check with lock held if (this->defer_queue_.empty()) // Double-check with lock held
this->lock_.unlock();
break; break;
item = std::move(this->defer_queue_.front()); auto item = std::move(this->defer_queue_.front());
this->defer_queue_.pop_front(); this->defer_queue_.pop_front();
} this->lock_.unlock();
// Skip if item was marked for removal or component failed // Skip if item was marked for removal or component failed
if (!this->should_skip_item_(item.get())) { if (!this->should_skip_item_(item.get())) {
this->execute_item_(item.get()); this->execute_item_(item.get());

View File

@ -474,6 +474,14 @@ async def run_binary_and_wait_for_port(
if process.returncode is not None: if process.returncode is not None:
error_msg += f"\nProcess exited with code: {process.returncode}" error_msg += f"\nProcess exited with code: {process.returncode}"
# Check for common signals
if process.returncode < 0:
sig = -process.returncode
try:
sig_name = signal.Signals(sig).name
error_msg += f" (killed by signal {sig_name})"
except ValueError:
error_msg += f" (killed by signal {sig})"
# Include any output collected so far # Include any output collected so far
if stdout_lines: if stdout_lines:
@ -501,6 +509,20 @@ async def run_binary_and_wait_for_port(
if controller_transport is not None: if controller_transport is not None:
controller_transport.close() controller_transport.close()
# Log the exit code if process already exited
if process.returncode is not None:
print(f"\nProcess exited with code: {process.returncode}", file=sys.stderr)
if process.returncode < 0:
sig = -process.returncode
try:
sig_name = signal.Signals(sig).name
print(
f"Process was killed by signal {sig_name} ({sig})",
file=sys.stderr,
)
except ValueError:
print(f"Process was killed by signal {sig}", file=sys.stderr)
# Cleanup: terminate the process gracefully # Cleanup: terminate the process gracefully
if process.returncode is None: if process.returncode is None:
# Send SIGINT (Ctrl+C) for graceful shutdown # Send SIGINT (Ctrl+C) for graceful shutdown

View File

@ -10,7 +10,7 @@ external_components:
host: host:
logger: logger:
level: DEBUG level: VERBOSE
defer_stress_component: defer_stress_component:
id: defer_stress id: defer_stress
@ -21,16 +21,6 @@ api:
then: then:
- lambda: |- - lambda: |-
id(defer_stress)->run_multi_thread_test(); id(defer_stress)->run_multi_thread_test();
- wait_until:
lambda: |-
return id(defer_stress)->is_test_complete();
- lambda: |-
if (id(defer_stress)->is_test_passed()) {
id(test_result)->trigger("passed");
} else {
id(test_result)->trigger("failed");
}
id(test_complete)->trigger("test_finished");
event: event:
- platform: template - platform: template

View File

@ -3,7 +3,7 @@
import asyncio import asyncio
from pathlib import Path from pathlib import Path
from aioesphomeapi import EntityState, Event, EventInfo, UserService from aioesphomeapi import UserService
import pytest import pytest
from .types import APIClientConnectedFactory, RunCompiledFunction from .types import APIClientConnectedFactory, RunCompiledFunction
@ -27,7 +27,21 @@ async def test_defer_stress(
"EXTERNAL_COMPONENT_PATH", external_components_path "EXTERNAL_COMPONENT_PATH", external_components_path
) )
async with run_compiled(yaml_config), api_client_connected() as client: # Create a future to signal test completion
loop = asyncio.get_event_loop()
test_complete_future: asyncio.Future[bool] = loop.create_future()
def on_log_line(line: str) -> None:
if not test_complete_future.done():
if "✓ Stress test PASSED" in line:
test_complete_future.set_result(True)
elif "✗ Stress test FAILED" in line:
test_complete_future.set_result(False)
async with (
run_compiled(yaml_config, line_callback=on_log_line),
api_client_connected() as client,
):
# Verify we can connect # Verify we can connect
device_info = await client.device_info() device_info = await client.device_info()
assert device_info is not None assert device_info is not None
@ -38,20 +52,6 @@ async def test_defer_stress(
client.list_entities_services(), timeout=5.0 client.list_entities_services(), timeout=5.0
) )
# Find our test entities
test_complete_entity: EventInfo | None = None
test_result_entity: EventInfo | None = None
for entity in entity_info:
if isinstance(entity, EventInfo):
if entity.object_id == "test_complete":
test_complete_entity = entity
elif entity.object_id == "test_result":
test_result_entity = entity
assert test_complete_entity is not None, "test_complete event not found"
assert test_result_entity is not None, "test_result event not found"
# Find our test service # Find our test service
run_stress_test_service: UserService | None = None run_stress_test_service: UserService | None = None
for service in services: for service in services:
@ -61,37 +61,12 @@ async def test_defer_stress(
assert run_stress_test_service is not None, "run_stress_test service not found" assert run_stress_test_service is not None, "run_stress_test service not found"
# Get the event loop
loop = asyncio.get_running_loop()
# Subscribe to states (events are delivered as EventStates through subscribe_states)
test_complete_future: asyncio.Future[bool] = loop.create_future()
test_result_future: asyncio.Future[bool] = loop.create_future()
def on_state(state: EntityState) -> None:
if isinstance(state, Event):
if state.key == test_complete_entity.key:
if (
state.event_type == "test_finished"
and not test_complete_future.done()
):
test_complete_future.set_result(True)
elif state.key == test_result_entity.key:
if not test_result_future.done():
if state.event_type == "passed":
test_result_future.set_result(True)
elif state.event_type == "failed":
test_result_future.set_result(False)
client.subscribe_states(on_state)
# Call the run_stress_test service to start the test # Call the run_stress_test service to start the test
client.execute_service(run_stress_test_service, {}) client.execute_service(run_stress_test_service, {})
# Wait for test completion with a longer timeout (threads run for 100ms + processing time) # Wait for test completion
try: try:
await asyncio.wait_for(test_complete_future, timeout=15.0) test_passed = await asyncio.wait_for(test_complete_future, timeout=15.0)
test_passed = await asyncio.wait_for(test_result_future, timeout=1.0)
except asyncio.TimeoutError: except asyncio.TimeoutError:
pytest.fail("Stress test did not complete within 15 seconds") pytest.fail("Stress test did not complete within 15 seconds")