This commit is contained in:
J. Nick Koston 2025-07-04 10:11:19 -05:00
parent 4649599592
commit 37578f3e22
No known key found for this signature in database
2 changed files with 38 additions and 13 deletions

View File

@ -226,13 +226,14 @@ void HOT Scheduler::call() {
// - No deferred items exist in to_add_, so processing order doesn't affect correctness // - No deferred items exist in to_add_, so processing order doesn't affect correctness
while (!this->defer_queue_.empty()) { while (!this->defer_queue_.empty()) {
this->lock_.lock(); this->lock_.lock();
if (this->defer_queue_.empty()) { // Double-check with lock held if (this->defer_queue_.empty()) {
this->lock_.unlock(); this->lock_.unlock();
break; break;
} }
auto item = std::move(this->defer_queue_.front()); auto item = std::move(this->defer_queue_.front());
this->defer_queue_.pop_front(); this->defer_queue_.pop_front();
this->lock_.unlock(); this->lock_.unlock();
// Skip if item was marked for removal or component failed // Skip if item was marked for removal or component failed
if (!this->should_skip_item_(item.get())) { if (!this->should_skip_item_(item.get())) {
this->execute_item_(item.get()); this->execute_item_(item.get());

View File

@ -2,6 +2,7 @@
import asyncio import asyncio
from pathlib import Path from pathlib import Path
import re
from aioesphomeapi import UserService from aioesphomeapi import UserService
import pytest import pytest
@ -29,14 +30,25 @@ async def test_defer_stress(
# Create a future to signal test completion # Create a future to signal test completion
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
test_complete_future: asyncio.Future[bool] = loop.create_future() test_complete_future: asyncio.Future[None] = loop.create_future()
# Track executed defers
executed_defers = set()
def on_log_line(line: str) -> None: def on_log_line(line: str) -> None:
if not test_complete_future.done(): # Track all executed defers
if "✓ Stress test PASSED" in line: match = re.search(r"Executed defer (\d+)", line)
test_complete_future.set_result(True) if match:
elif "✗ Stress test FAILED" in line: defer_id = int(match.group(1))
test_complete_future.set_result(False) executed_defers.add(defer_id)
# Check if we've executed all 1000 defers (0-999)
if (
defer_id == 999
and len(executed_defers) == 1000
and not test_complete_future.done()
):
test_complete_future.set_result(None)
async with ( async with (
run_compiled(yaml_config, line_callback=on_log_line), run_compiled(yaml_config, line_callback=on_log_line),
@ -64,13 +76,25 @@ async def test_defer_stress(
# Call the run_stress_test service to start the test # Call the run_stress_test service to start the test
client.execute_service(run_stress_test_service, {}) client.execute_service(run_stress_test_service, {})
# Wait for test completion # Wait for all defers to execute (should be quick)
try: try:
test_passed = await asyncio.wait_for(test_complete_future, timeout=15.0) await asyncio.wait_for(test_complete_future, timeout=5.0)
except asyncio.TimeoutError: except asyncio.TimeoutError:
pytest.fail("Stress test did not complete within 15 seconds") # Report how many we got
pytest.fail(
f"Stress test timed out. Only {len(executed_defers)} of 1000 defers executed. "
f"Missing IDs: {sorted(set(range(1000)) - executed_defers)[:10]}..."
)
# Verify the test passed # Verify all defers executed
assert test_passed is True, ( assert len(executed_defers) == 1000, (
"Stress test failed - defer() crashed or failed under thread pressure" f"Expected 1000 defers, got {len(executed_defers)}"
) )
# Verify we have all IDs from 0-999
expected_ids = set(range(1000))
missing_ids = expected_ids - executed_defers
assert not missing_ids, f"Missing defer IDs: {sorted(missing_ids)}"
# If we got here without crashing, the test passed
assert True, "Test completed successfully - all 1000 defers executed in order"