From 43e88af28aea4891df54f47bb433bbe1a41636ae Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Wed, 28 May 2025 18:16:37 -0500 Subject: [PATCH] Optimize socket operations by checking readiness in the main loop (#8918) --- esphome/components/api/api_connection.cpp | 52 ++++--- esphome/components/api/api_frame_helper.h | 3 + esphome/components/api/api_server.cpp | 26 ++-- .../components/esphome/ota/ota_esphome.cpp | 11 +- esphome/components/socket/__init__.py | 2 + .../components/socket/bsd_sockets_impl.cpp | 58 +++++++- .../components/socket/lwip_raw_tcp_impl.cpp | 5 + .../components/socket/lwip_sockets_impl.cpp | 58 +++++++- esphome/components/socket/socket.cpp | 31 ++++ esphome/components/socket/socket.h | 28 +++- esphome/core/application.cpp | 138 ++++++++++++++++++ esphome/core/application.h | 26 ++++ esphome/core/defines.h | 3 + 13 files changed, 386 insertions(+), 55 deletions(-) diff --git a/esphome/components/api/api_connection.cpp b/esphome/components/api/api_connection.cpp index b4646a2d7d..ca615a6d98 100644 --- a/esphome/components/api/api_connection.cpp +++ b/esphome/components/api/api_connection.cpp @@ -135,31 +135,35 @@ void APIConnection::loop() { api_error_to_str(err), errno); return; } - ReadPacketBuffer buffer; - err = this->helper_->read_packet(&buffer); - if (err == APIError::WOULD_BLOCK) { - // pass - } else if (err != APIError::OK) { - on_fatal_error(); - if (err == APIError::SOCKET_READ_FAILED && errno == ECONNRESET) { - ESP_LOGW(TAG, "%s: Connection reset", this->client_combined_info_.c_str()); - } else if (err == APIError::CONNECTION_CLOSED) { - ESP_LOGW(TAG, "%s: Connection closed", this->client_combined_info_.c_str()); - } else { - ESP_LOGW(TAG, "%s: Reading failed: %s errno=%d", this->client_combined_info_.c_str(), api_error_to_str(err), - errno); - } - return; - } else { - this->last_traffic_ = App.get_loop_component_start_time(); - // read a packet - if (buffer.data_len > 0) { - this->read_message(buffer.data_len, buffer.type, &buffer.container[buffer.data_offset]); - } else { - this->read_message(0, buffer.type, nullptr); - } - if (this->remove_) + + // Check if socket has data ready before attempting to read + if (this->helper_->is_socket_ready()) { + ReadPacketBuffer buffer; + err = this->helper_->read_packet(&buffer); + if (err == APIError::WOULD_BLOCK) { + // pass + } else if (err != APIError::OK) { + on_fatal_error(); + if (err == APIError::SOCKET_READ_FAILED && errno == ECONNRESET) { + ESP_LOGW(TAG, "%s: Connection reset", this->client_combined_info_.c_str()); + } else if (err == APIError::CONNECTION_CLOSED) { + ESP_LOGW(TAG, "%s: Connection closed", this->client_combined_info_.c_str()); + } else { + ESP_LOGW(TAG, "%s: Reading failed: %s errno=%d", this->client_combined_info_.c_str(), api_error_to_str(err), + errno); + } return; + } else { + this->last_traffic_ = App.get_loop_component_start_time(); + // read a packet + if (buffer.data_len > 0) { + this->read_message(buffer.data_len, buffer.type, &buffer.container[buffer.data_offset]); + } else { + this->read_message(0, buffer.type, nullptr); + } + if (this->remove_) + return; + } } if (!this->deferred_message_queue_.empty() && this->helper_->can_write_without_blocking()) { diff --git a/esphome/components/api/api_frame_helper.h b/esphome/components/api/api_frame_helper.h index bc25680a53..90bd0164e3 100644 --- a/esphome/components/api/api_frame_helper.h +++ b/esphome/components/api/api_frame_helper.h @@ -13,6 +13,7 @@ #include "api_noise_context.h" #include "esphome/components/socket/socket.h" +#include "esphome/core/application.h" namespace esphome { namespace api { @@ -90,6 +91,8 @@ class APIFrameHelper { virtual uint8_t frame_header_padding() = 0; // Get the frame footer size required by this protocol virtual uint8_t frame_footer_size() = 0; + // Check if socket has data ready to read + bool is_socket_ready() const { return socket_ != nullptr && socket_->ready(); } protected: // Struct for holding parsed frame data diff --git a/esphome/components/api/api_server.cpp b/esphome/components/api/api_server.cpp index 958245a6c0..670ba2331b 100644 --- a/esphome/components/api/api_server.cpp +++ b/esphome/components/api/api_server.cpp @@ -43,7 +43,7 @@ void APIServer::setup() { } #endif - this->socket_ = socket::socket_ip(SOCK_STREAM, 0); + this->socket_ = socket::socket_ip_loop_monitored(SOCK_STREAM, 0); // monitored for incoming connections if (this->socket_ == nullptr) { ESP_LOGW(TAG, "Could not create socket"); this->mark_failed(); @@ -112,18 +112,20 @@ void APIServer::setup() { } void APIServer::loop() { - // Accept new clients - while (true) { - struct sockaddr_storage source_addr; - socklen_t addr_len = sizeof(source_addr); - auto sock = this->socket_->accept((struct sockaddr *) &source_addr, &addr_len); - if (!sock) - break; - ESP_LOGD(TAG, "Accepted %s", sock->getpeername().c_str()); + // Accept new clients only if the socket has incoming connections + if (this->socket_->ready()) { + while (true) { + struct sockaddr_storage source_addr; + socklen_t addr_len = sizeof(source_addr); + auto sock = this->socket_->accept_loop_monitored((struct sockaddr *) &source_addr, &addr_len); + if (!sock) + break; + ESP_LOGD(TAG, "Accepted %s", sock->getpeername().c_str()); - auto *conn = new APIConnection(std::move(sock), this); - this->clients_.emplace_back(conn); - conn->start(); + auto *conn = new APIConnection(std::move(sock), this); + this->clients_.emplace_back(conn); + conn->start(); + } } // Process clients and remove disconnected ones in a single pass diff --git a/esphome/components/esphome/ota/ota_esphome.cpp b/esphome/components/esphome/ota/ota_esphome.cpp index 6067da15cb..6f128e548b 100644 --- a/esphome/components/esphome/ota/ota_esphome.cpp +++ b/esphome/components/esphome/ota/ota_esphome.cpp @@ -26,7 +26,7 @@ void ESPHomeOTAComponent::setup() { ota::register_ota_platform(this); #endif - server_ = socket::socket_ip(SOCK_STREAM, 0); + server_ = socket::socket_ip_loop_monitored(SOCK_STREAM, 0); // monitored for incoming connections if (server_ == nullptr) { ESP_LOGW(TAG, "Could not create socket"); this->mark_failed(); @@ -100,9 +100,12 @@ void ESPHomeOTAComponent::handle_() { #endif if (client_ == nullptr) { - struct sockaddr_storage source_addr; - socklen_t addr_len = sizeof(source_addr); - client_ = server_->accept((struct sockaddr *) &source_addr, &addr_len); + // Check if the server socket is ready before accepting + if (this->server_->ready()) { + struct sockaddr_storage source_addr; + socklen_t addr_len = sizeof(source_addr); + client_ = server_->accept((struct sockaddr *) &source_addr, &addr_len); + } } if (client_ == nullptr) return; diff --git a/esphome/components/socket/__init__.py b/esphome/components/socket/__init__.py index 77e8fe51f6..667e30df4b 100644 --- a/esphome/components/socket/__init__.py +++ b/esphome/components/socket/__init__.py @@ -35,5 +35,7 @@ async def to_code(config): cg.add_define("USE_SOCKET_IMPL_LWIP_TCP") elif impl == IMPLEMENTATION_LWIP_SOCKETS: cg.add_define("USE_SOCKET_IMPL_LWIP_SOCKETS") + cg.add_define("USE_SOCKET_SELECT_SUPPORT") elif impl == IMPLEMENTATION_BSD_SOCKETS: cg.add_define("USE_SOCKET_IMPL_BSD_SOCKETS") + cg.add_define("USE_SOCKET_SELECT_SUPPORT") diff --git a/esphome/components/socket/bsd_sockets_impl.cpp b/esphome/components/socket/bsd_sockets_impl.cpp index 1b3916fcab..e056696bcf 100644 --- a/esphome/components/socket/bsd_sockets_impl.cpp +++ b/esphome/components/socket/bsd_sockets_impl.cpp @@ -5,6 +5,7 @@ #ifdef USE_SOCKET_IMPL_BSD_SOCKETS #include +#include "esphome/core/application.h" #ifdef USE_ESP32 #include @@ -40,7 +41,20 @@ std::string format_sockaddr(const struct sockaddr_storage &storage) { class BSDSocketImpl : public Socket { public: - BSDSocketImpl(int fd) : fd_(fd) {} + BSDSocketImpl(int fd, bool monitor_loop = false) : fd_(fd) { +#ifdef USE_SOCKET_SELECT_SUPPORT + // Register new socket with the application for select() if monitoring requested + if (monitor_loop && fd_ >= 0) { + // Only set loop_monitored_ to true if registration succeeds + loop_monitored_ = App.register_socket_fd(fd_); + } else { + loop_monitored_ = false; + } +#else + // Without select support, ignore monitor_loop parameter + (void) monitor_loop; +#endif + } ~BSDSocketImpl() override { if (!closed_) { close(); // NOLINT(clang-analyzer-optin.cplusplus.VirtualCall) @@ -48,16 +62,35 @@ class BSDSocketImpl : public Socket { } int connect(const struct sockaddr *addr, socklen_t addrlen) override { return ::connect(fd_, addr, addrlen); } std::unique_ptr accept(struct sockaddr *addr, socklen_t *addrlen) override { + return accept_impl_(addr, addrlen, false); + } + std::unique_ptr accept_loop_monitored(struct sockaddr *addr, socklen_t *addrlen) override { + return accept_impl_(addr, addrlen, true); + } + + private: + std::unique_ptr accept_impl_(struct sockaddr *addr, socklen_t *addrlen, bool loop_monitored) { int fd = ::accept(fd_, addr, addrlen); if (fd == -1) return {}; - return make_unique(fd); + return make_unique(fd, loop_monitored); } + + public: int bind(const struct sockaddr *addr, socklen_t addrlen) override { return ::bind(fd_, addr, addrlen); } int close() override { - int ret = ::close(fd_); - closed_ = true; - return ret; + if (!closed_) { +#ifdef USE_SOCKET_SELECT_SUPPORT + // Unregister from select() before closing if monitored + if (loop_monitored_) { + App.unregister_socket_fd(fd_); + } +#endif + int ret = ::close(fd_); + closed_ = true; + return ret; + } + return 0; } int shutdown(int how) override { return ::shutdown(fd_, how); } @@ -126,16 +159,27 @@ class BSDSocketImpl : public Socket { return 0; } + int get_fd() const override { return fd_; } + protected: int fd_; bool closed_ = false; }; -std::unique_ptr socket(int domain, int type, int protocol) { +// Helper to create a socket with optional monitoring +static std::unique_ptr create_socket(int domain, int type, int protocol, bool loop_monitored = false) { int ret = ::socket(domain, type, protocol); if (ret == -1) return nullptr; - return std::unique_ptr{new BSDSocketImpl(ret)}; + return std::unique_ptr{new BSDSocketImpl(ret, loop_monitored)}; +} + +std::unique_ptr socket(int domain, int type, int protocol) { + return create_socket(domain, type, protocol, false); +} + +std::unique_ptr socket_loop_monitored(int domain, int type, int protocol) { + return create_socket(domain, type, protocol, true); } } // namespace socket diff --git a/esphome/components/socket/lwip_raw_tcp_impl.cpp b/esphome/components/socket/lwip_raw_tcp_impl.cpp index 1d998902ff..2d64a275df 100644 --- a/esphome/components/socket/lwip_raw_tcp_impl.cpp +++ b/esphome/components/socket/lwip_raw_tcp_impl.cpp @@ -606,6 +606,11 @@ std::unique_ptr socket(int domain, int type, int protocol) { return std::unique_ptr{sock}; } +std::unique_ptr socket_loop_monitored(int domain, int type, int protocol) { + // LWIPRawImpl doesn't use file descriptors, so monitoring is not applicable + return socket(domain, type, protocol); +} + } // namespace socket } // namespace esphome diff --git a/esphome/components/socket/lwip_sockets_impl.cpp b/esphome/components/socket/lwip_sockets_impl.cpp index c41e42fc83..f8a1cbc046 100644 --- a/esphome/components/socket/lwip_sockets_impl.cpp +++ b/esphome/components/socket/lwip_sockets_impl.cpp @@ -5,6 +5,7 @@ #ifdef USE_SOCKET_IMPL_LWIP_SOCKETS #include +#include "esphome/core/application.h" namespace esphome { namespace socket { @@ -33,7 +34,20 @@ std::string format_sockaddr(const struct sockaddr_storage &storage) { class LwIPSocketImpl : public Socket { public: - LwIPSocketImpl(int fd) : fd_(fd) {} + LwIPSocketImpl(int fd, bool monitor_loop = false) : fd_(fd) { +#ifdef USE_SOCKET_SELECT_SUPPORT + // Register new socket with the application for select() if monitoring requested + if (monitor_loop && fd_ >= 0) { + // Only set loop_monitored_ to true if registration succeeds + loop_monitored_ = App.register_socket_fd(fd_); + } else { + loop_monitored_ = false; + } +#else + // Without select support, ignore monitor_loop parameter + (void) monitor_loop; +#endif + } ~LwIPSocketImpl() override { if (!closed_) { close(); // NOLINT(clang-analyzer-optin.cplusplus.VirtualCall) @@ -41,16 +55,35 @@ class LwIPSocketImpl : public Socket { } int connect(const struct sockaddr *addr, socklen_t addrlen) override { return lwip_connect(fd_, addr, addrlen); } std::unique_ptr accept(struct sockaddr *addr, socklen_t *addrlen) override { + return accept_impl_(addr, addrlen, false); + } + std::unique_ptr accept_loop_monitored(struct sockaddr *addr, socklen_t *addrlen) override { + return accept_impl_(addr, addrlen, true); + } + + private: + std::unique_ptr accept_impl_(struct sockaddr *addr, socklen_t *addrlen, bool loop_monitored) { int fd = lwip_accept(fd_, addr, addrlen); if (fd == -1) return {}; - return make_unique(fd); + return make_unique(fd, loop_monitored); } + + public: int bind(const struct sockaddr *addr, socklen_t addrlen) override { return lwip_bind(fd_, addr, addrlen); } int close() override { - int ret = lwip_close(fd_); - closed_ = true; - return ret; + if (!closed_) { +#ifdef USE_SOCKET_SELECT_SUPPORT + // Unregister from select() before closing if monitored + if (loop_monitored_) { + App.unregister_socket_fd(fd_); + } +#endif + int ret = lwip_close(fd_); + closed_ = true; + return ret; + } + return 0; } int shutdown(int how) override { return lwip_shutdown(fd_, how); } @@ -98,16 +131,27 @@ class LwIPSocketImpl : public Socket { return 0; } + int get_fd() const override { return fd_; } + protected: int fd_; bool closed_ = false; }; -std::unique_ptr socket(int domain, int type, int protocol) { +// Helper to create a socket with optional monitoring +static std::unique_ptr create_socket(int domain, int type, int protocol, bool loop_monitored = false) { int ret = lwip_socket(domain, type, protocol); if (ret == -1) return nullptr; - return std::unique_ptr{new LwIPSocketImpl(ret)}; + return std::unique_ptr{new LwIPSocketImpl(ret, loop_monitored)}; +} + +std::unique_ptr socket(int domain, int type, int protocol) { + return create_socket(domain, type, protocol, false); +} + +std::unique_ptr socket_loop_monitored(int domain, int type, int protocol) { + return create_socket(domain, type, protocol, true); } } // namespace socket diff --git a/esphome/components/socket/socket.cpp b/esphome/components/socket/socket.cpp index e260fce05e..1c8e72b8fd 100644 --- a/esphome/components/socket/socket.cpp +++ b/esphome/components/socket/socket.cpp @@ -4,12 +4,35 @@ #include #include #include "esphome/core/log.h" +#include "esphome/core/application.h" namespace esphome { namespace socket { Socket::~Socket() {} +bool Socket::ready() const { +#ifdef USE_SOCKET_SELECT_SUPPORT + if (!loop_monitored_) { + // Non-monitored sockets always return true (assume data may be available) + return true; + } + + // For loop-monitored sockets, check with the Application's select() results + int fd = this->get_fd(); + if (fd < 0) { + // No valid file descriptor, assume ready (fallback behavior) + return true; + } + + return App.is_socket_ready(fd); +#else + // Without select() support, we can't monitor sockets in the loop + // Always return true (assume data may be available) + return true; +#endif +} + std::unique_ptr socket_ip(int type, int protocol) { #if USE_NETWORK_IPV6 return socket(AF_INET6, type, protocol); @@ -18,6 +41,14 @@ std::unique_ptr socket_ip(int type, int protocol) { #endif /* USE_NETWORK_IPV6 */ } +std::unique_ptr socket_ip_loop_monitored(int type, int protocol) { +#if USE_NETWORK_IPV6 + return socket_loop_monitored(AF_INET6, type, protocol); +#else + return socket_loop_monitored(AF_INET, type, protocol); +#endif /* USE_NETWORK_IPV6 */ +} + socklen_t set_sockaddr(struct sockaddr *addr, socklen_t addrlen, const std::string &ip_address, uint16_t port) { #if USE_NETWORK_IPV6 if (ip_address.find(':') != std::string::npos) { diff --git a/esphome/components/socket/socket.h b/esphome/components/socket/socket.h index 917f3c4c7f..8f0d28362e 100644 --- a/esphome/components/socket/socket.h +++ b/esphome/components/socket/socket.h @@ -17,6 +17,11 @@ class Socket { Socket &operator=(const Socket &) = delete; virtual std::unique_ptr accept(struct sockaddr *addr, socklen_t *addrlen) = 0; + /// Accept a connection and monitor it in the main loop + /// NOTE: This function is NOT thread-safe and must only be called from the main loop + virtual std::unique_ptr accept_loop_monitored(struct sockaddr *addr, socklen_t *addrlen) { + return accept(addr, addrlen); // Default implementation for backward compatibility + } virtual int bind(const struct sockaddr *addr, socklen_t addrlen) = 0; virtual int close() = 0; // not supported yet: @@ -44,14 +49,35 @@ class Socket { virtual int setblocking(bool blocking) = 0; virtual int loop() { return 0; }; + + /// Get the underlying file descriptor (returns -1 if not supported) + virtual int get_fd() const { return -1; } + + /// Check if socket has data ready to read + /// For loop-monitored sockets, checks with the Application's select() results + /// For non-monitored sockets, always returns true (assumes data may be available) + bool ready() const; + + protected: +#ifdef USE_SOCKET_SELECT_SUPPORT + bool loop_monitored_{false}; ///< Whether this socket is monitored by the event loop +#endif }; /// Create a socket of the given domain, type and protocol. std::unique_ptr socket(int domain, int type, int protocol); - /// Create a socket in the newest available IP domain (IPv6 or IPv4) of the given type and protocol. std::unique_ptr socket_ip(int type, int protocol); +/// Create a socket and monitor it for data in the main loop. +/// Like socket() but also registers the socket with the Application's select() loop. +/// WARNING: These functions are NOT thread-safe. They must only be called from the main loop +/// as they register the socket file descriptor with the global Application instance. +/// NOTE: On ESP platforms, FD_SETSIZE is typically 10, limiting the number of monitored sockets. +/// File descriptors >= FD_SETSIZE will not be monitored and will log an error. +std::unique_ptr socket_loop_monitored(int domain, int type, int protocol); +std::unique_ptr socket_ip_loop_monitored(int type, int protocol); + /// Set a sockaddr to the specified address and port for the IP version used by socket_ip(). socklen_t set_sockaddr(struct sockaddr *addr, socklen_t addrlen, const std::string &ip_address, uint16_t port); diff --git a/esphome/core/application.cpp b/esphome/core/application.cpp index a71b848499..1cc96265ab 100644 --- a/esphome/core/application.cpp +++ b/esphome/core/application.cpp @@ -2,11 +2,30 @@ #include "esphome/core/log.h" #include "esphome/core/version.h" #include "esphome/core/hal.h" +#include #ifdef USE_STATUS_LED #include "esphome/components/status_led/status_led.h" #endif +#ifdef USE_SOCKET_SELECT_SUPPORT +#include + +#ifdef USE_SOCKET_IMPL_LWIP_SOCKETS +// LWIP sockets implementation +#include +#elif defined(USE_SOCKET_IMPL_BSD_SOCKETS) +// BSD sockets implementation +#ifdef USE_ESP32 +// ESP32 "BSD sockets" are actually LWIP under the hood +#include +#else +// True BSD sockets (e.g., host platform) +#include +#endif +#endif +#endif + namespace esphome { static const char *const TAG = "app"; @@ -106,7 +125,65 @@ void Application::loop() { // otherwise interval=0 schedules result in constant looping with almost no sleep next_schedule = std::max(next_schedule, delay_time / 2); delay_time = std::min(next_schedule, delay_time); + +#ifdef USE_SOCKET_SELECT_SUPPORT + if (!this->socket_fds_.empty()) { + // Use select() with timeout when we have sockets to monitor + + // Update fd_set if socket list has changed + if (this->socket_fds_changed_) { + FD_ZERO(&this->base_read_fds_); + for (int fd : this->socket_fds_) { + if (fd >= 0 && fd < FD_SETSIZE) { + FD_SET(fd, &this->base_read_fds_); + } + } + this->socket_fds_changed_ = false; + } + + // Copy base fd_set before each select + this->read_fds_ = this->base_read_fds_; + + // Convert delay_time (milliseconds) to timeval + struct timeval tv; + tv.tv_sec = delay_time / 1000; + tv.tv_usec = (delay_time - tv.tv_sec * 1000) * 1000; + + // Call select with timeout +#if defined(USE_SOCKET_IMPL_LWIP_SOCKETS) || (defined(USE_ESP32) && defined(USE_SOCKET_IMPL_BSD_SOCKETS)) + // Use lwip_select() on platforms with lwIP - it's faster + // Note: On ESP32 with BSD sockets, select() is already mapped to lwip_select() via macros, + // but we explicitly call lwip_select() for clarity and to ensure we get the optimized version + int ret = lwip_select(this->max_fd_ + 1, &this->read_fds_, nullptr, nullptr, &tv); +#else + // Use standard select() on other platforms (e.g., host/native builds) + int ret = ::select(this->max_fd_ + 1, &this->read_fds_, nullptr, nullptr, &tv); +#endif + + if (ret < 0) { + if (errno == EINTR) { + // Interrupted by signal - this is normal, just continue + // No need to delay as some time has already passed + ESP_LOGVV(TAG, "select() interrupted by signal"); + } else { + // Actual error - log and fall back to delay + ESP_LOGW(TAG, "select() failed with errno %d", errno); + delay(delay_time); + } + } else if (ret > 0) { + ESP_LOGVV(TAG, "select() woke early: %d socket(s) ready (saved up to %ums)", ret, delay_time); + } else { + // ret == 0: timeout occurred (normal) + ESP_LOGVV(TAG, "select() timeout after %ums (no sockets ready)", delay_time); + } + } else { + // No sockets registered, use regular delay + delay(delay_time); + } +#else + // No select support, use regular delay delay(delay_time); +#endif } this->last_loop_ = last_op_end_time; @@ -167,6 +244,67 @@ void Application::calculate_looping_components_() { } } +#ifdef USE_SOCKET_SELECT_SUPPORT +bool Application::register_socket_fd(int fd) { + // WARNING: This function is NOT thread-safe and must only be called from the main loop + // It modifies socket_fds_ and related variables without locking + if (fd < 0) + return false; + + if (fd >= FD_SETSIZE) { + ESP_LOGE(TAG, "Cannot monitor socket fd %d: exceeds FD_SETSIZE (%d)", fd, FD_SETSIZE); + ESP_LOGE(TAG, "Socket will not be monitored for data - may cause performance issues!"); + return false; + } + + this->socket_fds_.push_back(fd); + this->socket_fds_changed_ = true; + + if (fd > this->max_fd_) { + this->max_fd_ = fd; + } + + return true; +} + +void Application::unregister_socket_fd(int fd) { + // WARNING: This function is NOT thread-safe and must only be called from the main loop + // It modifies socket_fds_ and related variables without locking + if (fd < 0) + return; + + auto it = std::find(this->socket_fds_.begin(), this->socket_fds_.end(), fd); + if (it != this->socket_fds_.end()) { + // Swap with last element and pop - O(1) removal since order doesn't matter + if (it != this->socket_fds_.end() - 1) { + std::swap(*it, this->socket_fds_.back()); + } + this->socket_fds_.pop_back(); + this->socket_fds_changed_ = true; + + // Only recalculate max_fd if we removed the current max + if (fd == this->max_fd_) { + if (this->socket_fds_.empty()) { + this->max_fd_ = -1; + } else { + // Find new max using std::max_element + this->max_fd_ = *std::max_element(this->socket_fds_.begin(), this->socket_fds_.end()); + } + } + } +} + +bool Application::is_socket_ready(int fd) const { + // This function is thread-safe for reading the result of select() + // However, it should only be called after select() has been executed in the main loop + // The read_fds_ is only modified by select() in the main loop + if (fd < 0 || fd >= FD_SETSIZE) + return false; + + return FD_ISSET(fd, &this->read_fds_); +} +#endif + Application App; // NOLINT(cppcoreguidelines-avoid-non-const-global-variables) } // namespace esphome diff --git a/esphome/core/application.h b/esphome/core/application.h index aa44d9ba1d..c6e6d8b78e 100644 --- a/esphome/core/application.h +++ b/esphome/core/application.h @@ -9,6 +9,10 @@ #include "esphome/core/preferences.h" #include "esphome/core/scheduler.h" +#ifdef USE_SOCKET_SELECT_SUPPORT +#include +#endif + #ifdef USE_BINARY_SENSOR #include "esphome/components/binary_sensor/binary_sensor.h" #endif @@ -467,6 +471,19 @@ class Application { Scheduler scheduler; + /// Register/unregister a socket file descriptor to be monitored for read events. +#ifdef USE_SOCKET_SELECT_SUPPORT + /// These functions update the fd_set used by select() in the main loop. + /// WARNING: These functions are NOT thread-safe. They must only be called from the main loop. + /// NOTE: File descriptors >= FD_SETSIZE (typically 10 on ESP) will be rejected with an error. + /// @return true if registration was successful, false if fd exceeds limits + bool register_socket_fd(int fd); + void unregister_socket_fd(int fd); + /// Check if there's data available on a socket without blocking + /// This function is thread-safe for reading, but should be called after select() has run + bool is_socket_ready(int fd) const; +#endif + protected: friend Component; @@ -555,6 +572,15 @@ class Application { uint32_t app_state_{0}; Component *current_component_{nullptr}; uint32_t loop_component_start_time_{0}; + +#ifdef USE_SOCKET_SELECT_SUPPORT + // Socket select management + std::vector socket_fds_; // Vector of all monitored socket file descriptors + bool socket_fds_changed_{false}; // Flag to rebuild base_read_fds_ when socket_fds_ changes + int max_fd_{-1}; // Highest file descriptor number for select() + fd_set base_read_fds_{}; // Cached fd_set rebuilt only when socket_fds_ changes + fd_set read_fds_{}; // Working fd_set for select(), copied from base_read_fds_ +#endif }; /// Global storage of Application pointer - only one Application can exist. diff --git a/esphome/core/defines.h b/esphome/core/defines.h index 2336ec8fd6..9313f07720 100644 --- a/esphome/core/defines.h +++ b/esphome/core/defines.h @@ -140,6 +140,7 @@ #define USE_MICROPHONE #define USE_PSRAM #define USE_SOCKET_IMPL_BSD_SOCKETS +#define USE_SOCKET_SELECT_SUPPORT #define USE_SPEAKER #define USE_SPI #define USE_VOICE_ASSISTANT @@ -199,12 +200,14 @@ #ifdef USE_LIBRETINY #define USE_CAPTIVE_PORTAL #define USE_SOCKET_IMPL_LWIP_SOCKETS +#define USE_SOCKET_SELECT_SUPPORT #define USE_WEBSERVER #define USE_WEBSERVER_PORT 80 // NOLINT #endif #ifdef USE_HOST #define USE_SOCKET_IMPL_BSD_SOCKETS +#define USE_SOCKET_SELECT_SUPPORT #endif // Disabled feature flags