stress test

This commit is contained in:
J. Nick Koston 2025-07-04 08:49:12 -05:00
parent ca70f17b3b
commit cd2b50c27f
No known key found for this signature in database
7 changed files with 517 additions and 70 deletions

View File

@ -8,14 +8,18 @@ logger:
api:
services:
- service: run_defer_test
- service: test_set_timeout
then:
- lambda: |-
// Test 1: Test set_timeout with 0 delay (direct scheduler call)
// Test set_timeout with 0 delay (direct scheduler call)
static int set_timeout_order = 0;
static bool set_timeout_passed = true;
ESP_LOGD("defer_test", "Test 1: Testing set_timeout(0) for FIFO order...");
// Reset for this test
set_timeout_order = 0;
set_timeout_passed = true;
ESP_LOGD("defer_test", "Testing set_timeout(0) for FIFO order...");
for (int i = 0; i < 10; i++) {
int expected = i;
App.scheduler.set_timeout((Component*)nullptr, nullptr, 0, [expected]() {
@ -28,68 +32,66 @@ api:
if (set_timeout_order == 10) {
if (set_timeout_passed) {
ESP_LOGI("defer_test", "✓ Test 1 PASSED - set_timeout(0) maintains FIFO order");
ESP_LOGI("defer_test", "✓ Test PASSED - set_timeout(0) maintains FIFO order");
id(test_result)->trigger("passed");
} else {
ESP_LOGE("defer_test", "✗ Test 1 FAILED - set_timeout(0) executed out of order");
ESP_LOGE("defer_test", "✗ Test FAILED - set_timeout(0) executed out of order");
id(test_result)->trigger("failed");
}
// Start Test 2 after Test 1 completes
App.scheduler.set_timeout((Component*)nullptr, nullptr, 100, []() {
// Test 2: Test defer() method (component method)
static int defer_order = 0;
static bool defer_passed = true;
ESP_LOGD("defer_test", "Test 2: Testing defer() for FIFO order...");
// Create a test component class that exposes defer()
class TestComponent : public Component {
public:
void test_defer() {
for (int i = 0; i < 10; i++) {
int expected = i;
this->defer([expected]() {
ESP_LOGD("defer_test", "defer() item %d executed, order %d", expected, defer_order);
if (defer_order != expected) {
ESP_LOGE("defer_test", "FIFO violation in defer: expected %d but got execution order %d", expected, defer_order);
defer_passed = false;
}
defer_order++;
if (defer_order == 10) {
bool all_passed = set_timeout_passed && defer_passed;
if (defer_passed) {
ESP_LOGI("defer_test", "✓ Test 2 PASSED - defer() maintains FIFO order");
if (all_passed) {
ESP_LOGI("defer_test", "✓ ALL TESTS PASSED - Both set_timeout(0) and defer() maintain FIFO order");
}
} else {
ESP_LOGE("defer_test", "✗ Test 2 FAILED - defer() executed out of order");
}
// Fire test result events
if (all_passed) {
id(test_result)->trigger("passed");
} else {
id(test_result)->trigger("failed");
}
id(test_complete)->trigger("test_finished");
}
});
}
}
};
TestComponent test_component;
test_component.test_defer();
ESP_LOGD("defer_test", "Deferred 10 items using defer(), waiting for execution...");
});
id(test_complete)->trigger("test_finished");
}
});
}
ESP_LOGD("defer_test", "Deferred 10 items using set_timeout(0), waiting for execution...");
- service: test_defer
then:
- lambda: |-
// Test defer() method (component method)
static int defer_order = 0;
static bool defer_passed = true;
// Reset for this test
defer_order = 0;
defer_passed = true;
ESP_LOGD("defer_test", "Testing defer() for FIFO order...");
// Create a test component class that exposes defer()
class TestComponent : public Component {
public:
void test_defer() {
for (int i = 0; i < 10; i++) {
int expected = i;
this->defer([expected]() {
ESP_LOGD("defer_test", "defer() item %d executed, order %d", expected, defer_order);
if (defer_order != expected) {
ESP_LOGE("defer_test", "FIFO violation in defer: expected %d but got execution order %d", expected, defer_order);
defer_passed = false;
}
defer_order++;
if (defer_order == 10) {
if (defer_passed) {
ESP_LOGI("defer_test", "✓ Test PASSED - defer() maintains FIFO order");
id(test_result)->trigger("passed");
} else {
ESP_LOGE("defer_test", "✗ Test FAILED - defer() executed out of order");
id(test_result)->trigger("failed");
}
id(test_complete)->trigger("test_finished");
}
});
}
}
};
TestComponent test_component;
test_component.test_defer();
ESP_LOGD("defer_test", "Deferred 10 items using defer(), waiting for execution...");
event:
- platform: template
name: "Test Complete"

