Optimize API connection loop performance (#9184)

This commit is contained in:
J. Nick Koston 2025-06-26 03:47:41 +02:00 committed by GitHub
parent e017250445
commit 15ef93ccc9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 146 additions and 95 deletions

View File

@ -33,9 +33,14 @@ namespace api {
// Since each message could contain multiple protobuf messages when using packet batching,
// this limits the number of messages processed, not the number of TCP packets.
static constexpr uint8_t MAX_MESSAGES_PER_LOOP = 5;
static constexpr uint8_t MAX_PING_RETRIES = 60;
static constexpr uint16_t PING_RETRY_INTERVAL = 1000;
static constexpr uint32_t KEEPALIVE_DISCONNECT_TIMEOUT = (KEEPALIVE_TIMEOUT_MS * 5) / 2;
static const char *const TAG = "api.connection";
#ifdef USE_ESP32_CAMERA
static const int ESP32_CAMERA_STOP_STREAM = 5000;
#endif
APIConnection::APIConnection(std::unique_ptr<socket::Socket> sock, APIServer *parent)
: parent_(parent), initial_state_iterator_(this), list_entities_iterator_(this) {
@ -90,16 +95,6 @@ APIConnection::~APIConnection() {
}
void APIConnection::loop() {
if (this->remove_)
return;
if (!network::is_connected()) {
// when network is disconnected force disconnect immediately
// don't wait for timeout
this->on_fatal_error();
ESP_LOGW(TAG, "%s: Network unavailable; disconnecting", this->get_client_combined_info().c_str());
return;
}
if (this->next_close_) {
// requested a disconnect
this->helper_->close();
@ -152,20 +147,19 @@ void APIConnection::loop() {
// Process deferred batch if scheduled
if (this->deferred_batch_.batch_scheduled &&
App.get_loop_component_start_time() - this->deferred_batch_.batch_start_time >= this->get_batch_delay_ms_()) {
now - this->deferred_batch_.batch_start_time >= this->get_batch_delay_ms_()) {
this->process_batch_();
}
if (!this->list_entities_iterator_.completed())
if (!this->list_entities_iterator_.completed()) {
this->list_entities_iterator_.advance();
if (!this->initial_state_iterator_.completed() && this->list_entities_iterator_.completed())
} else if (!this->initial_state_iterator_.completed()) {
this->initial_state_iterator_.advance();
}
static uint8_t max_ping_retries = 60;
static uint16_t ping_retry_interval = 1000;
if (this->sent_ping_) {
// Disconnect if not responded within 2.5*keepalive
if (now - this->last_traffic_ > (KEEPALIVE_TIMEOUT_MS * 5) / 2) {
if (now - this->last_traffic_ > KEEPALIVE_DISCONNECT_TIMEOUT) {
on_fatal_error();
ESP_LOGW(TAG, "%s is unresponsive; disconnecting", this->get_client_combined_info().c_str());
}
@ -173,17 +167,15 @@ void APIConnection::loop() {
ESP_LOGVV(TAG, "Sending keepalive PING");
this->sent_ping_ = this->send_message(PingRequest());
if (!this->sent_ping_) {
this->next_ping_retry_ = now + ping_retry_interval;
this->next_ping_retry_ = now + PING_RETRY_INTERVAL;
this->ping_retries_++;
std::string warn_str = str_sprintf("%s: Sending keepalive failed %u time(s);",
this->get_client_combined_info().c_str(), this->ping_retries_);
if (this->ping_retries_ >= max_ping_retries) {
if (this->ping_retries_ >= MAX_PING_RETRIES) {
on_fatal_error();
ESP_LOGE(TAG, "%s disconnecting", warn_str.c_str());
ESP_LOGE(TAG, "%s: Ping failed %u times", this->get_client_combined_info().c_str(), this->ping_retries_);
} else if (this->ping_retries_ >= 10) {
ESP_LOGW(TAG, "%s retrying in %u ms", warn_str.c_str(), ping_retry_interval);
ESP_LOGW(TAG, "%s: Ping retry %u", this->get_client_combined_info().c_str(), this->ping_retries_);
} else {
ESP_LOGD(TAG, "%s retrying in %u ms", warn_str.c_str(), ping_retry_interval);
ESP_LOGD(TAG, "%s: Ping retry %u", this->get_client_combined_info().c_str(), this->ping_retries_);
}
}
}
@ -207,22 +199,20 @@ void APIConnection::loop() {
// bool done = 3;
buffer.encode_bool(3, done);
bool success = this->send_buffer(buffer, 44);
bool success = this->send_buffer(buffer, CameraImageResponse::MESSAGE_TYPE);
if (success) {
this->image_reader_.consume_data(to_send);
}
if (success && done) {
this->image_reader_.return_image();
if (done) {
this->image_reader_.return_image();
}
}
}
#endif
if (state_subs_at_ != -1) {
if (state_subs_at_ >= 0) {
const auto &subs = this->parent_->get_state_subs();
if (state_subs_at_ >= (int) subs.size()) {
state_subs_at_ = -1;
} else {
if (state_subs_at_ < static_cast<int>(subs.size())) {
auto &it = subs[state_subs_at_];
SubscribeHomeAssistantStateResponse resp;
resp.entity_id = it.entity_id;
@ -231,6 +221,8 @@ void APIConnection::loop() {
if (this->send_message(resp)) {
state_subs_at_++;
}
} else {
state_subs_at_ = -1;
}
}
}

View File

@ -66,6 +66,17 @@ const char *api_error_to_str(APIError err) {
return "UNKNOWN";
}
// Default implementation for loop - handles sending buffered data
APIError APIFrameHelper::loop() {
if (!this->tx_buf_.empty()) {
APIError err = try_send_tx_buf_();
if (err != APIError::OK && err != APIError::WOULD_BLOCK) {
return err;
}
}
return APIError::OK; // Convert WOULD_BLOCK to OK to avoid connection termination
}
// Helper method to buffer data from IOVs
void APIFrameHelper::buffer_data_from_iov_(const struct iovec *iov, int iovcnt, uint16_t total_write_len) {
SendBuffer buffer;
@ -287,13 +298,8 @@ APIError APINoiseFrameHelper::loop() {
}
}
if (!this->tx_buf_.empty()) {
APIError err = try_send_tx_buf_();
if (err != APIError::OK && err != APIError::WOULD_BLOCK) {
return err;
}
}
return APIError::OK; // Convert WOULD_BLOCK to OK to avoid connection termination
// Use base class implementation for buffer sending
return APIFrameHelper::loop();
}
/** Read a packet into the rx_buf_. If successful, stores frame data in the frame parameter
@ -339,17 +345,15 @@ APIError APINoiseFrameHelper::try_read_frame_(ParsedFrame *frame) {
return APIError::WOULD_BLOCK;
}
if (rx_header_buf_[0] != 0x01) {
state_ = State::FAILED;
HELPER_LOG("Bad indicator byte %u", rx_header_buf_[0]);
return APIError::BAD_INDICATOR;
}
// header reading done
}
// read body
uint8_t indicator = rx_header_buf_[0];
if (indicator != 0x01) {
state_ = State::FAILED;
HELPER_LOG("Bad indicator byte %u", indicator);
return APIError::BAD_INDICATOR;
}
uint16_t msg_size = (((uint16_t) rx_header_buf_[1]) << 8) | rx_header_buf_[2];
if (state_ != State::DATA && msg_size > 128) {
@ -595,10 +599,6 @@ APIError APINoiseFrameHelper::read_packet(ReadPacketBuffer *buffer) {
return APIError::BAD_DATA_PACKET;
}
// uint16_t type;
// uint16_t data_len;
// uint8_t *data;
// uint8_t *padding; zero or more bytes to fill up the rest of the packet
uint16_t type = (((uint16_t) msg_data[0]) << 8) | msg_data[1];
uint16_t data_len = (((uint16_t) msg_data[2]) << 8) | msg_data[3];
if (data_len > msg_size - 4) {
@ -831,18 +831,12 @@ APIError APIPlaintextFrameHelper::init() {
state_ = State::DATA;
return APIError::OK;
}
/// Not used for plaintext
APIError APIPlaintextFrameHelper::loop() {
if (state_ != State::DATA) {
return APIError::BAD_STATE;
}
if (!this->tx_buf_.empty()) {
APIError err = try_send_tx_buf_();
if (err != APIError::OK && err != APIError::WOULD_BLOCK) {
return err;
}
}
return APIError::OK; // Convert WOULD_BLOCK to OK to avoid connection termination
// Use base class implementation for buffer sending
return APIFrameHelper::loop();
}
/** Read a packet into the rx_buf_. If successful, stores frame data in the frame parameter

View File

@ -38,7 +38,7 @@ struct PacketInfo {
: message_type(type), offset(off), payload_size(size), padding(0) {}
};
enum class APIError : int {
enum class APIError : uint16_t {
OK = 0,
WOULD_BLOCK = 1001,
BAD_HANDSHAKE_PACKET_LEN = 1002,
@ -74,7 +74,7 @@ class APIFrameHelper {
}
virtual ~APIFrameHelper() = default;
virtual APIError init() = 0;
virtual APIError loop() = 0;
virtual APIError loop();
virtual APIError read_packet(ReadPacketBuffer *buffer) = 0;
bool can_write_without_blocking() { return state_ == State::DATA && tx_buf_.empty(); }
std::string getpeername() { return socket_->getpeername(); }

View File

@ -47,6 +47,11 @@ void APIServer::setup() {
}
#endif
// Schedule reboot if no clients connect within timeout
if (this->reboot_timeout_ != 0) {
this->schedule_reboot_timeout_();
}
this->socket_ = socket::socket_ip_loop_monitored(SOCK_STREAM, 0); // monitored for incoming connections
if (this->socket_ == nullptr) {
ESP_LOGW(TAG, "Could not create socket");
@ -106,8 +111,6 @@ void APIServer::setup() {
}
#endif
this->last_connected_ = App.get_loop_component_start_time();
#ifdef USE_ESP32_CAMERA
if (esp32_camera::global_esp32_camera != nullptr && !esp32_camera::global_esp32_camera->is_internal()) {
esp32_camera::global_esp32_camera->add_image_callback(
@ -121,6 +124,16 @@ void APIServer::setup() {
#endif
}
void APIServer::schedule_reboot_timeout_() {
this->status_set_warning();
this->set_timeout("api_reboot", this->reboot_timeout_, []() {
if (!global_api_server->is_connected()) {
ESP_LOGE(TAG, "No clients; rebooting");
App.reboot();
}
});
}
void APIServer::loop() {
// Accept new clients only if the socket exists and has incoming connections
if (this->socket_ && this->socket_->ready()) {
@ -130,51 +143,61 @@ void APIServer::loop() {
auto sock = this->socket_->accept_loop_monitored((struct sockaddr *) &source_addr, &addr_len);
if (!sock)
break;
ESP_LOGD(TAG, "Accepted %s", sock->getpeername().c_str());
ESP_LOGD(TAG, "Accept %s", sock->getpeername().c_str());
auto *conn = new APIConnection(std::move(sock), this);
this->clients_.emplace_back(conn);
conn->start();
// Clear warning status and cancel reboot when first client connects
if (this->clients_.size() == 1 && this->reboot_timeout_ != 0) {
this->status_clear_warning();
this->cancel_timeout("api_reboot");
}
}
}
if (this->clients_.empty()) {
return;
}
// Process clients and remove disconnected ones in a single pass
if (!this->clients_.empty()) {
size_t client_index = 0;
while (client_index < this->clients_.size()) {
auto &client = this->clients_[client_index];
if (client->remove_) {
// Handle disconnection
this->client_disconnected_trigger_->trigger(client->client_info_, client->client_peername_);
ESP_LOGV(TAG, "Removing connection to %s", client->client_info_.c_str());
// Swap with the last element and pop (avoids expensive vector shifts)
if (client_index < this->clients_.size() - 1) {
std::swap(this->clients_[client_index], this->clients_.back());
}
this->clients_.pop_back();
// Don't increment client_index since we need to process the swapped element
} else {
// Process active client
client->loop();
client_index++; // Move to next client
}
// Check network connectivity once for all clients
if (!network::is_connected()) {
// Network is down - disconnect all clients
for (auto &client : this->clients_) {
client->on_fatal_error();
ESP_LOGW(TAG, "%s: Network down; disconnect", client->get_client_combined_info().c_str());
}
// Continue to process and clean up the clients below
}
if (this->reboot_timeout_ != 0) {
const uint32_t now = App.get_loop_component_start_time();
if (!this->is_connected()) {
if (now - this->last_connected_ > this->reboot_timeout_) {
ESP_LOGE(TAG, "No client connected; rebooting");
App.reboot();
}
this->status_set_warning();
} else {
this->last_connected_ = now;
this->status_clear_warning();
size_t client_index = 0;
while (client_index < this->clients_.size()) {
auto &client = this->clients_[client_index];
if (!client->remove_) {
// Common case: process active client
client->loop();
client_index++;
continue;
}
// Rare case: handle disconnection
this->client_disconnected_trigger_->trigger(client->client_info_, client->client_peername_);
ESP_LOGV(TAG, "Remove connection %s", client->client_info_.c_str());
// Swap with the last element and pop (avoids expensive vector shifts)
if (client_index < this->clients_.size() - 1) {
std::swap(this->clients_[client_index], this->clients_.back());
}
this->clients_.pop_back();
// Schedule reboot when last client disconnects
if (this->clients_.empty() && this->reboot_timeout_ != 0) {
this->schedule_reboot_timeout_();
}
// Don't increment client_index since we need to process the swapped element
}
}

View File

@ -142,6 +142,7 @@ class APIServer : public Component, public Controller {
}
protected:
void schedule_reboot_timeout_();
// Pointers and pointer-like types first (4 bytes each)
std::unique_ptr<socket::Socket> socket_ = nullptr;
Trigger<std::string, std::string> *client_connected_trigger_ = new Trigger<std::string, std::string>();
@ -150,7 +151,6 @@ class APIServer : public Component, public Controller {
// 4-byte aligned types
uint32_t reboot_timeout_{300000};
uint32_t batch_delay_{100};
uint32_t last_connected_{0};
// Vectors and strings (12 bytes each on 32-bit)
std::vector<std::unique_ptr<APIConnection>> clients_;

View File

@ -0,0 +1,7 @@
esphome:
name: api-reboot-test
host:
api:
reboot_timeout: 0.5s # Very short timeout for fast testing
logger:
level: DEBUG

View File

@ -0,0 +1,35 @@
"""Test API server reboot timeout functionality."""
import asyncio
import re
import pytest
from .types import RunCompiledFunction
@pytest.mark.asyncio
async def test_api_reboot_timeout(
yaml_config: str,
run_compiled: RunCompiledFunction,
) -> None:
"""Test that the device reboots when no API clients connect within the timeout."""
loop = asyncio.get_running_loop()
reboot_future = loop.create_future()
reboot_pattern = re.compile(r"No clients; rebooting")
def check_output(line: str) -> None:
"""Check output for reboot message."""
if not reboot_future.done() and reboot_pattern.search(line):
reboot_future.set_result(True)
# Run the device without connecting any API client
async with run_compiled(yaml_config, line_callback=check_output):
# Wait for reboot with timeout
# (0.5s reboot timeout + some margin for processing)
try:
await asyncio.wait_for(reboot_future, timeout=2.0)
except asyncio.TimeoutError:
pytest.fail("Device did not reboot within expected timeout")
# Test passes if we get here - reboot was detected