Mirror of https://github.com/esphome/esphome.git
Merge branch 'heap_scheduler_stress_component' into integration
Commit: 8da8d938f0

@@ -50,7 +50,8 @@ void HttpRequestUpdate::update_task(void *params) {
 
   if (container == nullptr || container->status_code != HTTP_STATUS_OK) {
     std::string msg = str_sprintf("Failed to fetch manifest from %s", this_update->source_url_.c_str());
-    this_update->status_set_error(msg.c_str());
+    // Defer to main loop to avoid race condition on component_state_ read-modify-write
+    this_update->defer([this_update, msg]() { this_update->status_set_error(msg.c_str()); });
     UPDATE_RETURN;
   }
 
@@ -58,7 +59,8 @@ void HttpRequestUpdate::update_task(void *params) {
   uint8_t *data = allocator.allocate(container->content_length);
   if (data == nullptr) {
     std::string msg = str_sprintf("Failed to allocate %zu bytes for manifest", container->content_length);
-    this_update->status_set_error(msg.c_str());
+    // Defer to main loop to avoid race condition on component_state_ read-modify-write
+    this_update->defer([this_update, msg]() { this_update->status_set_error(msg.c_str()); });
     container->end();
     UPDATE_RETURN;
   }
@@ -120,7 +122,8 @@ void HttpRequestUpdate::update_task(void *params) {
 
   if (!valid) {
     std::string msg = str_sprintf("Failed to parse JSON from %s", this_update->source_url_.c_str());
-    this_update->status_set_error(msg.c_str());
+    // Defer to main loop to avoid race condition on component_state_ read-modify-write
+    this_update->defer([this_update, msg]() { this_update->status_set_error(msg.c_str()); });
     UPDATE_RETURN;
   }
 
@@ -147,18 +150,34 @@ void HttpRequestUpdate::update_task(void *params) {
     this_update->update_info_.current_version = current_version;
   }
 
+  bool trigger_update_available = false;
+
   if (this_update->update_info_.latest_version.empty() ||
       this_update->update_info_.latest_version == this_update->update_info_.current_version) {
     this_update->state_ = update::UPDATE_STATE_NO_UPDATE;
   } else {
+    if (this_update->state_ != update::UPDATE_STATE_AVAILABLE) {
+      trigger_update_available = true;
+    }
     this_update->state_ = update::UPDATE_STATE_AVAILABLE;
   }
 
-  this_update->update_info_.has_progress = false;
-  this_update->update_info_.progress = 0.0f;
+  // Defer to main loop to ensure thread-safe execution of:
+  // - status_clear_error() performs non-atomic read-modify-write on component_state_
+  // - publish_state() triggers API callbacks that write to the shared protobuf buffer
+  //   which can be corrupted if accessed concurrently from task and main loop threads
+  // - update_available trigger to ensure consistent state when the trigger fires
+  this_update->defer([this_update, trigger_update_available]() {
+    this_update->update_info_.has_progress = false;
+    this_update->update_info_.progress = 0.0f;
 
     this_update->status_clear_error();
     this_update->publish_state();
+
+    if (trigger_update_available) {
+      this_update->get_update_available_trigger()->trigger(this_update->update_info_);
+    }
+  });
 
   UPDATE_RETURN;
 }
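Every hunk above makes the same change: update_task() runs on its own task, so instead of calling status_set_error(), status_clear_error() or publish_state() directly it hands that work to the main loop via defer(). A minimal sketch of that shape, outside the real sources (the class name and background_work() are made up; defer() and status_set_error() stand in for the ESPHome component APIs):

#include <string>
#include "esphome/core/component.h"

// Sketch only - not the actual HttpRequestUpdate code.
class HypotheticalUpdater : public esphome::Component {
 public:
  // Runs on a background task, so it must not touch component state directly.
  void background_work() {
    std::string msg = "Failed to fetch manifest";
    // Capture by value: the task's locals may be gone by the time the lambda runs.
    this->defer([this, msg]() {
      // Executed later from the main loop, where the read-modify-write on
      // component_state_ is single-threaded and therefore safe.
      this->status_set_error(msg.c_str());
    });
  }
};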
@@ -1,5 +1,6 @@
 #pragma once
 
+#include <memory>
 #include "esphome/core/automation.h"
 #include "esphome/core/component.h"
 #include "esphome/core/entity_base.h"
@@ -38,12 +39,19 @@ class UpdateEntity : public EntityBase, public EntityBase_DeviceClass {
   const UpdateState &state = state_;
 
   void add_on_state_callback(std::function<void()> &&callback) { this->state_callback_.add(std::move(callback)); }
+  Trigger<const UpdateInfo &> *get_update_available_trigger() {
+    if (!update_available_trigger_) {
+      update_available_trigger_ = std::make_unique<Trigger<const UpdateInfo &>>();
+    }
+    return update_available_trigger_.get();
+  }
 
  protected:
   UpdateState state_{UPDATE_STATE_UNKNOWN};
   UpdateInfo update_info_;
 
   CallbackManager<void()> state_callback_{};
+  std::unique_ptr<Trigger<const UpdateInfo &>> update_available_trigger_{nullptr};
 };
 
 }  // namespace update
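The new get_update_available_trigger() allocates its Trigger lazily, so configurations that never use on_update_available do not pay for the object; the unique_ptr member costs one pointer per entity. A generic sketch of the same lazy-member idiom (illustrative names, not the ESPHome types):

#include <memory>

// Sketch of the lazy-member idiom used by get_update_available_trigger().
template<typename T> class LazyMember {
 public:
  T *get() {
    if (!value_) {
      value_ = std::make_unique<T>();  // first caller pays the allocation
    }
    return value_.get();  // later callers reuse the same object
  }

 private:
  std::unique_ptr<T> value_{nullptr};
};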
@@ -220,6 +220,9 @@ bool HOT Scheduler::cancel_retry(Component *component, const std::string &name)
 }
 
 optional<uint32_t> HOT Scheduler::next_schedule_in() {
+  // IMPORTANT: This method should only be called from the main thread (loop task).
+  // It calls empty_() and accesses items_[0] without holding a lock, which is only
+  // safe when called from the main thread. Other threads must not call this method.
   if (this->empty_())
     return {};
   auto &item = this->items_[0];
@@ -291,29 +294,27 @@ void HOT Scheduler::call() {
   }
 #endif  // ESPHOME_DEBUG_SCHEDULER
 
-  auto to_remove_was = this->to_remove_;
-  auto items_was = this->items_.size();
   // If we have too many items to remove
   if (this->to_remove_ > MAX_LOGICALLY_DELETED_ITEMS) {
+    // We hold the lock for the entire cleanup operation because:
+    // 1. We're rebuilding the entire items_ list, so we need exclusive access throughout
+    // 2. Other threads must see either the old state or the new state, not intermediate states
+    // 3. The operation is already expensive (O(n)), so lock overhead is negligible
+    // 4. No operations inside can block or take other locks, so no deadlock risk
+    LockGuard guard{this->lock_};
+
     std::vector<std::unique_ptr<SchedulerItem>> valid_items;
-    while (!this->empty_()) {
-      LockGuard guard{this->lock_};
-      auto item = std::move(this->items_[0]);
-      this->pop_raw_();
+    // Move all non-removed items to valid_items
+    for (auto &item : this->items_) {
+      if (!item->remove) {
        valid_items.push_back(std::move(item));
+      }
     }
 
-    {
-      LockGuard guard{this->lock_};
-      this->items_ = std::move(valid_items);
-    }
-
-    // The following should not happen unless I'm missing something
-    if (this->to_remove_ != 0) {
-      ESP_LOGW(TAG, "to_remove_ was %" PRIu32 " now: %" PRIu32 " items where %zu now %zu. Please report this",
-               to_remove_was, to_remove_, items_was, items_.size());
-      this->to_remove_ = 0;
-    }
+    // Replace items_ with the filtered list
+    this->items_ = std::move(valid_items);
+    this->to_remove_ = 0;
   }
 
   while (!this->empty_()) {
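The rewritten branch replaces the old pop-one-item-per-iteration loop, which re-acquired the lock on every pass, with a single locked sweep that filters items_ into a fresh vector. A standalone sketch of that filtering step, using a plain std::mutex and std::vector rather than the scheduler's own types:

#include <memory>
#include <mutex>
#include <vector>

struct Item {
  bool remove{false};
};

// Sketch: rebuild the container once under a single lock instead of popping
// items one at a time and re-locking on every iteration.
void bulk_cleanup(std::vector<std::unique_ptr<Item>> &items, std::mutex &m) {
  std::lock_guard<std::mutex> guard{m};
  std::vector<std::unique_ptr<Item>> valid_items;
  valid_items.reserve(items.size());
  for (auto &item : items) {
    if (!item->remove) {
      valid_items.push_back(std::move(item));  // keep only live items
    }
  }
  // Other threads that take the same lock see either the old list or the
  // new one, never an intermediate state.
  items = std::move(valid_items);
}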
@@ -383,17 +384,29 @@ void HOT Scheduler::process_to_add() {
   this->to_add_.clear();
 }
 void HOT Scheduler::cleanup_() {
+  // Fast path: if nothing to remove, just return
+  // Reading to_remove_ without lock is safe because:
+  // 1. We only call this from the main thread during call()
+  // 2. If it's 0, there's definitely nothing to cleanup
+  // 3. If it becomes non-zero after we check, cleanup will happen next time
+  if (this->to_remove_ == 0)
+    return;
+
+  // We must hold the lock for the entire cleanup operation because:
+  // 1. We're modifying items_ (via pop_raw_) which requires exclusive access
+  // 2. We're decrementing to_remove_ which is also modified by other threads
+  //    (though all modifications are already under lock)
+  // 3. Other threads read items_ when searching for items to cancel in cancel_item_locked_()
+  // 4. We need a consistent view of items_ and to_remove_ throughout the operation
+  //    Without the lock, we could access items_ while another thread is reading it,
+  //    leading to race conditions
+  LockGuard guard{this->lock_};
   while (!this->items_.empty()) {
     auto &item = this->items_[0];
     if (!item->remove)
       return;
 
     this->to_remove_--;
-
-    {
-      LockGuard guard{this->lock_};
-      this->pop_raw_();
-    }
+    this->pop_raw_();
   }
 }
 void HOT Scheduler::pop_raw_() {
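cleanup_() now short-circuits on the to_remove_ counter before taking the lock at all, and when there is work to do it holds one lock for the whole sweep instead of re-locking per item. A minimal sketch of that fast-path-then-single-lock shape with generic types (not the scheduler's own):

#include <deque>
#include <mutex>

struct Entry {
  bool remove{false};
};

struct RemovalQueue {
  std::deque<Entry> entries;
  int to_remove{0};  // modified only while holding lock
  std::mutex lock;

  void cleanup() {
    // Fast path: if the counter is 0 there is nothing to do; if it becomes
    // non-zero right after this check, the next cleanup() call handles it.
    if (to_remove == 0)
      return;
    // One lock for the entire sweep, mirroring the scheduler change above.
    std::lock_guard<std::mutex> guard{lock};
    while (!entries.empty() && entries.front().remove) {
      entries.pop_front();
      to_remove--;
    }
  }
};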
@@ -99,9 +99,15 @@ class Scheduler {
     SchedulerItem(const SchedulerItem &) = delete;
     SchedulerItem &operator=(const SchedulerItem &) = delete;
 
-    // Default move operations
-    SchedulerItem(SchedulerItem &&) = default;
-    SchedulerItem &operator=(SchedulerItem &&) = default;
+    // Delete move operations to prevent accidental moves of SchedulerItem objects.
+    // This is intentional because:
+    // 1. SchedulerItem contains a dynamically allocated name that requires careful ownership management
+    // 2. The scheduler only moves unique_ptr<SchedulerItem>, never SchedulerItem objects directly
+    // 3. Moving unique_ptr only transfers pointer ownership without moving the pointed-to object
+    // 4. Deleting these operations makes it explicit that SchedulerItem objects should not be moved
+    // 5. This prevents potential double-free bugs if the code is refactored to move SchedulerItem objects
+    SchedulerItem(SchedulerItem &&) = delete;
+    SchedulerItem &operator=(SchedulerItem &&) = delete;
 
     // Helper to get the name regardless of storage type
     const char *get_name() const { return name_is_dynamic ? name_.dynamic_name : name_.static_name; }
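Deleting the move operations encodes the ownership rule in the type: the scheduler only ever moves the owning unique_ptr, which reseats a pointer and never runs SchedulerItem's move constructor. A small illustration with a stand-in type (not the real SchedulerItem):

#include <memory>
#include <utility>
#include <vector>

struct PinnedItem {
  PinnedItem() = default;
  PinnedItem(const PinnedItem &) = delete;
  PinnedItem &operator=(const PinnedItem &) = delete;
  PinnedItem(PinnedItem &&) = delete;   // the object itself never moves
  PinnedItem &operator=(PinnedItem &&) = delete;
  char *dynamic_name{nullptr};          // manually managed, so accidental moves are risky
};

int main() {
  std::vector<std::unique_ptr<PinnedItem>> items;
  items.push_back(std::make_unique<PinnedItem>());
  // Moving the unique_ptr transfers ownership of the pointer only; no
  // PinnedItem move constructor is required, so this still compiles.
  std::unique_ptr<PinnedItem> taken = std::move(items.back());
  items.pop_back();
  return 0;
}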
@@ -179,6 +185,12 @@ class Scheduler {
     return item->remove || (item->component != nullptr && item->component->is_failed());
   }
 
+  // Check if the scheduler has no items.
+  // IMPORTANT: This method should only be called from the main thread (loop task).
+  // It performs cleanup of removed items and checks if the queue is empty.
+  // The items_.empty() check at the end is done without a lock for performance,
+  // which is safe because this is only called from the main thread while other
+  // threads only add items (never remove them).
   bool empty_() {
     this->cleanup_();
     return this->items_.empty();
@@ -91,3 +91,5 @@ update:
     name: OTA Update
     id: ota_update
     source: http://my.ha.net:8123/local/esphome/manifest.json
+    on_update_available:
+      - logger.log: "A new update is available"
@@ -26,3 +26,5 @@ update:
   - platform: http_request
     name: Firmware Update
     source: http://example.com/manifest.json
+    on_update_available:
+      - logger.log: "A new update is available"
@@ -0,0 +1,21 @@
+import esphome.codegen as cg
+import esphome.config_validation as cv
+from esphome.const import CONF_ID
+
+scheduler_bulk_cleanup_component_ns = cg.esphome_ns.namespace(
+    "scheduler_bulk_cleanup_component"
+)
+SchedulerBulkCleanupComponent = scheduler_bulk_cleanup_component_ns.class_(
+    "SchedulerBulkCleanupComponent", cg.Component
+)
+
+CONFIG_SCHEMA = cv.Schema(
+    {
+        cv.GenerateID(): cv.declare_id(SchedulerBulkCleanupComponent),
+    }
+).extend(cv.COMPONENT_SCHEMA)
+
+
+async def to_code(config):
+    var = cg.new_Pvariable(config[CONF_ID])
+    await cg.register_component(var, config)
@@ -0,0 +1,63 @@
+#include "scheduler_bulk_cleanup_component.h"
+#include "esphome/core/log.h"
+#include "esphome/core/helpers.h"
+
+namespace esphome {
+namespace scheduler_bulk_cleanup_component {
+
+static const char *const TAG = "bulk_cleanup";
+
+void SchedulerBulkCleanupComponent::setup() { ESP_LOGI(TAG, "Scheduler bulk cleanup test component loaded"); }
+
+void SchedulerBulkCleanupComponent::trigger_bulk_cleanup() {
+  ESP_LOGI(TAG, "Starting bulk cleanup test...");
+
+  // Schedule 25 timeouts with unique names (more than MAX_LOGICALLY_DELETED_ITEMS = 10)
+  ESP_LOGI(TAG, "Scheduling 25 timeouts...");
+  for (int i = 0; i < 25; i++) {
+    std::string name = "bulk_timeout_" + std::to_string(i);
+    App.scheduler.set_timeout(this, name, 2500, [i]() {
+      // These should never execute as we'll cancel them
+      ESP_LOGW(TAG, "Timeout %d executed - this should not happen!", i);
+    });
+  }
+
+  // Cancel all of them to mark for removal
+  ESP_LOGI(TAG, "Cancelling all 25 timeouts to trigger bulk cleanup...");
+  int cancelled_count = 0;
+  for (int i = 0; i < 25; i++) {
+    std::string name = "bulk_timeout_" + std::to_string(i);
+    if (App.scheduler.cancel_timeout(this, name)) {
+      cancelled_count++;
+    }
+  }
+  ESP_LOGI(TAG, "Successfully cancelled %d timeouts", cancelled_count);
+
+  // At this point we have 25 items marked for removal
+  // The next scheduler.call() should trigger the bulk cleanup path
+
+  // Schedule an interval that will execute multiple times to ensure cleanup happens
+  static int cleanup_check_count = 0;
+  App.scheduler.set_interval(this, "cleanup_checker", 25, [this]() {
+    cleanup_check_count++;
+    ESP_LOGI(TAG, "Cleanup check %d - scheduler still running", cleanup_check_count);
+
+    if (cleanup_check_count >= 5) {
+      // Cancel the interval and complete the test
+      App.scheduler.cancel_interval(this, "cleanup_checker");
+      ESP_LOGI(TAG, "Bulk cleanup triggered: removed %d items", 25);
+      ESP_LOGI(TAG, "Items before cleanup: 25+, after: <unknown>");
+      ESP_LOGI(TAG, "Bulk cleanup test complete");
+    }
+  });
+
+  // Also schedule some normal timeouts to ensure scheduler keeps working after cleanup
+  for (int i = 0; i < 5; i++) {
+    std::string name = "post_cleanup_" + std::to_string(i);
+    App.scheduler.set_timeout(this, name, 50 + i * 25,
+                              [i]() { ESP_LOGI(TAG, "Post-cleanup timeout %d executed correctly", i); });
+  }
+}
+
+}  // namespace scheduler_bulk_cleanup_component
+}  // namespace esphome
@@ -0,0 +1,18 @@
+#pragma once
+
+#include "esphome/core/component.h"
+#include "esphome/core/application.h"
+
+namespace esphome {
+namespace scheduler_bulk_cleanup_component {
+
+class SchedulerBulkCleanupComponent : public Component {
+ public:
+  void setup() override;
+  float get_setup_priority() const override { return setup_priority::LATE; }
+
+  void trigger_bulk_cleanup();
+};
+
+}  // namespace scheduler_bulk_cleanup_component
+}  // namespace esphome
@@ -70,6 +70,9 @@ void SchedulerRapidCancellationComponent::run_rapid_cancellation_test() {
     ESP_LOGI(TAG, "  Implicit cancellations (replaced): %d", implicit_cancellations);
     ESP_LOGI(TAG, "  Total accounted: %d (executed + implicit cancellations)",
              this->total_executed_.load() + implicit_cancellations);
+
+    // Final message to signal test completion - ensures all stats are logged before test ends
+    ESP_LOGI(TAG, "Test finished - all statistics reported");
   });
 }
 
tests/integration/fixtures/scheduler_bulk_cleanup.yaml (new file, 23 lines)
@@ -0,0 +1,23 @@
+esphome:
+  name: scheduler-bulk-cleanup
+
+external_components:
+  - source:
+      type: local
+      path: EXTERNAL_COMPONENT_PATH
+
+host:
+
+logger:
+  level: DEBUG
+
+api:
+  services:
+    - service: trigger_bulk_cleanup
+      then:
+        - lambda: |-
+            auto component = id(bulk_cleanup_component);
+            component->trigger_bulk_cleanup();
+
+scheduler_bulk_cleanup_component:
+  id: bulk_cleanup_component
tests/integration/test_scheduler_bulk_cleanup.py (new file, 123 lines)
@@ -0,0 +1,123 @@
+"""Test that triggers the bulk cleanup path when to_remove_ > MAX_LOGICALLY_DELETED_ITEMS."""
+
+import asyncio
+from pathlib import Path
+import re
+
+from aioesphomeapi import UserService
+import pytest
+
+from .types import APIClientConnectedFactory, RunCompiledFunction
+
+
+@pytest.mark.asyncio
+async def test_scheduler_bulk_cleanup(
+    yaml_config: str,
+    run_compiled: RunCompiledFunction,
+    api_client_connected: APIClientConnectedFactory,
+) -> None:
+    """Test that bulk cleanup path is triggered when many items are cancelled."""
+
+    # Get the absolute path to the external components directory
+    external_components_path = str(
+        Path(__file__).parent / "fixtures" / "external_components"
+    )
+
+    # Replace the placeholder in the YAML config with the actual path
+    yaml_config = yaml_config.replace(
+        "EXTERNAL_COMPONENT_PATH", external_components_path
+    )
+
+    # Create a future to signal test completion
+    loop = asyncio.get_event_loop()
+    test_complete_future: asyncio.Future[None] = loop.create_future()
+    bulk_cleanup_triggered = False
+    cleanup_stats: dict[str, int] = {
+        "removed": 0,
+        "before": 0,
+        "after": 0,
+    }
+    post_cleanup_executed = 0
+
+    def on_log_line(line: str) -> None:
+        nonlocal bulk_cleanup_triggered, post_cleanup_executed
+
+        # Look for logs indicating bulk cleanup was triggered
+        # The actual cleanup happens silently, so we track the cancel operations
+        if "Successfully cancelled" in line and "timeouts" in line:
+            match = re.search(r"Successfully cancelled (\d+) timeouts", line)
+            if match and int(match.group(1)) > 10:
+                bulk_cleanup_triggered = True
+
+        # Track cleanup statistics
+        match = re.search(r"Bulk cleanup triggered: removed (\d+) items", line)
+        if match:
+            cleanup_stats["removed"] = int(match.group(1))
+
+        match = re.search(r"Items before cleanup: (\d+), after: (\d+)", line)
+        if match:
+            cleanup_stats["before"] = int(match.group(1))
+            cleanup_stats["after"] = int(match.group(2))
+
+        # Track post-cleanup timeout executions
+        if "Post-cleanup timeout" in line and "executed correctly" in line:
+            match = re.search(r"Post-cleanup timeout (\d+) executed correctly", line)
+            if match:
+                post_cleanup_executed += 1
+                # All 5 post-cleanup timeouts have executed
+                if post_cleanup_executed >= 5 and not test_complete_future.done():
+                    test_complete_future.set_result(None)
+
+        # Check for bulk cleanup completion (but don't end test yet)
+        if "Bulk cleanup test complete" in line:
+            # This just means the interval finished, not that all timeouts executed
+            pass
+
+    async with (
+        run_compiled(yaml_config, line_callback=on_log_line),
+        api_client_connected() as client,
+    ):
+        # Verify we can connect
+        device_info = await client.device_info()
+        assert device_info is not None
+        assert device_info.name == "scheduler-bulk-cleanup"
+
+        # List entities and services
+        _, services = await asyncio.wait_for(
+            client.list_entities_services(), timeout=5.0
+        )
+
+        # Find our test service
+        trigger_bulk_cleanup_service: UserService | None = None
+        for service in services:
+            if service.name == "trigger_bulk_cleanup":
+                trigger_bulk_cleanup_service = service
+                break
+
+        assert trigger_bulk_cleanup_service is not None, (
+            "trigger_bulk_cleanup service not found"
+        )
+
+        # Execute the test
+        client.execute_service(trigger_bulk_cleanup_service, {})
+
+        # Wait for test completion
+        try:
+            await asyncio.wait_for(test_complete_future, timeout=10.0)
+        except asyncio.TimeoutError:
+            pytest.fail("Bulk cleanup test timed out")
+
+    # Verify bulk cleanup was triggered
+    assert bulk_cleanup_triggered, (
+        "Bulk cleanup path was not triggered - MAX_LOGICALLY_DELETED_ITEMS threshold not reached"
+    )
+
+    # Verify cleanup statistics
+    assert cleanup_stats["removed"] > 10, (
+        f"Expected more than 10 items removed, got {cleanup_stats['removed']}"
+    )
+
+    # Verify scheduler still works after bulk cleanup
+    assert post_cleanup_executed == 5, (
+        f"Expected 5 post-cleanup timeouts to execute, but {post_cleanup_executed} executed"
+    )
@@ -74,9 +74,9 @@ async def test_scheduler_rapid_cancellation(
            test_complete_future.set_exception(Exception(f"Crash detected: {line}"))
            return
 
-        # Check for completion
+        # Check for completion - wait for final message after all stats are logged
         if (
-            "Rapid cancellation test complete" in line
+            "Test finished - all statistics reported" in line
             and not test_complete_future.done()
         ):
             test_complete_future.set_result(None)