View File

@ -0,0 +1,77 @@
# Integration-test firmware: stresses Component::defer() by queueing 1000
# callbacks in quick succession and reporting the outcome through two
# template events that the Python test subscribes to.
esphome:
  name: defer-stress-test

host:

logger:
  level: DEBUG

api:
  services:
    # Started by the test client; the result arrives asynchronously via events.
    - service: run_stress_test
      then:
        - lambda: |-
            static int total_defers = 0;
            static int executed_defers = 0;

            ESP_LOGI("stress", "Starting defer stress test - rapid sequential defers");

            // Reset counters so the service can be invoked more than once
            total_defers = 0;
            executed_defers = 0;

            // Create a helper component purely to get access to defer()
            class TestComponent : public Component {
             public:
              void run_test() {
                // Rapidly defer many callbacks to stress the defer mechanism:
                // 10 batches of 100 callbacks each
                for (int batch = 0; batch < 10; batch++) {
                  for (int i = 0; i < 100; i++) {
                    int expected_id = total_defers;
                    this->defer([expected_id]() {
                      executed_defers++;
                      ESP_LOGV("stress", "Defer %d executed", expected_id);
                    });
                    total_defers++;
                  }
                  // Brief yield to let other work happen
                  delay(1);
                }
              }
            };

            // Static storage on purpose: the deferred callbacks run only after
            // this lambda has returned, and they were queued on behalf of this
            // component. A stack instance would already be destroyed by then,
            // leaving the scheduler with a dangling component pointer.
            static TestComponent test_comp;
            test_comp.run_test();

            ESP_LOGI("stress", "Scheduled %d defers", total_defers);

            // Give the main loop time to process all defers before checking
            App.scheduler.set_timeout((Component*)nullptr, nullptr, 500, []() {
              ESP_LOGI("stress", "Test complete. Defers scheduled: %d, executed: %d", total_defers, executed_defers);

              // We should have executed all defers without crashing
              if (executed_defers == total_defers && total_defers == 1000) {
                ESP_LOGI("stress", "✓ Stress test PASSED - All %d defers executed", total_defers);
                id(test_result)->trigger("passed");
              } else {
                ESP_LOGE("stress", "✗ Stress test FAILED - Expected 1000 executed, got %d", executed_defers);
                id(test_result)->trigger("failed");
              }

              // Always signal completion, whatever the verdict
              id(test_complete)->trigger("test_finished");
            });

event:
  # Fired once per run when the verdict has been published.
  - platform: template
    name: "Test Complete"
    id: test_complete
    device_class: button
    event_types:
      - "test_finished"
  # Carries the verdict of the run.
  - platform: template
    name: "Test Result"
    id: test_result
    device_class: button
    event_types:
      - "passed"
      - "failed"

View File

@ -0,0 +1,35 @@
import esphome.codegen as cg
import esphome.config_validation as cv
from esphome.const import CONF_ID
# Component metadata: owner handle and the components to load alongside this one.
CODEOWNERS = ["@test"]
AUTO_LOAD = ["api"]
# C++ namespace and class that the generated code instantiates.
api_buffer_test_component_ns = cg.esphome_ns.namespace("api_buffer_test_component")
APIBufferTestComponent = api_buffer_test_component_ns.class_(
"APIBufferTestComponent", cg.Component
)
# YAML option names accepted by this component.
CONF_FILL_SIZE = "fill_size"
CONF_FILL_COUNT = "fill_count"
CONF_AUTO_FILL_DELAY = "auto_fill_delay"
# Validation schema: every option is optional and range-checked; the defaults
# match the member initialisers in the C++ class (2048 / 200 / 2000 ms).
CONFIG_SCHEMA = cv.Schema(
{
cv.GenerateID(): cv.declare_id(APIBufferTestComponent),
# Size of each generated payload (1..16384).
cv.Optional(CONF_FILL_SIZE, default=2048): cv.int_range(min=1, max=16384),
# Number of payloads generated per fill (1..1000).
cv.Optional(CONF_FILL_COUNT, default=200): cv.int_range(min=1, max=1000),
# Delay before the automatic fill starts, converted to milliseconds.
cv.Optional(
CONF_AUTO_FILL_DELAY, default="2s"
): cv.positive_time_period_milliseconds,
}
).extend(cv.COMPONENT_SCHEMA)
async def to_code(config):
    """Emit the C++ setup code for one validated component configuration.

    Creates the component variable, registers it as a component and forwards
    the three configured values to their setters, in the order fill size,
    fill count, auto-fill delay.
    """
    component = cg.new_Pvariable(config[CONF_ID])
    await cg.register_component(component, config)

    # (setter, config key) pairs, emitted in this order.
    setter_calls = (
        (component.set_fill_size, CONF_FILL_SIZE),
        (component.set_fill_count, CONF_FILL_COUNT),
        (component.set_auto_fill_delay, CONF_AUTO_FILL_DELAY),
    )
    for setter, option in setter_calls:
        cg.add(setter(config[option]))

