no flakey

This commit is contained in:
J. Nick Koston 2025-07-06 22:54:46 -05:00
parent aaec4b7bd3
commit 8c13eab731
No known key found for this signature in database
4 changed files with 147 additions and 29 deletions

View File

@ -38,6 +38,48 @@ void SchedulerStringLifetimeComponent::run_string_lifetime_test() {
});
}
void SchedulerStringLifetimeComponent::run_test1() {
test_temporary_string_lifetime();
// Wait for all callbacks to execute
this->set_timeout("test1_complete", 10, [this]() { ESP_LOGI(TAG, "Test 1 complete"); });
}
void SchedulerStringLifetimeComponent::run_test2() {
test_scope_exit_string();
// Wait for all callbacks to execute
this->set_timeout("test2_complete", 20, [this]() { ESP_LOGI(TAG, "Test 2 complete"); });
}
void SchedulerStringLifetimeComponent::run_test3() {
test_vector_reallocation();
// Wait for all callbacks to execute
this->set_timeout("test3_complete", 60, [this]() { ESP_LOGI(TAG, "Test 3 complete"); });
}
void SchedulerStringLifetimeComponent::run_test4() {
test_string_move_semantics();
// Wait for all callbacks to execute
this->set_timeout("test4_complete", 35, [this]() { ESP_LOGI(TAG, "Test 4 complete"); });
}
void SchedulerStringLifetimeComponent::run_test5() {
test_lambda_capture_lifetime();
// Wait for all callbacks to execute
this->set_timeout("test5_complete", 50, [this]() { ESP_LOGI(TAG, "Test 5 complete"); });
}
void SchedulerStringLifetimeComponent::run_final_check() {
ESP_LOGI(TAG, "String lifetime tests complete");
ESP_LOGI(TAG, "Tests passed: %d", this->tests_passed_);
ESP_LOGI(TAG, "Tests failed: %d", this->tests_failed_);
if (this->tests_failed_ == 0) {
ESP_LOGI(TAG, "SUCCESS: All string lifetime tests passed!");
} else {
ESP_LOGE(TAG, "FAILURE: %d string lifetime tests failed!", this->tests_failed_);
}
}
void SchedulerStringLifetimeComponent::test_temporary_string_lifetime() {
ESP_LOGI(TAG, "Test 1: Temporary string lifetime for timeout names");

View File

@ -14,6 +14,14 @@ class SchedulerStringLifetimeComponent : public Component {
void run_string_lifetime_test();
// Individual test methods exposed as services
void run_test1();
void run_test2();
void run_test3();
void run_test4();
void run_test5();
void run_final_check();
private:
void test_temporary_string_lifetime();
void test_scope_exit_string();

View File

@ -21,3 +21,27 @@ api:
then:
- lambda: |-
id(string_lifetime)->run_string_lifetime_test();
- service: run_test1
then:
- lambda: |-
id(string_lifetime)->run_test1();
- service: run_test2
then:
- lambda: |-
id(string_lifetime)->run_test2();
- service: run_test3
then:
- lambda: |-
id(string_lifetime)->run_test3();
- service: run_test4
then:
- lambda: |-
id(string_lifetime)->run_test4();
- service: run_test5
then:
- lambda: |-
id(string_lifetime)->run_test5();
- service: run_final_check
then:
- lambda: |-
id(string_lifetime)->run_final_check();

View File

@ -4,7 +4,6 @@ import asyncio
from pathlib import Path
import re
from aioesphomeapi import UserService
import pytest
from .types import APIClientConnectedFactory, RunCompiledFunction
@ -28,19 +27,42 @@ async def test_scheduler_string_lifetime(
"EXTERNAL_COMPONENT_PATH", external_components_path
)
# Create a future to signal test completion
loop = asyncio.get_running_loop()
test_complete_future: asyncio.Future[None] = loop.create_future()
# Create events for synchronization
test1_complete = asyncio.Event()
test2_complete = asyncio.Event()
test3_complete = asyncio.Event()
test4_complete = asyncio.Event()
test5_complete = asyncio.Event()
all_tests_complete = asyncio.Event()
# Track test progress
test_stats = {
"tests_passed": 0,
"tests_failed": 0,
"errors": [],
"use_after_free_detected": False,
"current_test": None,
"test_callbacks_executed": {},
}
def on_log_line(line: str) -> None:
# Track test-specific events
if "Test 1 complete" in line:
test1_complete.set()
elif "Test 2 complete" in line:
test2_complete.set()
elif "Test 3 complete" in line:
test3_complete.set()
elif "Test 4 complete" in line:
test4_complete.set()
elif "Test 5 complete" in line:
test5_complete.set()
# Track individual callback executions
callback_match = re.search(r"Callback '(.+?)' executed", line)
if callback_match:
callback_name = callback_match.group(1)
test_stats["test_callbacks_executed"][callback_name] = True
# Track test results from the C++ test output
if "Tests passed:" in line and "string_lifetime" in line:
# Extract the number from "Tests passed: 32"
@ -68,16 +90,11 @@ async def test_scheduler_string_lifetime(
"invalid pointer",
]
):
test_stats["use_after_free_detected"] = True
if not test_complete_future.done():
test_complete_future.set_exception(
Exception(f"Memory corruption detected: {line}")
)
return
pytest.fail(f"Memory corruption detected: {line}")
# Check for completion
if "String lifetime tests complete" in line and not test_complete_future.done():
test_complete_future.set_result(None)
if "String lifetime tests complete" in line:
all_tests_complete.set()
async with (
run_compiled(yaml_config, line_callback=on_log_line),
@ -93,29 +110,56 @@ async def test_scheduler_string_lifetime(
client.list_entities_services(), timeout=5.0
)
# Find our test service
run_test_service: UserService | None = None
# Find our test services
test_services = {}
for service in services:
if service.name == "run_string_lifetime_test":
run_test_service = service
break
if service.name == "run_test1":
test_services["test1"] = service
elif service.name == "run_test2":
test_services["test2"] = service
elif service.name == "run_test3":
test_services["test3"] = service
elif service.name == "run_test4":
test_services["test4"] = service
elif service.name == "run_test5":
test_services["test5"] = service
elif service.name == "run_final_check":
test_services["final"] = service
assert run_test_service is not None, (
"run_string_lifetime_test service not found"
)
# Ensure all services are found
required_services = ["test1", "test2", "test3", "test4", "test5", "final"]
for service_name in required_services:
assert service_name in test_services, f"{service_name} service not found"
# Call the service to start the test
client.execute_service(run_test_service, {})
# Wait for test to complete
# Run tests sequentially, waiting for each to complete
try:
await asyncio.wait_for(test_complete_future, timeout=30.0)
# Test 1
client.execute_service(test_services["test1"], {})
await asyncio.wait_for(test1_complete.wait(), timeout=5.0)
# Test 2
client.execute_service(test_services["test2"], {})
await asyncio.wait_for(test2_complete.wait(), timeout=5.0)
# Test 3
client.execute_service(test_services["test3"], {})
await asyncio.wait_for(test3_complete.wait(), timeout=5.0)
# Test 4
client.execute_service(test_services["test4"], {})
await asyncio.wait_for(test4_complete.wait(), timeout=5.0)
# Test 5
client.execute_service(test_services["test5"], {})
await asyncio.wait_for(test5_complete.wait(), timeout=5.0)
# Final check
client.execute_service(test_services["final"], {})
await asyncio.wait_for(all_tests_complete.wait(), timeout=5.0)
except asyncio.TimeoutError:
pytest.fail(f"String lifetime test timed out. Stats: {test_stats}")
# Check for use-after-free
assert not test_stats["use_after_free_detected"], "Use-after-free detected!"
# Check for any errors
assert test_stats["tests_failed"] == 0, f"Tests failed: {test_stats['errors']}"