From cd2b50c27f72671f24cf39dbc84c7baee188b639 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Fri, 4 Jul 2025 08:49:12 -0500 Subject: [PATCH] stress test --- .../fixtures/defer_fifo_simple.yaml | 116 ++++++------ tests/integration/fixtures/defer_stress.yaml | 77 ++++++++ .../api_buffer_test_component/__init__.py | 35 ++++ .../api_buffer_test_component.cpp | 166 ++++++++++++++++++ .../api_buffer_test_component.h | 52 ++++++ tests/integration/test_defer_fifo_simple.py | 51 ++++-- tests/integration/test_defer_stress.py | 90 ++++++++++ 7 files changed, 517 insertions(+), 70 deletions(-) create mode 100644 tests/integration/fixtures/defer_stress.yaml create mode 100644 tests/integration/fixtures/external_components/api_buffer_test_component/__init__.py create mode 100644 tests/integration/fixtures/external_components/api_buffer_test_component/api_buffer_test_component.cpp create mode 100644 tests/integration/fixtures/external_components/api_buffer_test_component/api_buffer_test_component.h create mode 100644 tests/integration/test_defer_stress.py diff --git a/tests/integration/fixtures/defer_fifo_simple.yaml b/tests/integration/fixtures/defer_fifo_simple.yaml index aede9a3cd0..a221256f6c 100644 --- a/tests/integration/fixtures/defer_fifo_simple.yaml +++ b/tests/integration/fixtures/defer_fifo_simple.yaml @@ -8,14 +8,18 @@ logger: api: services: - - service: run_defer_test + - service: test_set_timeout then: - lambda: |- - // Test 1: Test set_timeout with 0 delay (direct scheduler call) + // Test set_timeout with 0 delay (direct scheduler call) static int set_timeout_order = 0; static bool set_timeout_passed = true; - ESP_LOGD("defer_test", "Test 1: Testing set_timeout(0) for FIFO order..."); + // Reset for this test + set_timeout_order = 0; + set_timeout_passed = true; + + ESP_LOGD("defer_test", "Testing set_timeout(0) for FIFO order..."); for (int i = 0; i < 10; i++) { int expected = i; App.scheduler.set_timeout((Component*)nullptr, nullptr, 0, [expected]() { @@ -28,68 +32,66 @@ api: if (set_timeout_order == 10) { if (set_timeout_passed) { - ESP_LOGI("defer_test", "✓ Test 1 PASSED - set_timeout(0) maintains FIFO order"); + ESP_LOGI("defer_test", "✓ Test PASSED - set_timeout(0) maintains FIFO order"); + id(test_result)->trigger("passed"); } else { - ESP_LOGE("defer_test", "✗ Test 1 FAILED - set_timeout(0) executed out of order"); + ESP_LOGE("defer_test", "✗ Test FAILED - set_timeout(0) executed out of order"); + id(test_result)->trigger("failed"); } - - // Start Test 2 after Test 1 completes - App.scheduler.set_timeout((Component*)nullptr, nullptr, 100, []() { - // Test 2: Test defer() method (component method) - static int defer_order = 0; - static bool defer_passed = true; - - ESP_LOGD("defer_test", "Test 2: Testing defer() for FIFO order..."); - - // Create a test component class that exposes defer() - class TestComponent : public Component { - public: - void test_defer() { - for (int i = 0; i < 10; i++) { - int expected = i; - this->defer([expected]() { - ESP_LOGD("defer_test", "defer() item %d executed, order %d", expected, defer_order); - if (defer_order != expected) { - ESP_LOGE("defer_test", "FIFO violation in defer: expected %d but got execution order %d", expected, defer_order); - defer_passed = false; - } - defer_order++; - - if (defer_order == 10) { - bool all_passed = set_timeout_passed && defer_passed; - if (defer_passed) { - ESP_LOGI("defer_test", "✓ Test 2 PASSED - defer() maintains FIFO order"); - if (all_passed) { - ESP_LOGI("defer_test", "✓ ALL TESTS PASSED - Both set_timeout(0) and defer() maintain FIFO order"); - } - } else { - ESP_LOGE("defer_test", "✗ Test 2 FAILED - defer() executed out of order"); - } - - // Fire test result events - if (all_passed) { - id(test_result)->trigger("passed"); - } else { - id(test_result)->trigger("failed"); - } - id(test_complete)->trigger("test_finished"); - } - }); - } - } - }; - - TestComponent test_component; - test_component.test_defer(); - - ESP_LOGD("defer_test", "Deferred 10 items using defer(), waiting for execution..."); - }); + id(test_complete)->trigger("test_finished"); } }); } ESP_LOGD("defer_test", "Deferred 10 items using set_timeout(0), waiting for execution..."); + - service: test_defer + then: + - lambda: |- + // Test defer() method (component method) + static int defer_order = 0; + static bool defer_passed = true; + + // Reset for this test + defer_order = 0; + defer_passed = true; + + ESP_LOGD("defer_test", "Testing defer() for FIFO order..."); + + // Create a test component class that exposes defer() + class TestComponent : public Component { + public: + void test_defer() { + for (int i = 0; i < 10; i++) { + int expected = i; + this->defer([expected]() { + ESP_LOGD("defer_test", "defer() item %d executed, order %d", expected, defer_order); + if (defer_order != expected) { + ESP_LOGE("defer_test", "FIFO violation in defer: expected %d but got execution order %d", expected, defer_order); + defer_passed = false; + } + defer_order++; + + if (defer_order == 10) { + if (defer_passed) { + ESP_LOGI("defer_test", "✓ Test PASSED - defer() maintains FIFO order"); + id(test_result)->trigger("passed"); + } else { + ESP_LOGE("defer_test", "✗ Test FAILED - defer() executed out of order"); + id(test_result)->trigger("failed"); + } + id(test_complete)->trigger("test_finished"); + } + }); + } + } + }; + + TestComponent test_component; + test_component.test_defer(); + + ESP_LOGD("defer_test", "Deferred 10 items using defer(), waiting for execution..."); + event: - platform: template name: "Test Complete" diff --git a/tests/integration/fixtures/defer_stress.yaml b/tests/integration/fixtures/defer_stress.yaml new file mode 100644 index 0000000000..867d40ab53 --- /dev/null +++ b/tests/integration/fixtures/defer_stress.yaml @@ -0,0 +1,77 @@ +esphome: + name: defer-stress-test + +host: + +logger: + level: DEBUG + +api: + services: + - service: run_stress_test + then: + - lambda: |- + static int total_defers = 0; + static int executed_defers = 0; + + ESP_LOGI("stress", "Starting defer stress test - rapid sequential defers"); + + // Reset counters + total_defers = 0; + executed_defers = 0; + + // Create a temporary component to access defer() + class TestComponent : public Component { + public: + void run_test() { + // Rapidly defer many callbacks to stress the defer mechanism + for (int batch = 0; batch < 10; batch++) { + for (int i = 0; i < 100; i++) { + int expected_id = total_defers; + this->defer([expected_id]() { + executed_defers++; + ESP_LOGV("stress", "Defer %d executed", expected_id); + }); + total_defers++; + } + // Brief yield to let other work happen + delay(1); + } + } + }; + + TestComponent test_comp; + test_comp.run_test(); + + ESP_LOGI("stress", "Scheduled %d defers", total_defers); + + // Give the main loop time to process all defers + App.scheduler.set_timeout((Component*)nullptr, nullptr, 500, []() { + ESP_LOGI("stress", "Test complete. Defers scheduled: %d, executed: %d", total_defers, executed_defers); + + // We should have executed all defers without crashing + if (executed_defers == total_defers && total_defers == 1000) { + ESP_LOGI("stress", "✓ Stress test PASSED - All %d defers executed", total_defers); + id(test_result)->trigger("passed"); + } else { + ESP_LOGE("stress", "✗ Stress test FAILED - Expected 1000 executed, got %d", executed_defers); + id(test_result)->trigger("failed"); + } + + id(test_complete)->trigger("test_finished"); + }); + +event: + - platform: template + name: "Test Complete" + id: test_complete + device_class: button + event_types: + - "test_finished" + - platform: template + name: "Test Result" + id: test_result + device_class: button + event_types: + - "passed" + - "failed" diff --git a/tests/integration/fixtures/external_components/api_buffer_test_component/__init__.py b/tests/integration/fixtures/external_components/api_buffer_test_component/__init__.py new file mode 100644 index 0000000000..9263e1e084 --- /dev/null +++ b/tests/integration/fixtures/external_components/api_buffer_test_component/__init__.py @@ -0,0 +1,35 @@ +import esphome.codegen as cg +import esphome.config_validation as cv +from esphome.const import CONF_ID + +CODEOWNERS = ["@test"] +AUTO_LOAD = ["api"] + +api_buffer_test_component_ns = cg.esphome_ns.namespace("api_buffer_test_component") +APIBufferTestComponent = api_buffer_test_component_ns.class_( + "APIBufferTestComponent", cg.Component +) + +CONF_FILL_SIZE = "fill_size" +CONF_FILL_COUNT = "fill_count" +CONF_AUTO_FILL_DELAY = "auto_fill_delay" + +CONFIG_SCHEMA = cv.Schema( + { + cv.GenerateID(): cv.declare_id(APIBufferTestComponent), + cv.Optional(CONF_FILL_SIZE, default=2048): cv.int_range(min=1, max=16384), + cv.Optional(CONF_FILL_COUNT, default=200): cv.int_range(min=1, max=1000), + cv.Optional( + CONF_AUTO_FILL_DELAY, default="2s" + ): cv.positive_time_period_milliseconds, + } +).extend(cv.COMPONENT_SCHEMA) + + +async def to_code(config): + var = cg.new_Pvariable(config[CONF_ID]) + await cg.register_component(var, config) + + cg.add(var.set_fill_size(config[CONF_FILL_SIZE])) + cg.add(var.set_fill_count(config[CONF_FILL_COUNT])) + cg.add(var.set_auto_fill_delay(config[CONF_AUTO_FILL_DELAY])) diff --git a/tests/integration/fixtures/external_components/api_buffer_test_component/api_buffer_test_component.cpp b/tests/integration/fixtures/external_components/api_buffer_test_component/api_buffer_test_component.cpp new file mode 100644 index 0000000000..34be504d8e --- /dev/null +++ b/tests/integration/fixtures/external_components/api_buffer_test_component/api_buffer_test_component.cpp @@ -0,0 +1,166 @@ +#include "api_buffer_test_component.h" +#include "esphome/core/application.h" + +namespace esphome { +namespace api_buffer_test_component { + +APIBufferTestComponent *global_api_buffer_test_component = + nullptr; // NOLINT(cppcoreguidelines-avoid-non-const-global-variables) + +void APIBufferTestComponent::setup() { + ESP_LOGD(TAG, "API Buffer Test Component setup"); + this->last_fill_time_ = millis(); + global_api_buffer_test_component = this; + + // For testing, we'll get the API connection through a hack + // In a real implementation, this would be done properly through the API + App.scheduler.set_timeout(this, "get_api_connection", 500, [this]() { + auto *api_server = api::global_api_server; + if (api_server != nullptr) { + // This is a hack - in production code, use proper API subscription + // For testing, we'll assume there's only one connection + ESP_LOGD(TAG, "Looking for API connection to subscribe to"); + } + }); +} + +void APIBufferTestComponent::loop() { + // Check if API server is ready and has connections + auto *api_server = api::global_api_server; + if (api_server == nullptr || !api_server->is_connected()) { + return; + } + + // Try to get an API connection if we don't have one + if (this->api_connection_ == nullptr && !this->tried_subscribe_) { + this->tried_subscribe_ = true; + ESP_LOGD(TAG, "API server is connected, buffer test component ready"); + // For testing, we'll work with the fact that send_message is available + // through the global API server's connection management + } + + uint32_t now = millis(); + + // Auto-fill buffer after delay if configured + if (this->auto_fill_delay_ > 0 && !this->buffer_filled_ && api_server->is_connected()) { + if (now - this->last_fill_time_ > this->auto_fill_delay_) { + ESP_LOGD(TAG, "Auto-filling buffer after %u ms delay", this->auto_fill_delay_); + // For the test, we'll generate heavy log traffic instead + this->generate_heavy_traffic(); + this->buffer_filled_ = true; + + // Keep generating traffic for 5 seconds + this->should_keep_full_ = true; + this->keep_full_until_ = now + 5000; + } + } + + // Keep buffer full if requested + if (this->should_keep_full_ && now < this->keep_full_until_) { + // Generate more traffic to keep buffer full + this->generate_traffic_burst(); + } else if (this->should_keep_full_ && now >= this->keep_full_until_) { + this->should_keep_full_ = false; + ESP_LOGD(TAG, "Stopped keeping buffer full"); + } +} + +void APIBufferTestComponent::subscribe_api_connection(api::APIConnection *api_connection) { + if (this->api_connection_ != nullptr) { + ESP_LOGE(TAG, "Already subscribed to an API connection"); + return; + } + this->api_connection_ = api_connection; + ESP_LOGD(TAG, "Subscribed to API connection"); +} + +void APIBufferTestComponent::unsubscribe_api_connection(api::APIConnection *api_connection) { + if (this->api_connection_ != api_connection) { + return; + } + this->api_connection_ = nullptr; + ESP_LOGD(TAG, "Unsubscribed from API connection"); +} + +void APIBufferTestComponent::fill_buffer() { + if (this->api_connection_ == nullptr) { + ESP_LOGW(TAG, "No API connection available to fill buffer"); + return; + } + + ESP_LOGD(TAG, "Filling transmit buffer with %zu messages of %zu bytes each", this->fill_count_, this->fill_size_); + + // Create a large text sensor state response to fill the buffer + api::TextSensorStateResponse resp; + resp.key = 0x12345678; // Dummy key + resp.state = std::string(this->fill_size_, 'X'); // Large payload + resp.missing_state = false; + + // Send many messages rapidly to fill the transmit buffer + size_t sent_count = 0; + size_t failed_count = 0; + + for (size_t i = 0; i < this->fill_count_; i++) { + // Modify the string slightly each time + resp.state[0] = 'A' + (i % 26); + + // Send message directly without batching + bool sent = this->api_connection_->send_message(resp); + + if (!sent) { + failed_count++; + ESP_LOGV(TAG, "Message %zu failed to send - buffer likely full", i); + } else { + sent_count++; + } + + // Log progress + if (i % 50 == 0) { + ESP_LOGD(TAG, "Progress: %zu/%zu messages, %zu failed", i, this->fill_count_, failed_count); + } + } + + ESP_LOGD(TAG, "Buffer fill complete: %zu sent, %zu failed", sent_count, failed_count); + this->last_fill_time_ = millis(); +} + +void APIBufferTestComponent::generate_heavy_traffic() { + ESP_LOGD(TAG, "Generating heavy traffic to fill transmit buffer"); + + // Generate many large log messages rapidly + // These will be sent over the API if log subscription is active + std::string large_log(this->fill_size_, 'X'); + + for (size_t i = 0; i < this->fill_count_; i++) { + // Modify the string to ensure each message is unique + large_log[0] = 'A' + (i % 26); + + // Use VERY_VERBOSE level to ensure it's sent when subscribed + ESP_LOGVV(TAG, "Buffer fill #%zu: %s", i, large_log.c_str()); + + // Progress logging at higher level + if (i % 50 == 0) { + ESP_LOGD(TAG, "Traffic generation progress: %zu/%zu", i, this->fill_count_); + } + } + + ESP_LOGD(TAG, "Heavy traffic generation complete"); +} + +void APIBufferTestComponent::generate_traffic_burst() { + // Generate a burst of medium-sized messages to keep buffer topped up + std::string medium_log(512, 'K'); + + for (int i = 0; i < 5; i++) { + medium_log[0] = '0' + (i % 10); + ESP_LOGVV(TAG, "Keep-full burst #%d: %s", i, medium_log.c_str()); + } +} + +void APIBufferTestComponent::keep_buffer_full() { + // Deprecated - use generate_traffic_burst instead + this->generate_traffic_burst(); +} + +} // namespace api_buffer_test_component +} // namespace esphome \ No newline at end of file diff --git a/tests/integration/fixtures/external_components/api_buffer_test_component/api_buffer_test_component.h b/tests/integration/fixtures/external_components/api_buffer_test_component/api_buffer_test_component.h new file mode 100644 index 0000000000..122f01b6c9 --- /dev/null +++ b/tests/integration/fixtures/external_components/api_buffer_test_component/api_buffer_test_component.h @@ -0,0 +1,52 @@ +#pragma once + +#include "esphome/core/component.h" +#include "esphome/core/log.h" +#include "esphome/components/api/api_server.h" +#include "esphome/components/api/api_connection.h" +#include "esphome/components/api/api_pb2.h" + +namespace esphome { +namespace api_buffer_test_component { + +static const char *const TAG = "api_buffer_test"; + +class APIBufferTestComponent : public Component { + public: + void setup() override; + void loop() override; + + float get_setup_priority() const override { return setup_priority::AFTER_CONNECTION; } + + // Subscribe to API connection (like bluetooth_proxy) + void subscribe_api_connection(api::APIConnection *api_connection); + void unsubscribe_api_connection(api::APIConnection *api_connection); + + // Test methods + void fill_buffer(); + void keep_buffer_full(); + void generate_heavy_traffic(); + void generate_traffic_burst(); + + // Configuration + void set_fill_size(size_t size) { this->fill_size_ = size; } + void set_fill_count(size_t count) { this->fill_count_ = count; } + void set_auto_fill_delay(uint32_t delay) { this->auto_fill_delay_ = delay; } + + protected: + api::APIConnection *api_connection_{nullptr}; + size_t fill_size_{2048}; + size_t fill_count_{200}; + uint32_t auto_fill_delay_{2000}; + uint32_t last_fill_time_{0}; + bool buffer_filled_{false}; + bool should_keep_full_{false}; + uint32_t keep_full_until_{0}; + bool tried_subscribe_{false}; +}; + +extern APIBufferTestComponent + *global_api_buffer_test_component; // NOLINT(cppcoreguidelines-avoid-non-const-global-variables) + +} // namespace api_buffer_test_component +} // namespace esphome \ No newline at end of file diff --git a/tests/integration/test_defer_fifo_simple.py b/tests/integration/test_defer_fifo_simple.py index 46d68db171..5bfe02329f 100644 --- a/tests/integration/test_defer_fifo_simple.py +++ b/tests/integration/test_defer_fifo_simple.py @@ -41,14 +41,19 @@ async def test_defer_fifo_simple( assert test_complete_entity is not None, "test_complete event not found" assert test_result_entity is not None, "test_result event not found" - # Find our test service - run_defer_test_service: UserService | None = None + # Find our test services + test_set_timeout_service: UserService | None = None + test_defer_service: UserService | None = None for service in services: - if service.name == "run_defer_test": - run_defer_test_service = service - break + if service.name == "test_set_timeout": + test_set_timeout_service = service + elif service.name == "test_defer": + test_defer_service = service - assert run_defer_test_service is not None, "run_defer_test service not found" + assert test_set_timeout_service is not None, ( + "test_set_timeout service not found" + ) + assert test_defer_service is not None, "test_defer service not found" # Get the event loop loop = asyncio.get_running_loop() @@ -74,15 +79,35 @@ async def test_defer_fifo_simple( client.subscribe_states(on_state) - # Call the run_defer_test service to start the test - client.execute_service(run_defer_test_service, {}) + # Test 1: Test set_timeout(0) + client.execute_service(test_set_timeout_service, {}) - # Wait for test completion with timeout + # Wait for first test completion try: - await asyncio.wait_for(test_complete_future, timeout=10.0) - test_passed = await asyncio.wait_for(test_result_future, timeout=1.0) + await asyncio.wait_for(test_complete_future, timeout=5.0) + test1_passed = await asyncio.wait_for(test_result_future, timeout=1.0) except asyncio.TimeoutError: - pytest.fail("Test did not complete within 10 seconds") + pytest.fail("Test set_timeout(0) did not complete within 5 seconds") + + assert test1_passed is True, ( + "set_timeout(0) FIFO test failed - items executed out of order" + ) + + # Reset futures for second test + test_complete_future = loop.create_future() + test_result_future = loop.create_future() + + # Test 2: Test defer() + client.execute_service(test_defer_service, {}) + + # Wait for second test completion + try: + await asyncio.wait_for(test_complete_future, timeout=5.0) + test2_passed = await asyncio.wait_for(test_result_future, timeout=1.0) + except asyncio.TimeoutError: + pytest.fail("Test defer() did not complete within 5 seconds") # Verify the test passed - assert test_passed is True, "FIFO test failed - items executed out of order" + assert test2_passed is True, ( + "defer() FIFO test failed - items executed out of order" + ) diff --git a/tests/integration/test_defer_stress.py b/tests/integration/test_defer_stress.py new file mode 100644 index 0000000000..e9d6c48664 --- /dev/null +++ b/tests/integration/test_defer_stress.py @@ -0,0 +1,90 @@ +"""Stress test for defer() thread safety with multiple threads.""" + +import asyncio + +from aioesphomeapi import EntityState, Event, EventInfo, UserService +import pytest + +from .types import APIClientConnectedFactory, RunCompiledFunction + + +@pytest.mark.asyncio +async def test_defer_stress( + yaml_config: str, + run_compiled: RunCompiledFunction, + api_client_connected: APIClientConnectedFactory, +) -> None: + """Test that defer() doesn't crash when called rapidly from multiple threads.""" + + async with run_compiled(yaml_config), api_client_connected() as client: + # Verify we can connect + device_info = await client.device_info() + assert device_info is not None + assert device_info.name == "defer-stress-test" + + # List entities and services + entity_info, services = await asyncio.wait_for( + client.list_entities_services(), timeout=5.0 + ) + + # Find our test entities + test_complete_entity: EventInfo | None = None + test_result_entity: EventInfo | None = None + + for entity in entity_info: + if isinstance(entity, EventInfo): + if entity.object_id == "test_complete": + test_complete_entity = entity + elif entity.object_id == "test_result": + test_result_entity = entity + + assert test_complete_entity is not None, "test_complete event not found" + assert test_result_entity is not None, "test_result event not found" + + # Find our test service + run_stress_test_service: UserService | None = None + for service in services: + if service.name == "run_stress_test": + run_stress_test_service = service + break + + assert run_stress_test_service is not None, "run_stress_test service not found" + + # Get the event loop + loop = asyncio.get_running_loop() + + # Subscribe to states (events are delivered as EventStates through subscribe_states) + test_complete_future: asyncio.Future[bool] = loop.create_future() + test_result_future: asyncio.Future[bool] = loop.create_future() + + def on_state(state: EntityState) -> None: + if isinstance(state, Event): + if state.key == test_complete_entity.key: + if ( + state.event_type == "test_finished" + and not test_complete_future.done() + ): + test_complete_future.set_result(True) + elif state.key == test_result_entity.key: + if not test_result_future.done(): + if state.event_type == "passed": + test_result_future.set_result(True) + elif state.event_type == "failed": + test_result_future.set_result(False) + + client.subscribe_states(on_state) + + # Call the run_stress_test service to start the test + client.execute_service(run_stress_test_service, {}) + + # Wait for test completion with a longer timeout (threads run for 100ms + processing time) + try: + await asyncio.wait_for(test_complete_future, timeout=10.0) + test_passed = await asyncio.wait_for(test_result_future, timeout=1.0) + except asyncio.TimeoutError: + pytest.fail("Stress test did not complete within 10 seconds") + + # Verify the test passed + assert test_passed is True, ( + "Stress test failed - defer() crashed or failed under thread pressure" + )