Merge branch 'integration' of https://github.com/esphome/esphome into integration

This commit is contained in:
J. Nick Koston
2025-06-25 17:30:27 +02:00
17 changed files with 250 additions and 112 deletions

View File

@@ -33,9 +33,14 @@ namespace api {
// Since each message could contain multiple protobuf messages when using packet batching,
// this limits the number of messages processed, not the number of TCP packets.
static constexpr uint8_t MAX_MESSAGES_PER_LOOP = 5;
static constexpr uint8_t MAX_PING_RETRIES = 60;
static constexpr uint16_t PING_RETRY_INTERVAL = 1000;
static constexpr uint32_t KEEPALIVE_DISCONNECT_TIMEOUT = (KEEPALIVE_TIMEOUT_MS * 5) / 2;
static const char *const TAG = "api.connection";
#ifdef USE_ESP32_CAMERA
static const int ESP32_CAMERA_STOP_STREAM = 5000;
#endif
APIConnection::APIConnection(std::unique_ptr<socket::Socket> sock, APIServer *parent)
: parent_(parent), initial_state_iterator_(this), list_entities_iterator_(this) {
@@ -90,16 +95,6 @@ APIConnection::~APIConnection() {
}
void APIConnection::loop() {
if (this->remove_)
return;
if (!network::is_connected()) {
// when network is disconnected force disconnect immediately
// don't wait for timeout
this->on_fatal_error();
ESP_LOGW(TAG, "%s: Network unavailable; disconnecting", this->get_client_combined_info().c_str());
return;
}
if (this->next_close_) {
// requested a disconnect
this->helper_->close();
@@ -152,20 +147,19 @@ void APIConnection::loop() {
// Process deferred batch if scheduled
if (this->deferred_batch_.batch_scheduled &&
App.get_loop_component_start_time() - this->deferred_batch_.batch_start_time >= this->get_batch_delay_ms_()) {
now - this->deferred_batch_.batch_start_time >= this->get_batch_delay_ms_()) {
this->process_batch_();
}
if (!this->list_entities_iterator_.completed())
if (!this->list_entities_iterator_.completed()) {
this->list_entities_iterator_.advance();
if (!this->initial_state_iterator_.completed() && this->list_entities_iterator_.completed())
} else if (!this->initial_state_iterator_.completed()) {
this->initial_state_iterator_.advance();
}
static uint8_t max_ping_retries = 60;
static uint16_t ping_retry_interval = 1000;
if (this->sent_ping_) {
// Disconnect if not responded within 2.5*keepalive
if (now - this->last_traffic_ > (KEEPALIVE_TIMEOUT_MS * 5) / 2) {
if (now - this->last_traffic_ > KEEPALIVE_DISCONNECT_TIMEOUT) {
on_fatal_error();
ESP_LOGW(TAG, "%s is unresponsive; disconnecting", this->get_client_combined_info().c_str());
}
@@ -173,17 +167,15 @@ void APIConnection::loop() {
ESP_LOGVV(TAG, "Sending keepalive PING");
this->sent_ping_ = this->send_message(PingRequest());
if (!this->sent_ping_) {
this->next_ping_retry_ = now + ping_retry_interval;
this->next_ping_retry_ = now + PING_RETRY_INTERVAL;
this->ping_retries_++;
std::string warn_str = str_sprintf("%s: Sending keepalive failed %u time(s);",
this->get_client_combined_info().c_str(), this->ping_retries_);
if (this->ping_retries_ >= max_ping_retries) {
if (this->ping_retries_ >= MAX_PING_RETRIES) {
on_fatal_error();
ESP_LOGE(TAG, "%s disconnecting", warn_str.c_str());
ESP_LOGE(TAG, "%s: Ping failed %u times", this->get_client_combined_info().c_str(), this->ping_retries_);
} else if (this->ping_retries_ >= 10) {
ESP_LOGW(TAG, "%s retrying in %u ms", warn_str.c_str(), ping_retry_interval);
ESP_LOGW(TAG, "%s: Ping retry %u", this->get_client_combined_info().c_str(), this->ping_retries_);
} else {
ESP_LOGD(TAG, "%s retrying in %u ms", warn_str.c_str(), ping_retry_interval);
ESP_LOGD(TAG, "%s: Ping retry %u", this->get_client_combined_info().c_str(), this->ping_retries_);
}
}
}
@@ -207,22 +199,20 @@ void APIConnection::loop() {
// bool done = 3;
buffer.encode_bool(3, done);
bool success = this->send_buffer(buffer, 44);
bool success = this->send_buffer(buffer, CameraImageResponse::MESSAGE_TYPE);
if (success) {
this->image_reader_.consume_data(to_send);
}
if (success && done) {
this->image_reader_.return_image();
if (done) {
this->image_reader_.return_image();
}
}
}
#endif
if (state_subs_at_ != -1) {
if (state_subs_at_ >= 0) {
const auto &subs = this->parent_->get_state_subs();
if (state_subs_at_ >= (int) subs.size()) {
state_subs_at_ = -1;
} else {
if (state_subs_at_ < static_cast<int>(subs.size())) {
auto &it = subs[state_subs_at_];
SubscribeHomeAssistantStateResponse resp;
resp.entity_id = it.entity_id;
@@ -231,6 +221,8 @@ void APIConnection::loop() {
if (this->send_message(resp)) {
state_subs_at_++;
}
} else {
state_subs_at_ = -1;
}
}
}

View File

@@ -66,6 +66,17 @@ const char *api_error_to_str(APIError err) {
return "UNKNOWN";
}
// Default implementation for loop - handles sending buffered data
APIError APIFrameHelper::loop() {
if (!this->tx_buf_.empty()) {
APIError err = try_send_tx_buf_();
if (err != APIError::OK && err != APIError::WOULD_BLOCK) {
return err;
}
}
return APIError::OK; // Convert WOULD_BLOCK to OK to avoid connection termination
}
// Helper method to buffer data from IOVs
void APIFrameHelper::buffer_data_from_iov_(const struct iovec *iov, int iovcnt, uint16_t total_write_len) {
SendBuffer buffer;
@@ -287,13 +298,8 @@ APIError APINoiseFrameHelper::loop() {
}
}
if (!this->tx_buf_.empty()) {
APIError err = try_send_tx_buf_();
if (err != APIError::OK && err != APIError::WOULD_BLOCK) {
return err;
}
}
return APIError::OK; // Convert WOULD_BLOCK to OK to avoid connection termination
// Use base class implementation for buffer sending
return APIFrameHelper::loop();
}
/** Read a packet into the rx_buf_. If successful, stores frame data in the frame parameter
@@ -339,17 +345,15 @@ APIError APINoiseFrameHelper::try_read_frame_(ParsedFrame *frame) {
return APIError::WOULD_BLOCK;
}
if (rx_header_buf_[0] != 0x01) {
state_ = State::FAILED;
HELPER_LOG("Bad indicator byte %u", rx_header_buf_[0]);
return APIError::BAD_INDICATOR;
}
// header reading done
}
// read body
uint8_t indicator = rx_header_buf_[0];
if (indicator != 0x01) {
state_ = State::FAILED;
HELPER_LOG("Bad indicator byte %u", indicator);
return APIError::BAD_INDICATOR;
}
uint16_t msg_size = (((uint16_t) rx_header_buf_[1]) << 8) | rx_header_buf_[2];
if (state_ != State::DATA && msg_size > 128) {
@@ -595,10 +599,6 @@ APIError APINoiseFrameHelper::read_packet(ReadPacketBuffer *buffer) {
return APIError::BAD_DATA_PACKET;
}
// uint16_t type;
// uint16_t data_len;
// uint8_t *data;
// uint8_t *padding; zero or more bytes to fill up the rest of the packet
uint16_t type = (((uint16_t) msg_data[0]) << 8) | msg_data[1];
uint16_t data_len = (((uint16_t) msg_data[2]) << 8) | msg_data[3];
if (data_len > msg_size - 4) {
@@ -831,18 +831,12 @@ APIError APIPlaintextFrameHelper::init() {
state_ = State::DATA;
return APIError::OK;
}
/// Not used for plaintext
APIError APIPlaintextFrameHelper::loop() {
if (state_ != State::DATA) {
return APIError::BAD_STATE;
}
if (!this->tx_buf_.empty()) {
APIError err = try_send_tx_buf_();
if (err != APIError::OK && err != APIError::WOULD_BLOCK) {
return err;
}
}
return APIError::OK; // Convert WOULD_BLOCK to OK to avoid connection termination
// Use base class implementation for buffer sending
return APIFrameHelper::loop();
}
/** Read a packet into the rx_buf_. If successful, stores frame data in the frame parameter

View File

@@ -38,7 +38,7 @@ struct PacketInfo {
: message_type(type), offset(off), payload_size(size), padding(0) {}
};
enum class APIError : int {
enum class APIError : uint16_t {
OK = 0,
WOULD_BLOCK = 1001,
BAD_HANDSHAKE_PACKET_LEN = 1002,
@@ -74,7 +74,7 @@ class APIFrameHelper {
}
virtual ~APIFrameHelper() = default;
virtual APIError init() = 0;
virtual APIError loop() = 0;
virtual APIError loop();
virtual APIError read_packet(ReadPacketBuffer *buffer) = 0;
bool can_write_without_blocking() { return state_ == State::DATA && tx_buf_.empty(); }
std::string getpeername() { return socket_->getpeername(); }

View File

@@ -47,6 +47,11 @@ void APIServer::setup() {
}
#endif
// Schedule reboot if no clients connect within timeout
if (this->reboot_timeout_ != 0) {
this->schedule_reboot_timeout_();
}
this->socket_ = socket::socket_ip_loop_monitored(SOCK_STREAM, 0); // monitored for incoming connections
if (this->socket_ == nullptr) {
ESP_LOGW(TAG, "Could not create socket");
@@ -106,8 +111,6 @@ void APIServer::setup() {
}
#endif
this->last_connected_ = App.get_loop_component_start_time();
#ifdef USE_ESP32_CAMERA
if (esp32_camera::global_esp32_camera != nullptr && !esp32_camera::global_esp32_camera->is_internal()) {
esp32_camera::global_esp32_camera->add_image_callback(
@@ -121,6 +124,16 @@ void APIServer::setup() {
#endif
}
void APIServer::schedule_reboot_timeout_() {
this->status_set_warning();
this->set_timeout("api_reboot", this->reboot_timeout_, []() {
if (!global_api_server->is_connected()) {
ESP_LOGE(TAG, "No clients; rebooting");
App.reboot();
}
});
}
void APIServer::loop() {
// Accept new clients only if the socket exists and has incoming connections
if (this->socket_ && this->socket_->ready()) {
@@ -130,51 +143,61 @@ void APIServer::loop() {
auto sock = this->socket_->accept_loop_monitored((struct sockaddr *) &source_addr, &addr_len);
if (!sock)
break;
ESP_LOGD(TAG, "Accepted %s", sock->getpeername().c_str());
ESP_LOGD(TAG, "Accept %s", sock->getpeername().c_str());
auto *conn = new APIConnection(std::move(sock), this);
this->clients_.emplace_back(conn);
conn->start();
// Clear warning status and cancel reboot when first client connects
if (this->clients_.size() == 1 && this->reboot_timeout_ != 0) {
this->status_clear_warning();
this->cancel_timeout("api_reboot");
}
}
}
if (this->clients_.empty()) {
return;
}
// Process clients and remove disconnected ones in a single pass
if (!this->clients_.empty()) {
size_t client_index = 0;
while (client_index < this->clients_.size()) {
auto &client = this->clients_[client_index];
if (client->remove_) {
// Handle disconnection
this->client_disconnected_trigger_->trigger(client->client_info_, client->client_peername_);
ESP_LOGV(TAG, "Removing connection to %s", client->client_info_.c_str());
// Swap with the last element and pop (avoids expensive vector shifts)
if (client_index < this->clients_.size() - 1) {
std::swap(this->clients_[client_index], this->clients_.back());
}
this->clients_.pop_back();
// Don't increment client_index since we need to process the swapped element
} else {
// Process active client
client->loop();
client_index++; // Move to next client
}
// Check network connectivity once for all clients
if (!network::is_connected()) {
// Network is down - disconnect all clients
for (auto &client : this->clients_) {
client->on_fatal_error();
ESP_LOGW(TAG, "%s: Network down; disconnect", client->get_client_combined_info().c_str());
}
// Continue to process and clean up the clients below
}
if (this->reboot_timeout_ != 0) {
const uint32_t now = App.get_loop_component_start_time();
if (!this->is_connected()) {
if (now - this->last_connected_ > this->reboot_timeout_) {
ESP_LOGE(TAG, "No client connected; rebooting");
App.reboot();
}
this->status_set_warning();
} else {
this->last_connected_ = now;
this->status_clear_warning();
size_t client_index = 0;
while (client_index < this->clients_.size()) {
auto &client = this->clients_[client_index];
if (!client->remove_) {
// Common case: process active client
client->loop();
client_index++;
continue;
}
// Rare case: handle disconnection
this->client_disconnected_trigger_->trigger(client->client_info_, client->client_peername_);
ESP_LOGV(TAG, "Remove connection %s", client->client_info_.c_str());
// Swap with the last element and pop (avoids expensive vector shifts)
if (client_index < this->clients_.size() - 1) {
std::swap(this->clients_[client_index], this->clients_.back());
}
this->clients_.pop_back();
// Schedule reboot when last client disconnects
if (this->clients_.empty() && this->reboot_timeout_ != 0) {
this->schedule_reboot_timeout_();
}
// Don't increment client_index since we need to process the swapped element
}
}

View File

@@ -142,6 +142,7 @@ class APIServer : public Component, public Controller {
}
protected:
void schedule_reboot_timeout_();
// Pointers and pointer-like types first (4 bytes each)
std::unique_ptr<socket::Socket> socket_ = nullptr;
Trigger<std::string, std::string> *client_connected_trigger_ = new Trigger<std::string, std::string>();
@@ -150,7 +151,6 @@ class APIServer : public Component, public Controller {
// 4-byte aligned types
uint32_t reboot_timeout_{300000};
uint32_t batch_delay_{100};
uint32_t last_connected_{0};
// Vectors and strings (12 bytes each on 32-bit)
std::vector<std::unique_ptr<APIConnection>> clients_;

View File

@@ -48,6 +48,11 @@ void HOT Logger::log_vprintf_(uint8_t level, const char *tag, int line, const ch
// For non-main tasks, queue the message for callbacks - but only if we have any callbacks registered
message_sent =
this->log_buffer_->send_message_thread_safe(level, tag, static_cast<uint16_t>(line), current_task, format, args);
if (message_sent) {
// Enable logger loop to process the buffered message
// This is safe to call from any context including ISRs
this->enable_loop_soon_any_context();
}
#endif // USE_ESPHOME_TASK_LOG_BUFFER
// Emergency console logging for non-main tasks when ring buffer is full or disabled
@@ -139,6 +144,10 @@ Logger::Logger(uint32_t baud_rate, size_t tx_buffer_size) : baud_rate_(baud_rate
#ifdef USE_ESPHOME_TASK_LOG_BUFFER
void Logger::init_log_buffer(size_t total_buffer_size) {
this->log_buffer_ = esphome::make_unique<logger::TaskLogBuffer>(total_buffer_size);
// Start with loop disabled when using task buffer (unless using USB CDC)
// The loop will be enabled automatically when messages arrive
this->disable_loop_when_buffer_empty_();
}
#endif
@@ -189,6 +198,10 @@ void Logger::loop() {
this->write_msg_(this->tx_buffer_);
}
}
} else {
// No messages to process, disable loop if appropriate
// This reduces overhead when there's no async logging activity
this->disable_loop_when_buffer_empty_();
}
#endif
}

View File

@@ -358,6 +358,26 @@ class Logger : public Component {
static const uint16_t RESET_COLOR_LEN = strlen(ESPHOME_LOG_RESET_COLOR);
this->write_body_to_buffer_(ESPHOME_LOG_RESET_COLOR, RESET_COLOR_LEN, buffer, buffer_at, buffer_size);
}
#ifdef USE_ESP32
// Disable loop when task buffer is empty (with USB CDC check)
inline void disable_loop_when_buffer_empty_() {
// Thread safety note: This is safe even if another task calls enable_loop_soon_any_context()
// concurrently. If that happens between our check and disable_loop(), the enable request
// will be processed on the next main loop iteration since:
// - disable_loop() takes effect immediately
// - enable_loop_soon_any_context() sets a pending flag that's checked at loop start
#if defined(USE_LOGGER_USB_CDC) && defined(USE_ARDUINO)
// Only disable if not using USB CDC (which needs loop for connection detection)
if (this->uart_ != UART_SELECTION_USB_CDC) {
this->disable_loop();
}
#else
// No USB CDC support, always safe to disable
this->disable_loop();
#endif
}
#endif
};
extern Logger *global_logger; // NOLINT(cppcoreguidelines-avoid-non-const-global-variables)

View File

@@ -23,16 +23,22 @@ std::string state_class_to_string(StateClass state_class) {
Sensor::Sensor() : state(NAN), raw_state(NAN) {}
int8_t Sensor::get_accuracy_decimals() {
if (this->accuracy_decimals_.has_value())
return *this->accuracy_decimals_;
if (this->sensor_flags_.has_accuracy_override)
return this->accuracy_decimals_;
return 0;
}
void Sensor::set_accuracy_decimals(int8_t accuracy_decimals) { this->accuracy_decimals_ = accuracy_decimals; }
void Sensor::set_accuracy_decimals(int8_t accuracy_decimals) {
this->accuracy_decimals_ = accuracy_decimals;
this->sensor_flags_.has_accuracy_override = true;
}
void Sensor::set_state_class(StateClass state_class) { this->state_class_ = state_class; }
void Sensor::set_state_class(StateClass state_class) {
this->state_class_ = state_class;
this->sensor_flags_.has_state_class_override = true;
}
StateClass Sensor::get_state_class() {
if (this->state_class_.has_value())
return *this->state_class_;
if (this->sensor_flags_.has_state_class_override)
return this->state_class_;
return StateClass::STATE_CLASS_NONE;
}

View File

@@ -80,9 +80,9 @@ class Sensor : public EntityBase, public EntityBase_DeviceClass, public EntityBa
* state changes to the database when they are published, even if the state is the
* same as before.
*/
bool get_force_update() const { return force_update_; }
bool get_force_update() const { return sensor_flags_.force_update; }
/// Set force update mode.
void set_force_update(bool force_update) { force_update_ = force_update; }
void set_force_update(bool force_update) { sensor_flags_.force_update = force_update; }
/// Add a filter to the filter chain. Will be appended to the back.
void add_filter(Filter *filter);
@@ -155,9 +155,17 @@ class Sensor : public EntityBase, public EntityBase_DeviceClass, public EntityBa
Filter *filter_list_{nullptr}; ///< Store all active filters.
optional<int8_t> accuracy_decimals_; ///< Accuracy in decimals override
optional<StateClass> state_class_{STATE_CLASS_NONE}; ///< State class override
bool force_update_{false}; ///< Force update mode
// Group small members together to avoid padding
int8_t accuracy_decimals_{-1}; ///< Accuracy in decimals (-1 = not set)
StateClass state_class_{STATE_CLASS_NONE}; ///< State class (STATE_CLASS_NONE = not set)
// Bit-packed flags for sensor-specific settings
struct SensorFlags {
uint8_t has_accuracy_override : 1;
uint8_t has_state_class_override : 1;
uint8_t force_update : 1;
uint8_t reserved : 5; // Reserved for future use
} sensor_flags_{};
};
} // namespace sensor

View File

@@ -136,6 +136,10 @@ void Application::loop() {
this->in_loop_ = false;
this->app_state_ = new_app_state;
// Process any pending runtime stats printing after all components have run
// This ensures stats printing doesn't affect component timing measurements
runtime_stats.process_pending_stats(last_op_end_time);
// Use the last component's end time instead of calling millis() again
auto elapsed = last_op_end_time - this->last_loop_;
if (elapsed >= this->loop_interval_ || HighFrequencyLoopRequester::is_high_frequency()) {

View File

@@ -132,6 +132,8 @@
// ESP32-specific feature flags
#ifdef USE_ESP32
#define USE_ESPHOME_TASK_LOG_BUFFER
#define USE_BLUETOOTH_PROXY
#define USE_CAPTIVE_PORTAL
#define USE_ESP32_BLE

View File

@@ -28,11 +28,7 @@ void RuntimeStatsCollector::record_component_time(Component *component, uint32_t
return;
}
if (current_time >= this->next_log_time_) {
this->log_stats_();
this->reset_stats_();
this->next_log_time_ = current_time + this->log_interval_;
}
// Don't print stats here anymore - let process_pending_stats handle it
}
void RuntimeStatsCollector::log_stats_() {
@@ -82,4 +78,15 @@ void RuntimeStatsCollector::log_stats_() {
}
}
} // namespace esphome
void RuntimeStatsCollector::process_pending_stats(uint32_t current_time) {
if (!this->enabled_ || this->next_log_time_ == 0)
return;
if (current_time >= this->next_log_time_) {
this->log_stats_();
this->reset_stats_();
this->next_log_time_ = current_time + this->log_interval_;
}
}
} // namespace esphome

View File

@@ -95,6 +95,9 @@ class RuntimeStatsCollector {
void record_component_time(Component *component, uint32_t duration_ms, uint32_t current_time);
// Process any pending stats printing (should be called after component loop)
void process_pending_stats(uint32_t current_time);
protected:
void log_stats_();

View File

@@ -0,0 +1,7 @@
esphome:
name: api-reboot-test
host:
api:
reboot_timeout: 0.5s # Very short timeout for fast testing
logger:
level: DEBUG

View File

@@ -8,5 +8,8 @@ sensor:
name: Test Sensor
id: test_sensor
unit_of_measurement: °C
accuracy_decimals: 2
state_class: measurement
force_update: true
lambda: return 42.0;
update_interval: 0.1s

View File

@@ -0,0 +1,35 @@
"""Test API server reboot timeout functionality."""
import asyncio
import re
import pytest
from .types import RunCompiledFunction
@pytest.mark.asyncio
async def test_api_reboot_timeout(
yaml_config: str,
run_compiled: RunCompiledFunction,
) -> None:
"""Test that the device reboots when no API clients connect within the timeout."""
loop = asyncio.get_running_loop()
reboot_future = loop.create_future()
reboot_pattern = re.compile(r"No clients; rebooting")
def check_output(line: str) -> None:
"""Check output for reboot message."""
if not reboot_future.done() and reboot_pattern.search(line):
reboot_future.set_result(True)
# Run the device without connecting any API client
async with run_compiled(yaml_config, line_callback=check_output):
# Wait for reboot with timeout
# (0.5s reboot timeout + some margin for processing)
try:
await asyncio.wait_for(reboot_future, timeout=2.0)
except asyncio.TimeoutError:
pytest.fail("Device did not reboot within expected timeout")
# Test passes if we get here - reboot was detected

View File

@@ -4,6 +4,7 @@ from __future__ import annotations
import asyncio
import aioesphomeapi
from aioesphomeapi import EntityState
import pytest
@@ -47,3 +48,23 @@ async def test_host_mode_with_sensor(
# Verify the sensor state
assert test_sensor_state.state == 42.0
assert len(states) > 0, "No states received"
# Verify the optimized fields are working correctly
# Get entity info to check accuracy_decimals, state_class, etc.
entities, _ = await client.list_entities_services()
sensor_info: aioesphomeapi.SensorInfo | None = None
for entity in entities:
if isinstance(entity, aioesphomeapi.SensorInfo):
sensor_info = entity
break
assert sensor_info is not None, "Sensor entity info not found"
assert sensor_info.accuracy_decimals == 2, (
f"Expected accuracy_decimals=2, got {sensor_info.accuracy_decimals}"
)
assert sensor_info.state_class == 1, (
f"Expected state_class=1 (measurement), got {sensor_info.state_class}"
)
assert sensor_info.force_update is True, (
f"Expected force_update=True, got {sensor_info.force_update}"
)