View File

@ -0,0 +1,166 @@
#include "api_buffer_test_component.h"
#include "esphome/core/application.h"
namespace esphome {
namespace api_buffer_test_component {
// Global handle to the component instance; starts out null and is assigned in
// APIBufferTestComponent::setup().
APIBufferTestComponent *global_api_buffer_test_component =
nullptr; // NOLINT(cppcoreguidelines-avoid-non-const-global-variables)
// Publishes this instance through the global pointer, records the start time
// used by the auto-fill delay, and schedules a one-shot check (500 ms later)
// that only logs whether an API server exists.
void APIBufferTestComponent::setup() {
  ESP_LOGD(TAG, "API Buffer Test Component setup");

  global_api_buffer_test_component = this;
  this->last_fill_time_ = millis();

  // Test-only shortcut: no real subscription is made here. Production code
  // should go through a proper API subscription instead.
  App.scheduler.set_timeout(this, "get_api_connection", 500, [this]() {
    if (api::global_api_server == nullptr) {
      return;
    }
    // The test setup assumes a single API connection.
    ESP_LOGD(TAG, "Looking for API connection to subscribe to");
  });
}
// Main-loop hook. Does nothing until the API server exists and reports a
// connection. After that it:
//   1. logs once that the component is ready,
//   2. after auto_fill_delay_ ms (measured from last_fill_time_) generates
//      one round of heavy traffic and opens a 5 second "keep full" window,
//   3. emits a small traffic burst on every iteration inside that window.
void APIBufferTestComponent::loop() {
  // How long the buffer is kept topped up after the initial fill.
  constexpr uint32_t keep_full_duration_ms = 5000;

  auto *api_server = api::global_api_server;
  if (api_server == nullptr || !api_server->is_connected()) {
    return;
  }

  // One-time readiness log; no connection object is actually acquired here.
  if (this->api_connection_ == nullptr && !this->tried_subscribe_) {
    this->tried_subscribe_ = true;
    ESP_LOGD(TAG, "API server is connected, buffer test component ready");
  }

  const uint32_t now = millis();

  // Auto-fill once after the configured delay. The connection state was
  // already verified by the guard above, so it is not re-checked here.
  // Unsigned subtraction keeps the elapsed-time test valid across millis()
  // wrap-around.
  if (this->auto_fill_delay_ > 0 && !this->buffer_filled_ &&
      now - this->last_fill_time_ > this->auto_fill_delay_) {
    ESP_LOGD(TAG, "Auto-filling buffer after %u ms delay", this->auto_fill_delay_);
    // For the test, heavy log traffic stands in for a direct buffer fill.
    this->generate_heavy_traffic();
    this->buffer_filled_ = true;
    this->should_keep_full_ = true;
    this->keep_full_until_ = now + keep_full_duration_ms;
  }

  if (this->should_keep_full_) {
    // Compare via the signed difference instead of `now < keep_full_until_`:
    // the deadline may have wrapped past zero while `now` has not (or vice
    // versa), which would end or extend the window incorrectly.
    const bool before_deadline = static_cast<int32_t>(now - this->keep_full_until_) < 0;
    if (before_deadline) {
      this->generate_traffic_burst();
    } else {
      this->should_keep_full_ = false;
      ESP_LOGD(TAG, "Stopped keeping buffer full");
    }
  }
}
// Remembers `api_connection` as the connection used by fill_buffer().
// Only one connection may be held at a time; a second subscription attempt
// is rejected with an error log and leaves the stored connection untouched.
void APIBufferTestComponent::subscribe_api_connection(api::APIConnection *api_connection) {
  if (this->api_connection_ == nullptr) {
    this->api_connection_ = api_connection;
    ESP_LOGD(TAG, "Subscribed to API connection");
    return;
  }
  ESP_LOGE(TAG, "Already subscribed to an API connection");
}
// Drops the stored connection, but only when `api_connection` is the one
// currently held; requests for any other connection are silently ignored.
void APIBufferTestComponent::unsubscribe_api_connection(api::APIConnection *api_connection) {
  if (this->api_connection_ == api_connection) {
    this->api_connection_ = nullptr;
    ESP_LOGD(TAG, "Unsubscribed from API connection");
  }
}
// Sends fill_count_ text-sensor state responses, each carrying a payload of
// fill_size_ bytes, straight to the subscribed API connection in order to
// fill its transmit buffer. Logs a warning and returns when no connection is
// subscribed. Updates last_fill_time_ when done.
void APIBufferTestComponent::fill_buffer() {
  if (this->api_connection_ == nullptr) {
    ESP_LOGW(TAG, "No API connection available to fill buffer");
    return;
  }

  ESP_LOGD(TAG, "Filling transmit buffer with %zu messages of %zu bytes each", this->fill_count_, this->fill_size_);

  // Create a large text sensor state response to fill the buffer
  api::TextSensorStateResponse resp;
  resp.key = 0x12345678;                            // Dummy key
  resp.state = std::string(this->fill_size_, 'X');  // Large payload
  resp.missing_state = false;

  // set_fill_size() accepts 0; overwriting state[0] of an empty string would
  // be undefined behaviour, so the per-message marker is skipped in that case.
  const bool can_mark_payload = !resp.state.empty();

  // Send many messages rapidly to fill the transmit buffer
  size_t sent_count = 0;
  size_t failed_count = 0;
  for (size_t i = 0; i < this->fill_count_; i++) {
    // Vary the first character (A..Z, cycling) so consecutive messages differ
    if (can_mark_payload) {
      resp.state[0] = static_cast<char>('A' + (i % 26));
    }

    // Send message directly without batching
    if (this->api_connection_->send_message(resp)) {
      sent_count++;
    } else {
      failed_count++;
      ESP_LOGV(TAG, "Message %zu failed to send - buffer likely full", i);
    }

    // Log progress every 50 messages
    if (i % 50 == 0) {
      ESP_LOGD(TAG, "Progress: %zu/%zu messages, %zu failed", i, this->fill_count_, failed_count);
    }
  }

  ESP_LOGD(TAG, "Buffer fill complete: %zu sent, %zu failed", sent_count, failed_count);
  this->last_fill_time_ = millis();
}
// Emits fill_count_ very-verbose log lines, each carrying a fill_size_-byte
// payload, plus a debug progress line every 50 iterations. The intent is that
// these lines travel over the API when a client is subscribed to logs.
// NOTE(review): ESP_LOGVV output is normally filtered out unless the logger
// level is VERY_VERBOSE - confirm the test configuration enables it,
// otherwise only the progress lines are produced.
void APIBufferTestComponent::generate_heavy_traffic() {
  ESP_LOGD(TAG, "Generating heavy traffic to fill transmit buffer");

  std::string large_log(this->fill_size_, 'X');

  // set_fill_size() accepts 0; overwriting large_log[0] of an empty string
  // would be undefined behaviour, so the per-message marker is skipped then.
  const bool can_mark_payload = !large_log.empty();

  for (size_t i = 0; i < this->fill_count_; i++) {
    // Vary the first character (A..Z, cycling) so each message is unique
    if (can_mark_payload) {
      large_log[0] = static_cast<char>('A' + (i % 26));
    }

    ESP_LOGVV(TAG, "Buffer fill #%zu: %s", i, large_log.c_str());

    // Progress logging at higher level
    if (i % 50 == 0) {
      ESP_LOGD(TAG, "Traffic generation progress: %zu/%zu", i, this->fill_count_);
    }
  }

  ESP_LOGD(TAG, "Heavy traffic generation complete");
}
// Emits five very-verbose log lines with a 512-byte payload each; called
// repeatedly from loop() while the keep-full window is open.
void APIBufferTestComponent::generate_traffic_burst() {
  std::string payload(512, 'K');
  int burst = 0;
  while (burst < 5) {
    // The leading digit distinguishes the lines within one burst.
    payload[0] = '0' + (burst % 10);
    ESP_LOGVV(TAG, "Keep-full burst #%d: %s", burst, payload.c_str());
    ++burst;
  }
}
// Backward-compatible alias: forwards to generate_traffic_burst() and does
// nothing else.
void APIBufferTestComponent::keep_buffer_full() {
// Deprecated - use generate_traffic_burst instead
this->generate_traffic_burst();
}
} // namespace api_buffer_test_component
} // namespace esphome

