Optimize socket operations by checking readiness in the main loop (#8918)

This commit is contained in:
J. Nick Koston 2025-05-28 18:16:37 -05:00 committed by GitHub
parent ffc66f539f
commit 43e88af28a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 386 additions and 55 deletions

View File

@ -135,6 +135,9 @@ void APIConnection::loop() {
api_error_to_str(err), errno);
return;
}
// Check if socket has data ready before attempting to read
if (this->helper_->is_socket_ready()) {
ReadPacketBuffer buffer;
err = this->helper_->read_packet(&buffer);
if (err == APIError::WOULD_BLOCK) {
@ -161,6 +164,7 @@ void APIConnection::loop() {
if (this->remove_)
return;
}
}
if (!this->deferred_message_queue_.empty() && this->helper_->can_write_without_blocking()) {
this->deferred_message_queue_.process_queue();

View File

@ -13,6 +13,7 @@
#include "api_noise_context.h"
#include "esphome/components/socket/socket.h"
#include "esphome/core/application.h"
namespace esphome {
namespace api {
@ -90,6 +91,8 @@ class APIFrameHelper {
virtual uint8_t frame_header_padding() = 0;
// Get the frame footer size required by this protocol
virtual uint8_t frame_footer_size() = 0;
// Check if socket has data ready to read
bool is_socket_ready() const { return socket_ != nullptr && socket_->ready(); }
protected:
// Struct for holding parsed frame data

View File

@ -43,7 +43,7 @@ void APIServer::setup() {
}
#endif
this->socket_ = socket::socket_ip(SOCK_STREAM, 0);
this->socket_ = socket::socket_ip_loop_monitored(SOCK_STREAM, 0); // monitored for incoming connections
if (this->socket_ == nullptr) {
ESP_LOGW(TAG, "Could not create socket");
this->mark_failed();
@ -112,11 +112,12 @@ void APIServer::setup() {
}
void APIServer::loop() {
// Accept new clients
// Accept new clients only if the socket has incoming connections
if (this->socket_->ready()) {
while (true) {
struct sockaddr_storage source_addr;
socklen_t addr_len = sizeof(source_addr);
auto sock = this->socket_->accept((struct sockaddr *) &source_addr, &addr_len);
auto sock = this->socket_->accept_loop_monitored((struct sockaddr *) &source_addr, &addr_len);
if (!sock)
break;
ESP_LOGD(TAG, "Accepted %s", sock->getpeername().c_str());
@ -125,6 +126,7 @@ void APIServer::loop() {
this->clients_.emplace_back(conn);
conn->start();
}
}
// Process clients and remove disconnected ones in a single pass
if (!this->clients_.empty()) {

View File

@ -26,7 +26,7 @@ void ESPHomeOTAComponent::setup() {
ota::register_ota_platform(this);
#endif
server_ = socket::socket_ip(SOCK_STREAM, 0);
server_ = socket::socket_ip_loop_monitored(SOCK_STREAM, 0); // monitored for incoming connections
if (server_ == nullptr) {
ESP_LOGW(TAG, "Could not create socket");
this->mark_failed();
@ -100,10 +100,13 @@ void ESPHomeOTAComponent::handle_() {
#endif
if (client_ == nullptr) {
// Check if the server socket is ready before accepting
if (this->server_->ready()) {
struct sockaddr_storage source_addr;
socklen_t addr_len = sizeof(source_addr);
client_ = server_->accept((struct sockaddr *) &source_addr, &addr_len);
}
}
if (client_ == nullptr)
return;

View File

@ -35,5 +35,7 @@ async def to_code(config):
cg.add_define("USE_SOCKET_IMPL_LWIP_TCP")
elif impl == IMPLEMENTATION_LWIP_SOCKETS:
cg.add_define("USE_SOCKET_IMPL_LWIP_SOCKETS")
cg.add_define("USE_SOCKET_SELECT_SUPPORT")
elif impl == IMPLEMENTATION_BSD_SOCKETS:
cg.add_define("USE_SOCKET_IMPL_BSD_SOCKETS")
cg.add_define("USE_SOCKET_SELECT_SUPPORT")

View File

@ -5,6 +5,7 @@
#ifdef USE_SOCKET_IMPL_BSD_SOCKETS
#include <cstring>
#include "esphome/core/application.h"
#ifdef USE_ESP32
#include <esp_idf_version.h>
@ -40,7 +41,20 @@ std::string format_sockaddr(const struct sockaddr_storage &storage) {
class BSDSocketImpl : public Socket {
public:
BSDSocketImpl(int fd) : fd_(fd) {}
BSDSocketImpl(int fd, bool monitor_loop = false) : fd_(fd) {
#ifdef USE_SOCKET_SELECT_SUPPORT
// Register new socket with the application for select() if monitoring requested
if (monitor_loop && fd_ >= 0) {
// Only set loop_monitored_ to true if registration succeeds
loop_monitored_ = App.register_socket_fd(fd_);
} else {
loop_monitored_ = false;
}
#else
// Without select support, ignore monitor_loop parameter
(void) monitor_loop;
#endif
}
~BSDSocketImpl() override {
if (!closed_) {
close(); // NOLINT(clang-analyzer-optin.cplusplus.VirtualCall)
@ -48,17 +62,36 @@ class BSDSocketImpl : public Socket {
}
int connect(const struct sockaddr *addr, socklen_t addrlen) override { return ::connect(fd_, addr, addrlen); }
std::unique_ptr<Socket> accept(struct sockaddr *addr, socklen_t *addrlen) override {
return accept_impl_(addr, addrlen, false);
}
std::unique_ptr<Socket> accept_loop_monitored(struct sockaddr *addr, socklen_t *addrlen) override {
return accept_impl_(addr, addrlen, true);
}
private:
std::unique_ptr<Socket> accept_impl_(struct sockaddr *addr, socklen_t *addrlen, bool loop_monitored) {
int fd = ::accept(fd_, addr, addrlen);
if (fd == -1)
return {};
return make_unique<BSDSocketImpl>(fd);
return make_unique<BSDSocketImpl>(fd, loop_monitored);
}
public:
int bind(const struct sockaddr *addr, socklen_t addrlen) override { return ::bind(fd_, addr, addrlen); }
int close() override {
if (!closed_) {
#ifdef USE_SOCKET_SELECT_SUPPORT
// Unregister from select() before closing if monitored
if (loop_monitored_) {
App.unregister_socket_fd(fd_);
}
#endif
int ret = ::close(fd_);
closed_ = true;
return ret;
}
return 0;
}
int shutdown(int how) override { return ::shutdown(fd_, how); }
int getpeername(struct sockaddr *addr, socklen_t *addrlen) override { return ::getpeername(fd_, addr, addrlen); }
@ -126,16 +159,27 @@ class BSDSocketImpl : public Socket {
return 0;
}
int get_fd() const override { return fd_; }
protected:
int fd_;
bool closed_ = false;
};
std::unique_ptr<Socket> socket(int domain, int type, int protocol) {
// Helper to create a socket with optional monitoring
static std::unique_ptr<Socket> create_socket(int domain, int type, int protocol, bool loop_monitored = false) {
int ret = ::socket(domain, type, protocol);
if (ret == -1)
return nullptr;
return std::unique_ptr<Socket>{new BSDSocketImpl(ret)};
return std::unique_ptr<Socket>{new BSDSocketImpl(ret, loop_monitored)};
}
std::unique_ptr<Socket> socket(int domain, int type, int protocol) {
return create_socket(domain, type, protocol, false);
}
std::unique_ptr<Socket> socket_loop_monitored(int domain, int type, int protocol) {
return create_socket(domain, type, protocol, true);
}
} // namespace socket

View File

@ -606,6 +606,11 @@ std::unique_ptr<Socket> socket(int domain, int type, int protocol) {
return std::unique_ptr<Socket>{sock};
}
std::unique_ptr<Socket> socket_loop_monitored(int domain, int type, int protocol) {
// LWIPRawImpl doesn't use file descriptors, so monitoring is not applicable
return socket(domain, type, protocol);
}
} // namespace socket
} // namespace esphome

View File

@ -5,6 +5,7 @@
#ifdef USE_SOCKET_IMPL_LWIP_SOCKETS
#include <cstring>
#include "esphome/core/application.h"
namespace esphome {
namespace socket {
@ -33,7 +34,20 @@ std::string format_sockaddr(const struct sockaddr_storage &storage) {
class LwIPSocketImpl : public Socket {
public:
LwIPSocketImpl(int fd) : fd_(fd) {}
LwIPSocketImpl(int fd, bool monitor_loop = false) : fd_(fd) {
#ifdef USE_SOCKET_SELECT_SUPPORT
// Register new socket with the application for select() if monitoring requested
if (monitor_loop && fd_ >= 0) {
// Only set loop_monitored_ to true if registration succeeds
loop_monitored_ = App.register_socket_fd(fd_);
} else {
loop_monitored_ = false;
}
#else
// Without select support, ignore monitor_loop parameter
(void) monitor_loop;
#endif
}
~LwIPSocketImpl() override {
if (!closed_) {
close(); // NOLINT(clang-analyzer-optin.cplusplus.VirtualCall)
@ -41,17 +55,36 @@ class LwIPSocketImpl : public Socket {
}
int connect(const struct sockaddr *addr, socklen_t addrlen) override { return lwip_connect(fd_, addr, addrlen); }
std::unique_ptr<Socket> accept(struct sockaddr *addr, socklen_t *addrlen) override {
return accept_impl_(addr, addrlen, false);
}
std::unique_ptr<Socket> accept_loop_monitored(struct sockaddr *addr, socklen_t *addrlen) override {
return accept_impl_(addr, addrlen, true);
}
private:
std::unique_ptr<Socket> accept_impl_(struct sockaddr *addr, socklen_t *addrlen, bool loop_monitored) {
int fd = lwip_accept(fd_, addr, addrlen);
if (fd == -1)
return {};
return make_unique<LwIPSocketImpl>(fd);
return make_unique<LwIPSocketImpl>(fd, loop_monitored);
}
public:
int bind(const struct sockaddr *addr, socklen_t addrlen) override { return lwip_bind(fd_, addr, addrlen); }
int close() override {
if (!closed_) {
#ifdef USE_SOCKET_SELECT_SUPPORT
// Unregister from select() before closing if monitored
if (loop_monitored_) {
App.unregister_socket_fd(fd_);
}
#endif
int ret = lwip_close(fd_);
closed_ = true;
return ret;
}
return 0;
}
int shutdown(int how) override { return lwip_shutdown(fd_, how); }
int getpeername(struct sockaddr *addr, socklen_t *addrlen) override { return lwip_getpeername(fd_, addr, addrlen); }
@ -98,16 +131,27 @@ class LwIPSocketImpl : public Socket {
return 0;
}
int get_fd() const override { return fd_; }
protected:
int fd_;
bool closed_ = false;
};
std::unique_ptr<Socket> socket(int domain, int type, int protocol) {
// Helper to create a socket with optional monitoring
static std::unique_ptr<Socket> create_socket(int domain, int type, int protocol, bool loop_monitored = false) {
int ret = lwip_socket(domain, type, protocol);
if (ret == -1)
return nullptr;
return std::unique_ptr<Socket>{new LwIPSocketImpl(ret)};
return std::unique_ptr<Socket>{new LwIPSocketImpl(ret, loop_monitored)};
}
std::unique_ptr<Socket> socket(int domain, int type, int protocol) {
return create_socket(domain, type, protocol, false);
}
std::unique_ptr<Socket> socket_loop_monitored(int domain, int type, int protocol) {
return create_socket(domain, type, protocol, true);
}
} // namespace socket

View File

@ -4,12 +4,35 @@
#include <cstring>
#include <string>
#include "esphome/core/log.h"
#include "esphome/core/application.h"
namespace esphome {
namespace socket {
Socket::~Socket() {}
bool Socket::ready() const {
#ifdef USE_SOCKET_SELECT_SUPPORT
if (!loop_monitored_) {
// Non-monitored sockets always return true (assume data may be available)
return true;
}
// For loop-monitored sockets, check with the Application's select() results
int fd = this->get_fd();
if (fd < 0) {
// No valid file descriptor, assume ready (fallback behavior)
return true;
}
return App.is_socket_ready(fd);
#else
// Without select() support, we can't monitor sockets in the loop
// Always return true (assume data may be available)
return true;
#endif
}
std::unique_ptr<Socket> socket_ip(int type, int protocol) {
#if USE_NETWORK_IPV6
return socket(AF_INET6, type, protocol);
@ -18,6 +41,14 @@ std::unique_ptr<Socket> socket_ip(int type, int protocol) {
#endif /* USE_NETWORK_IPV6 */
}
std::unique_ptr<Socket> socket_ip_loop_monitored(int type, int protocol) {
#if USE_NETWORK_IPV6
return socket_loop_monitored(AF_INET6, type, protocol);
#else
return socket_loop_monitored(AF_INET, type, protocol);
#endif /* USE_NETWORK_IPV6 */
}
socklen_t set_sockaddr(struct sockaddr *addr, socklen_t addrlen, const std::string &ip_address, uint16_t port) {
#if USE_NETWORK_IPV6
if (ip_address.find(':') != std::string::npos) {

View File

@ -17,6 +17,11 @@ class Socket {
Socket &operator=(const Socket &) = delete;
virtual std::unique_ptr<Socket> accept(struct sockaddr *addr, socklen_t *addrlen) = 0;
/// Accept a connection and monitor it in the main loop
/// NOTE: This function is NOT thread-safe and must only be called from the main loop
virtual std::unique_ptr<Socket> accept_loop_monitored(struct sockaddr *addr, socklen_t *addrlen) {
return accept(addr, addrlen); // Default implementation for backward compatibility
}
virtual int bind(const struct sockaddr *addr, socklen_t addrlen) = 0;
virtual int close() = 0;
// not supported yet:
@ -44,14 +49,35 @@ class Socket {
virtual int setblocking(bool blocking) = 0;
virtual int loop() { return 0; };
/// Get the underlying file descriptor (returns -1 if not supported)
virtual int get_fd() const { return -1; }
/// Check if socket has data ready to read
/// For loop-monitored sockets, checks with the Application's select() results
/// For non-monitored sockets, always returns true (assumes data may be available)
bool ready() const;
protected:
#ifdef USE_SOCKET_SELECT_SUPPORT
bool loop_monitored_{false}; ///< Whether this socket is monitored by the event loop
#endif
};
/// Create a socket of the given domain, type and protocol.
std::unique_ptr<Socket> socket(int domain, int type, int protocol);
/// Create a socket in the newest available IP domain (IPv6 or IPv4) of the given type and protocol.
std::unique_ptr<Socket> socket_ip(int type, int protocol);
/// Create a socket and monitor it for data in the main loop.
/// Like socket() but also registers the socket with the Application's select() loop.
/// WARNING: These functions are NOT thread-safe. They must only be called from the main loop
/// as they register the socket file descriptor with the global Application instance.
/// NOTE: On ESP platforms, FD_SETSIZE is typically 10, limiting the number of monitored sockets.
/// File descriptors >= FD_SETSIZE will not be monitored and will log an error.
std::unique_ptr<Socket> socket_loop_monitored(int domain, int type, int protocol);
std::unique_ptr<Socket> socket_ip_loop_monitored(int type, int protocol);
/// Set a sockaddr to the specified address and port for the IP version used by socket_ip().
socklen_t set_sockaddr(struct sockaddr *addr, socklen_t addrlen, const std::string &ip_address, uint16_t port);

View File

@ -2,11 +2,30 @@
#include "esphome/core/log.h"
#include "esphome/core/version.h"
#include "esphome/core/hal.h"
#include <algorithm>
#ifdef USE_STATUS_LED
#include "esphome/components/status_led/status_led.h"
#endif
#ifdef USE_SOCKET_SELECT_SUPPORT
#include <cerrno>
#ifdef USE_SOCKET_IMPL_LWIP_SOCKETS
// LWIP sockets implementation
#include <lwip/sockets.h>
#elif defined(USE_SOCKET_IMPL_BSD_SOCKETS)
// BSD sockets implementation
#ifdef USE_ESP32
// ESP32 "BSD sockets" are actually LWIP under the hood
#include <lwip/sockets.h>
#else
// True BSD sockets (e.g., host platform)
#include <sys/select.h>
#endif
#endif
#endif
namespace esphome {
static const char *const TAG = "app";
@ -106,8 +125,66 @@ void Application::loop() {
// otherwise interval=0 schedules result in constant looping with almost no sleep
next_schedule = std::max(next_schedule, delay_time / 2);
delay_time = std::min(next_schedule, delay_time);
#ifdef USE_SOCKET_SELECT_SUPPORT
if (!this->socket_fds_.empty()) {
// Use select() with timeout when we have sockets to monitor
// Update fd_set if socket list has changed
if (this->socket_fds_changed_) {
FD_ZERO(&this->base_read_fds_);
for (int fd : this->socket_fds_) {
if (fd >= 0 && fd < FD_SETSIZE) {
FD_SET(fd, &this->base_read_fds_);
}
}
this->socket_fds_changed_ = false;
}
// Copy base fd_set before each select
this->read_fds_ = this->base_read_fds_;
// Convert delay_time (milliseconds) to timeval
struct timeval tv;
tv.tv_sec = delay_time / 1000;
tv.tv_usec = (delay_time - tv.tv_sec * 1000) * 1000;
// Call select with timeout
#if defined(USE_SOCKET_IMPL_LWIP_SOCKETS) || (defined(USE_ESP32) && defined(USE_SOCKET_IMPL_BSD_SOCKETS))
// Use lwip_select() on platforms with lwIP - it's faster
// Note: On ESP32 with BSD sockets, select() is already mapped to lwip_select() via macros,
// but we explicitly call lwip_select() for clarity and to ensure we get the optimized version
int ret = lwip_select(this->max_fd_ + 1, &this->read_fds_, nullptr, nullptr, &tv);
#else
// Use standard select() on other platforms (e.g., host/native builds)
int ret = ::select(this->max_fd_ + 1, &this->read_fds_, nullptr, nullptr, &tv);
#endif
if (ret < 0) {
if (errno == EINTR) {
// Interrupted by signal - this is normal, just continue
// No need to delay as some time has already passed
ESP_LOGVV(TAG, "select() interrupted by signal");
} else {
// Actual error - log and fall back to delay
ESP_LOGW(TAG, "select() failed with errno %d", errno);
delay(delay_time);
}
} else if (ret > 0) {
ESP_LOGVV(TAG, "select() woke early: %d socket(s) ready (saved up to %ums)", ret, delay_time);
} else {
// ret == 0: timeout occurred (normal)
ESP_LOGVV(TAG, "select() timeout after %ums (no sockets ready)", delay_time);
}
} else {
// No sockets registered, use regular delay
delay(delay_time);
}
#else
// No select support, use regular delay
delay(delay_time);
#endif
}
this->last_loop_ = last_op_end_time;
if (this->dump_config_at_ < this->components_.size()) {
@ -167,6 +244,67 @@ void Application::calculate_looping_components_() {
}
}
#ifdef USE_SOCKET_SELECT_SUPPORT
bool Application::register_socket_fd(int fd) {
// WARNING: This function is NOT thread-safe and must only be called from the main loop
// It modifies socket_fds_ and related variables without locking
if (fd < 0)
return false;
if (fd >= FD_SETSIZE) {
ESP_LOGE(TAG, "Cannot monitor socket fd %d: exceeds FD_SETSIZE (%d)", fd, FD_SETSIZE);
ESP_LOGE(TAG, "Socket will not be monitored for data - may cause performance issues!");
return false;
}
this->socket_fds_.push_back(fd);
this->socket_fds_changed_ = true;
if (fd > this->max_fd_) {
this->max_fd_ = fd;
}
return true;
}
void Application::unregister_socket_fd(int fd) {
// WARNING: This function is NOT thread-safe and must only be called from the main loop
// It modifies socket_fds_ and related variables without locking
if (fd < 0)
return;
auto it = std::find(this->socket_fds_.begin(), this->socket_fds_.end(), fd);
if (it != this->socket_fds_.end()) {
// Swap with last element and pop - O(1) removal since order doesn't matter
if (it != this->socket_fds_.end() - 1) {
std::swap(*it, this->socket_fds_.back());
}
this->socket_fds_.pop_back();
this->socket_fds_changed_ = true;
// Only recalculate max_fd if we removed the current max
if (fd == this->max_fd_) {
if (this->socket_fds_.empty()) {
this->max_fd_ = -1;
} else {
// Find new max using std::max_element
this->max_fd_ = *std::max_element(this->socket_fds_.begin(), this->socket_fds_.end());
}
}
}
}
bool Application::is_socket_ready(int fd) const {
// This function is thread-safe for reading the result of select()
// However, it should only be called after select() has been executed in the main loop
// The read_fds_ is only modified by select() in the main loop
if (fd < 0 || fd >= FD_SETSIZE)
return false;
return FD_ISSET(fd, &this->read_fds_);
}
#endif
Application App; // NOLINT(cppcoreguidelines-avoid-non-const-global-variables)
} // namespace esphome

View File

@ -9,6 +9,10 @@
#include "esphome/core/preferences.h"
#include "esphome/core/scheduler.h"
#ifdef USE_SOCKET_SELECT_SUPPORT
#include <sys/select.h>
#endif
#ifdef USE_BINARY_SENSOR
#include "esphome/components/binary_sensor/binary_sensor.h"
#endif
@ -467,6 +471,19 @@ class Application {
Scheduler scheduler;
/// Register/unregister a socket file descriptor to be monitored for read events.
#ifdef USE_SOCKET_SELECT_SUPPORT
/// These functions update the fd_set used by select() in the main loop.
/// WARNING: These functions are NOT thread-safe. They must only be called from the main loop.
/// NOTE: File descriptors >= FD_SETSIZE (typically 10 on ESP) will be rejected with an error.
/// @return true if registration was successful, false if fd exceeds limits
bool register_socket_fd(int fd);
void unregister_socket_fd(int fd);
/// Check if there's data available on a socket without blocking
/// This function is thread-safe for reading, but should be called after select() has run
bool is_socket_ready(int fd) const;
#endif
protected:
friend Component;
@ -555,6 +572,15 @@ class Application {
uint32_t app_state_{0};
Component *current_component_{nullptr};
uint32_t loop_component_start_time_{0};
#ifdef USE_SOCKET_SELECT_SUPPORT
// Socket select management
std::vector<int> socket_fds_; // Vector of all monitored socket file descriptors
bool socket_fds_changed_{false}; // Flag to rebuild base_read_fds_ when socket_fds_ changes
int max_fd_{-1}; // Highest file descriptor number for select()
fd_set base_read_fds_{}; // Cached fd_set rebuilt only when socket_fds_ changes
fd_set read_fds_{}; // Working fd_set for select(), copied from base_read_fds_
#endif
};
/// Global storage of Application pointer - only one Application can exist.

View File

@ -140,6 +140,7 @@
#define USE_MICROPHONE
#define USE_PSRAM
#define USE_SOCKET_IMPL_BSD_SOCKETS
#define USE_SOCKET_SELECT_SUPPORT
#define USE_SPEAKER
#define USE_SPI
#define USE_VOICE_ASSISTANT
@ -199,12 +200,14 @@
#ifdef USE_LIBRETINY
#define USE_CAPTIVE_PORTAL
#define USE_SOCKET_IMPL_LWIP_SOCKETS
#define USE_SOCKET_SELECT_SUPPORT
#define USE_WEBSERVER
#define USE_WEBSERVER_PORT 80 // NOLINT
#endif
#ifdef USE_HOST
#define USE_SOCKET_IMPL_BSD_SOCKETS
#define USE_SOCKET_SELECT_SUPPORT
#endif
// Disabled feature flags