Fix slow noise handshake by reading multiple messages per loop (#9130)

This commit is contained in:
J. Nick Koston 2025-06-23 04:07:53 +02:00 committed by GitHub
parent 59889a6286
commit 04f592ba6d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 48 additions and 29 deletions

View File

@ -28,6 +28,12 @@
namespace esphome { namespace esphome {
namespace api { namespace api {
// Read a maximum of 5 messages per loop iteration to prevent starving other components.
// This is a balance between API responsiveness and allowing other components to run.
// Since each message could contain multiple protobuf messages when using packet batching,
// this limits the number of messages processed, not the number of TCP packets.
static constexpr uint8_t MAX_MESSAGES_PER_LOOP = 5;
static const char *const TAG = "api.connection"; static const char *const TAG = "api.connection";
static const int ESP32_CAMERA_STOP_STREAM = 5000; static const int ESP32_CAMERA_STOP_STREAM = 5000;
@ -109,33 +115,38 @@ void APIConnection::loop() {
return; return;
} }
const uint32_t now = App.get_loop_component_start_time();
// Check if socket has data ready before attempting to read // Check if socket has data ready before attempting to read
if (this->helper_->is_socket_ready()) { if (this->helper_->is_socket_ready()) {
ReadPacketBuffer buffer; // Read up to MAX_MESSAGES_PER_LOOP messages per loop to improve throughput
err = this->helper_->read_packet(&buffer); for (uint8_t message_count = 0; message_count < MAX_MESSAGES_PER_LOOP; message_count++) {
if (err == APIError::WOULD_BLOCK) { ReadPacketBuffer buffer;
// pass err = this->helper_->read_packet(&buffer);
} else if (err != APIError::OK) { if (err == APIError::WOULD_BLOCK) {
on_fatal_error(); // No more data available
if (err == APIError::SOCKET_READ_FAILED && errno == ECONNRESET) { break;
ESP_LOGW(TAG, "%s: Connection reset", this->get_client_combined_info().c_str()); } else if (err != APIError::OK) {
} else if (err == APIError::CONNECTION_CLOSED) { on_fatal_error();
ESP_LOGW(TAG, "%s: Connection closed", this->get_client_combined_info().c_str()); if (err == APIError::SOCKET_READ_FAILED && errno == ECONNRESET) {
} else { ESP_LOGW(TAG, "%s: Connection reset", this->get_client_combined_info().c_str());
ESP_LOGW(TAG, "%s: Reading failed: %s errno=%d", this->get_client_combined_info().c_str(), } else if (err == APIError::CONNECTION_CLOSED) {
api_error_to_str(err), errno); ESP_LOGW(TAG, "%s: Connection closed", this->get_client_combined_info().c_str());
} } else {
return; ESP_LOGW(TAG, "%s: Reading failed: %s errno=%d", this->get_client_combined_info().c_str(),
} else { api_error_to_str(err), errno);
this->last_traffic_ = App.get_loop_component_start_time(); }
// read a packet
if (buffer.data_len > 0) {
this->read_message(buffer.data_len, buffer.type, &buffer.container[buffer.data_offset]);
} else {
this->read_message(0, buffer.type, nullptr);
}
if (this->remove_)
return; return;
} else {
this->last_traffic_ = now;
// read a packet
if (buffer.data_len > 0) {
this->read_message(buffer.data_len, buffer.type, &buffer.container[buffer.data_offset]);
} else {
this->read_message(0, buffer.type, nullptr);
}
if (this->remove_)
return;
}
} }
} }
@ -152,7 +163,6 @@ void APIConnection::loop() {
static uint8_t max_ping_retries = 60; static uint8_t max_ping_retries = 60;
static uint16_t ping_retry_interval = 1000; static uint16_t ping_retry_interval = 1000;
const uint32_t now = App.get_loop_component_start_time();
if (this->sent_ping_) { if (this->sent_ping_) {
// Disconnect if not responded within 2.5*keepalive // Disconnect if not responded within 2.5*keepalive
if (now - this->last_traffic_ > (KEEPALIVE_TIMEOUT_MS * 5) / 2) { if (now - this->last_traffic_ > (KEEPALIVE_TIMEOUT_MS * 5) / 2) {

View File

@ -274,12 +274,21 @@ APIError APINoiseFrameHelper::init() {
} }
/// Run through handshake messages (if in that phase) /// Run through handshake messages (if in that phase)
APIError APINoiseFrameHelper::loop() { APIError APINoiseFrameHelper::loop() {
APIError err = state_action_(); // During handshake phase, process as many actions as possible until we can't progress
if (err != APIError::OK && err != APIError::WOULD_BLOCK) { // socket_->ready() stays true until next main loop, but state_action() will return
return err; // WOULD_BLOCK when no more data is available to read
while (state_ != State::DATA && this->socket_->ready()) {
APIError err = state_action_();
if (err != APIError::OK && err != APIError::WOULD_BLOCK) {
return err;
}
if (err == APIError::WOULD_BLOCK) {
break;
}
} }
if (!this->tx_buf_.empty()) { if (!this->tx_buf_.empty()) {
err = try_send_tx_buf_(); APIError err = try_send_tx_buf_();
if (err != APIError::OK && err != APIError::WOULD_BLOCK) { if (err != APIError::OK && err != APIError::WOULD_BLOCK) {
return err; return err;
} }