View File

@ -0,0 +1,52 @@
#pragma once
#include "esphome/core/component.h"
#include "esphome/core/log.h"
#include "esphome/components/api/api_server.h"
#include "esphome/components/api/api_connection.h"
#include "esphome/components/api/api_pb2.h"
namespace esphome {
namespace api_buffer_test_component {
static const char *const TAG = "api_buffer_test";
// Test-only component that generates bursts of API/log traffic so that the
// behaviour of a full transmit buffer can be exercised.
class APIBufferTestComponent : public Component {
public:
// Component lifecycle hooks.
void setup() override;
void loop() override;
// Set up after the connection-level components.
float get_setup_priority() const override { return setup_priority::AFTER_CONNECTION; }
// Subscribe to API connection (like bluetooth_proxy)
// Only one connection can be held at a time.
void subscribe_api_connection(api::APIConnection *api_connection);
void unsubscribe_api_connection(api::APIConnection *api_connection);
// Test methods
// fill_buffer() sends messages to the subscribed connection directly;
// the generate_* methods produce log traffic instead.
void fill_buffer();
// Deprecated alias for generate_traffic_burst().
void keep_buffer_full();
void generate_heavy_traffic();
void generate_traffic_burst();
// Configuration
void set_fill_size(size_t size) { this->fill_size_ = size; }
void set_fill_count(size_t count) { this->fill_count_ = count; }
void set_auto_fill_delay(uint32_t delay) { this->auto_fill_delay_ = delay; }
protected:
// Connection used by fill_buffer(); null while unsubscribed.
api::APIConnection *api_connection_{nullptr};
// Payload length of each generated message.
size_t fill_size_{2048};
// Number of messages generated per fill.
size_t fill_count_{200};
// Delay in milliseconds before the automatic fill; 0 disables it.
uint32_t auto_fill_delay_{2000};
// millis() timestamp the auto-fill delay is measured from.
uint32_t last_fill_time_{0};
// Set once the automatic fill has run, so it happens only once.
bool buffer_filled_{false};
// True while loop() should keep emitting traffic bursts.
bool should_keep_full_{false};
// millis() timestamp at which the keep-full window ends.
uint32_t keep_full_until_{0};
// Ensures the "ready" message in loop() is logged only once.
bool tried_subscribe_{false};
};
// Declaration of the global component handle (defined in the .cpp file).
extern APIBufferTestComponent
*global_api_buffer_test_component; // NOLINT(cppcoreguidelines-avoid-non-const-global-variables)
} // namespace api_buffer_test_component
} // namespace esphome

View File

@ -41,14 +41,19 @@ async def test_defer_fifo_simple(
assert test_complete_entity is not None, "test_complete event not found"
assert test_result_entity is not None, "test_result event not found"
# Find our test service
run_defer_test_service: UserService | None = None
# Find our test services
test_set_timeout_service: UserService | None = None
test_defer_service: UserService | None = None
for service in services:
if service.name == "run_defer_test":
run_defer_test_service = service
break
if service.name == "test_set_timeout":
test_set_timeout_service = service
elif service.name == "test_defer":
test_defer_service = service
assert run_defer_test_service is not None, "run_defer_test service not found"
assert test_set_timeout_service is not None, (
"test_set_timeout service not found"
)
assert test_defer_service is not None, "test_defer service not found"
# Get the event loop
loop = asyncio.get_running_loop()
@ -74,15 +79,35 @@ async def test_defer_fifo_simple(
client.subscribe_states(on_state)
# Call the run_defer_test service to start the test
client.execute_service(run_defer_test_service, {})
# Test 1: Test set_timeout(0)
client.execute_service(test_set_timeout_service, {})
# Wait for test completion with timeout
# Wait for first test completion
try:
await asyncio.wait_for(test_complete_future, timeout=10.0)
test_passed = await asyncio.wait_for(test_result_future, timeout=1.0)
await asyncio.wait_for(test_complete_future, timeout=5.0)
test1_passed = await asyncio.wait_for(test_result_future, timeout=1.0)
except asyncio.TimeoutError:
pytest.fail("Test did not complete within 10 seconds")
pytest.fail("Test set_timeout(0) did not complete within 5 seconds")
assert test1_passed is True, (
"set_timeout(0) FIFO test failed - items executed out of order"
)
# Reset futures for second test
test_complete_future = loop.create_future()
test_result_future = loop.create_future()
# Test 2: Test defer()
client.execute_service(test_defer_service, {})
# Wait for second test completion
try:
await asyncio.wait_for(test_complete_future, timeout=5.0)
test2_passed = await asyncio.wait_for(test_result_future, timeout=1.0)
except asyncio.TimeoutError:
pytest.fail("Test defer() did not complete within 5 seconds")
# Verify the test passed
assert test_passed is True, "FIFO test failed - items executed out of order"
assert test2_passed is True, (
"defer() FIFO test failed - items executed out of order"
)

View File

@ -0,0 +1,90 @@
"""Stress test for defer() thread safety with multiple threads."""
import asyncio
from aioesphomeapi import EntityState, Event, EventInfo, UserService
import pytest
from .types import APIClientConnectedFactory, RunCompiledFunction
@pytest.mark.asyncio
async def test_defer_stress(
    yaml_config: str,
    run_compiled: RunCompiledFunction,
    api_client_connected: APIClientConnectedFactory,
) -> None:
    """Test that a rapid burst of defer() calls is executed completely.

    The firmware's ``run_stress_test`` service queues its deferred callbacks
    sequentially from a single lambda (no extra threads are started), checks
    the counters 500 ms later and reports the verdict through the
    ``test_result`` event followed by the ``test_complete`` event.
    """
    async with run_compiled(yaml_config), api_client_connected() as client:
        # Verify we are talking to the expected firmware
        device_info = await client.device_info()
        assert device_info is not None
        assert device_info.name == "defer-stress-test"

        # List entities and services
        entity_info, services = await asyncio.wait_for(
            client.list_entities_services(), timeout=5.0
        )

        # Locate the two event entities the firmware reports through
        event_entities = {
            entity.object_id: entity
            for entity in entity_info
            if isinstance(entity, EventInfo)
        }
        test_complete_entity: EventInfo | None = event_entities.get("test_complete")
        test_result_entity: EventInfo | None = event_entities.get("test_result")

        assert test_complete_entity is not None, "test_complete event not found"
        assert test_result_entity is not None, "test_result event not found"

        # Locate the service that starts the stress run
        run_stress_test_service: UserService | None = next(
            (service for service in services if service.name == "run_stress_test"),
            None,
        )
        assert run_stress_test_service is not None, "run_stress_test service not found"

        loop = asyncio.get_running_loop()

        # Events are delivered as Event states through subscribe_states
        test_complete_future: asyncio.Future[bool] = loop.create_future()
        test_result_future: asyncio.Future[bool] = loop.create_future()

        def on_state(state: EntityState) -> None:
            """Resolve the futures from the firmware's result events."""
            if not isinstance(state, Event):
                return
            if state.key == test_complete_entity.key:
                if (
                    state.event_type == "test_finished"
                    and not test_complete_future.done()
                ):
                    test_complete_future.set_result(True)
            elif state.key == test_result_entity.key:
                if test_result_future.done():
                    return
                if state.event_type == "passed":
                    test_result_future.set_result(True)
                elif state.event_type == "failed":
                    test_result_future.set_result(False)

        client.subscribe_states(on_state)

        # Call the run_stress_test service to start the test
        client.execute_service(run_stress_test_service, {})

        # The firmware publishes its verdict about 500 ms after queueing the
        # callbacks; the generous timeout only guards against a hang or crash.
        try:
            await asyncio.wait_for(test_complete_future, timeout=10.0)
            test_passed = await asyncio.wait_for(test_result_future, timeout=1.0)
        except asyncio.TimeoutError:
            pytest.fail("Stress test did not complete within 10 seconds")

        # Verify the firmware saw every deferred callback execute
        assert test_passed is True, (
            "Stress test failed - not all deferred callbacks were executed"
        )