Merge remote-tracking branch 'upstream/dev' into add_api_stats

This commit is contained in:
J. Nick Koston 2025-05-27 09:10:13 -05:00
commit c6858163a7
No known key found for this signature in database
57 changed files with 1053 additions and 78 deletions

View File

@ -1,2 +1,4 @@
[run]
omit = esphome/components/*
omit =
esphome/components/*
tests/integration/*

View File

@ -21,7 +21,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4.1.7
uses: actions/checkout@v4.2.2
- name: Set up Python
uses: actions/setup-python@v5.6.0
with:

View File

@ -43,7 +43,7 @@ jobs:
- "docker"
# - "lint"
steps:
- uses: actions/checkout@v4.1.7
- uses: actions/checkout@v4.2.2
- name: Set up Python
uses: actions/setup-python@v5.6.0
with:

View File

@ -36,7 +36,7 @@ jobs:
cache-key: ${{ steps.cache-key.outputs.key }}
steps:
- name: Check out code from GitHub
uses: actions/checkout@v4.1.7
uses: actions/checkout@v4.2.2
- name: Generate cache-key
id: cache-key
run: echo key="${{ hashFiles('requirements.txt', 'requirements_test.txt') }}" >> $GITHUB_OUTPUT
@ -68,7 +68,7 @@ jobs:
- common
steps:
- name: Check out code from GitHub
uses: actions/checkout@v4.1.7
uses: actions/checkout@v4.2.2
- name: Restore Python
uses: ./.github/actions/restore-python
with:
@ -89,7 +89,7 @@ jobs:
- common
steps:
- name: Check out code from GitHub
uses: actions/checkout@v4.1.7
uses: actions/checkout@v4.2.2
- name: Restore Python
uses: ./.github/actions/restore-python
with:
@ -110,7 +110,7 @@ jobs:
- common
steps:
- name: Check out code from GitHub
uses: actions/checkout@v4.1.7
uses: actions/checkout@v4.2.2
- name: Restore Python
uses: ./.github/actions/restore-python
with:
@ -131,7 +131,7 @@ jobs:
- common
steps:
- name: Check out code from GitHub
uses: actions/checkout@v4.1.7
uses: actions/checkout@v4.2.2
- name: Restore Python
uses: ./.github/actions/restore-python
with:
@ -152,7 +152,7 @@ jobs:
- common
steps:
- name: Check out code from GitHub
uses: actions/checkout@v4.1.7
uses: actions/checkout@v4.2.2
- name: Restore Python
uses: ./.github/actions/restore-python
with:
@ -202,7 +202,7 @@ jobs:
- common
steps:
- name: Check out code from GitHub
uses: actions/checkout@v4.1.7
uses: actions/checkout@v4.2.2
- name: Restore Python
uses: ./.github/actions/restore-python
with:
@ -214,12 +214,12 @@ jobs:
if: matrix.os == 'windows-latest'
run: |
./venv/Scripts/activate
pytest -vv --cov-report=xml --tb=native tests
pytest -vv --cov-report=xml --tb=native -n auto tests
- name: Run pytest
if: matrix.os == 'ubuntu-latest' || matrix.os == 'macOS-latest'
run: |
. venv/bin/activate
pytest -vv --cov-report=xml --tb=native tests
pytest -vv --cov-report=xml --tb=native -n auto tests
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v5.4.3
with:
@ -232,7 +232,7 @@ jobs:
- common
steps:
- name: Check out code from GitHub
uses: actions/checkout@v4.1.7
uses: actions/checkout@v4.2.2
- name: Restore Python
uses: ./.github/actions/restore-python
with:
@ -300,7 +300,7 @@ jobs:
steps:
- name: Check out code from GitHub
uses: actions/checkout@v4.1.7
uses: actions/checkout@v4.2.2
- name: Restore Python
uses: ./.github/actions/restore-python
with:
@ -356,7 +356,7 @@ jobs:
count: ${{ steps.list-components.outputs.count }}
steps:
- name: Check out code from GitHub
uses: actions/checkout@v4.1.7
uses: actions/checkout@v4.2.2
with:
# Fetch enough history so `git merge-base refs/remotes/origin/dev HEAD` works.
fetch-depth: 500
@ -406,7 +406,7 @@ jobs:
sudo apt-get install libsdl2-dev
- name: Check out code from GitHub
uses: actions/checkout@v4.1.7
uses: actions/checkout@v4.2.2
- name: Restore Python
uses: ./.github/actions/restore-python
with:
@ -432,7 +432,7 @@ jobs:
matrix: ${{ steps.split.outputs.components }}
steps:
- name: Check out code from GitHub
uses: actions/checkout@v4.1.7
uses: actions/checkout@v4.2.2
- name: Split components into 20 groups
id: split
run: |
@ -462,7 +462,7 @@ jobs:
sudo apt-get install libsdl2-dev
- name: Check out code from GitHub
uses: actions/checkout@v4.1.7
uses: actions/checkout@v4.2.2
- name: Restore Python
uses: ./.github/actions/restore-python
with:

View File

@ -20,7 +20,7 @@ jobs:
branch_build: ${{ steps.tag.outputs.branch_build }}
deploy_env: ${{ steps.tag.outputs.deploy_env }}
steps:
- uses: actions/checkout@v4.1.7
- uses: actions/checkout@v4.2.2
- name: Get tag
id: tag
# yamllint disable rule:line-length
@ -60,7 +60,7 @@ jobs:
contents: read
id-token: write
steps:
- uses: actions/checkout@v4.1.7
- uses: actions/checkout@v4.2.2
- name: Set up Python
uses: actions/setup-python@v5.6.0
with:
@ -92,7 +92,7 @@ jobs:
os: "ubuntu-24.04-arm"
steps:
- uses: actions/checkout@v4.1.7
- uses: actions/checkout@v4.2.2
- name: Set up Python
uses: actions/setup-python@v5.6.0
with:
@ -168,7 +168,7 @@ jobs:
- ghcr
- dockerhub
steps:
- uses: actions/checkout@v4.1.7
- uses: actions/checkout@v4.2.2
- name: Download digests
uses: actions/download-artifact@v4.3.0

View File

@ -18,7 +18,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Check out code from GitHub
uses: actions/checkout@v4.1.7
uses: actions/checkout@v4.2.2
- name: Run yamllint
uses: frenck/action-yamllint@v1.5.0
with:

View File

@ -28,7 +28,7 @@ repos:
- --branch=release
- --branch=beta
- repo: https://github.com/asottile/pyupgrade
rev: v3.19.1
rev: v3.20.0
hooks:
- id: pyupgrade
args: [--py310-plus]

View File

@ -7,11 +7,11 @@
#include <map>
#include <string>
#include "esphome/components/network/util.h"
#include "esphome/core/application.h"
#include "esphome/core/entity_base.h"
#include "esphome/core/hal.h"
#include "esphome/core/log.h"
#include "esphome/core/version.h"
#include "esphome/core/application.h"
#ifdef USE_DEEP_SLEEP
#include "esphome/components/deep_sleep/deep_sleep_component.h"
@ -179,7 +179,11 @@ void APIConnection::loop() {
// Section: Process Message
start_time = millis();
this->read_message(buffer.data_len, buffer.type, &buffer.container[buffer.data_offset]);
if (buffer.data_len > 0) {
this->read_message(buffer.data_len, buffer.type, &buffer.container[buffer.data_offset]);
} else {
this->read_message(0, buffer.type, nullptr);
}
duration = millis() - start_time;
this->section_stats_["process_message"].record_time(duration);

View File

@ -9,6 +9,7 @@ from esphome.const import (
CONF_ID,
CONF_LINE_FREQUENCY,
CONF_POWER,
CONF_RESET,
CONF_VOLTAGE,
DEVICE_CLASS_CURRENT,
DEVICE_CLASS_ENERGY,
@ -27,7 +28,6 @@ from esphome.const import (
CONF_CURRENT_REFERENCE = "current_reference"
CONF_ENERGY_REFERENCE = "energy_reference"
CONF_POWER_REFERENCE = "power_reference"
CONF_RESET = "reset"
CONF_VOLTAGE_REFERENCE = "voltage_reference"
DEPENDENCIES = ["uart"]

View File

@ -16,7 +16,7 @@ CODEOWNERS = ["@neffs", "@kbx81"]
DOMAIN = "bme68x_bsec2"
BSEC2_LIBRARY_VERSION = "v1.8.2610"
BSEC2_LIBRARY_VERSION = "1.10.2610"
CONF_ALGORITHM_OUTPUT = "algorithm_output"
CONF_BME68X_BSEC2_ID = "bme68x_bsec2_id"
@ -145,7 +145,6 @@ CONFIG_SCHEMA_BASE = (
): cv.positive_time_period_minutes,
},
)
.add_extra(cv.only_with_arduino)
.add_extra(validate_bme68x)
.add_extra(download_bme68x_blob)
)
@ -179,11 +178,13 @@ async def to_code_base(config):
bsec2_arr = cg.progmem_array(config[CONF_RAW_DATA_ID], rhs)
cg.add(var.set_bsec2_configuration(bsec2_arr, len(rhs)))
# Although this component does not use SPI, the BSEC2 library requires the SPI library
cg.add_library("SPI", None)
# Although this component does not use SPI, the BSEC2 Arduino library requires the SPI library
if core.CORE.using_arduino:
cg.add_library("SPI", None)
cg.add_library(
"BME68x Sensor library",
"1.1.40407",
"1.3.40408",
"https://github.com/boschsensortec/Bosch-BME68x-Library",
)
cg.add_library(
"BSEC2 Software Library",

View File

@ -1,4 +1,5 @@
#include "esphome/core/defines.h"
#include "esphome/core/hal.h"
#include "esphome/core/helpers.h"
#include "esphome/core/log.h"

View File

@ -1,4 +1,5 @@
#include "esphome/core/defines.h"
#include "esphome/core/hal.h"
#include "esphome/core/helpers.h"
#include "esphome/core/log.h"

View File

@ -57,6 +57,7 @@ from .const import ( # noqa
VARIANT_ESP32,
VARIANT_ESP32C2,
VARIANT_ESP32C3,
VARIANT_ESP32C5,
VARIANT_ESP32C6,
VARIANT_ESP32H2,
VARIANT_ESP32P4,
@ -88,6 +89,7 @@ CPU_FREQUENCIES = {
VARIANT_ESP32S3: get_cpu_frequencies(80, 160, 240),
VARIANT_ESP32C2: get_cpu_frequencies(80, 120),
VARIANT_ESP32C3: get_cpu_frequencies(80, 160),
VARIANT_ESP32C5: get_cpu_frequencies(80, 160, 240),
VARIANT_ESP32C6: get_cpu_frequencies(80, 120, 160),
VARIANT_ESP32H2: get_cpu_frequencies(16, 32, 48, 64, 96),
VARIANT_ESP32P4: get_cpu_frequencies(40, 360, 400),

View File

@ -2,6 +2,7 @@ from .const import (
VARIANT_ESP32,
VARIANT_ESP32C2,
VARIANT_ESP32C3,
VARIANT_ESP32C5,
VARIANT_ESP32C6,
VARIANT_ESP32H2,
VARIANT_ESP32P4,
@ -1593,6 +1594,10 @@ BOARDS = {
"name": "Ai-Thinker ESP-C3-M1-I-Kit",
"variant": VARIANT_ESP32C3,
},
"esp32-c5-devkitc-1": {
"name": "Espressif ESP32-C5-DevKitC-1",
"variant": VARIANT_ESP32C5,
},
"esp32-c6-devkitc-1": {
"name": "Espressif ESP32-C6-DevKitC-1",
"variant": VARIANT_ESP32C6,

View File

@ -17,6 +17,7 @@ VARIANT_ESP32S2 = "ESP32S2"
VARIANT_ESP32S3 = "ESP32S3"
VARIANT_ESP32C2 = "ESP32C2"
VARIANT_ESP32C3 = "ESP32C3"
VARIANT_ESP32C5 = "ESP32C5"
VARIANT_ESP32C6 = "ESP32C6"
VARIANT_ESP32H2 = "ESP32H2"
VARIANT_ESP32P4 = "ESP32P4"
@ -26,6 +27,7 @@ VARIANTS = [
VARIANT_ESP32S3,
VARIANT_ESP32C2,
VARIANT_ESP32C3,
VARIANT_ESP32C5,
VARIANT_ESP32C6,
VARIANT_ESP32H2,
VARIANT_ESP32P4,
@ -37,6 +39,7 @@ VARIANT_FRIENDLY = {
VARIANT_ESP32S3: "ESP32-S3",
VARIANT_ESP32C2: "ESP32-C2",
VARIANT_ESP32C3: "ESP32-C3",
VARIANT_ESP32C5: "ESP32-C5",
VARIANT_ESP32C6: "ESP32-C6",
VARIANT_ESP32H2: "ESP32-H2",
VARIANT_ESP32P4: "ESP32-P4",

View File

@ -15,8 +15,9 @@
#ifdef USE_ARDUINO
#include <Esp.h>
#else
#if ESP_IDF_VERSION >= ESP_IDF_VERSION_VAL(5, 1, 0)
#include <esp_clk_tree.h>
#endif
void setup();
void loop();
#endif
@ -63,7 +64,13 @@ uint32_t arch_get_cpu_cycle_count() { return cpu_hal_get_cycle_count(); }
uint32_t arch_get_cpu_freq_hz() {
uint32_t freq = 0;
#ifdef USE_ESP_IDF
#if ESP_IDF_VERSION >= ESP_IDF_VERSION_VAL(5, 1, 0)
esp_clk_tree_src_get_freq_hz(SOC_MOD_CLK_CPU, ESP_CLK_TREE_SRC_FREQ_PRECISION_CACHED, &freq);
#else
rtc_cpu_freq_config_t config;
rtc_clk_cpu_freq_get_config(&config);
freq = config.freq_mhz * 1000000U;
#endif
#elif defined(USE_ARDUINO)
freq = ESP.getCpuFreqMHz() * 1000000;
#endif

View File

@ -27,6 +27,7 @@ from .const import (
VARIANT_ESP32,
VARIANT_ESP32C2,
VARIANT_ESP32C3,
VARIANT_ESP32C5,
VARIANT_ESP32C6,
VARIANT_ESP32H2,
VARIANT_ESP32P4,
@ -37,6 +38,7 @@ from .const import (
from .gpio_esp32 import esp32_validate_gpio_pin, esp32_validate_supports
from .gpio_esp32_c2 import esp32_c2_validate_gpio_pin, esp32_c2_validate_supports
from .gpio_esp32_c3 import esp32_c3_validate_gpio_pin, esp32_c3_validate_supports
from .gpio_esp32_c5 import esp32_c5_validate_gpio_pin, esp32_c5_validate_supports
from .gpio_esp32_c6 import esp32_c6_validate_gpio_pin, esp32_c6_validate_supports
from .gpio_esp32_h2 import esp32_h2_validate_gpio_pin, esp32_h2_validate_supports
from .gpio_esp32_p4 import esp32_p4_validate_gpio_pin, esp32_p4_validate_supports
@ -100,6 +102,10 @@ _esp32_validations = {
pin_validation=esp32_c3_validate_gpio_pin,
usage_validation=esp32_c3_validate_supports,
),
VARIANT_ESP32C5: ESP32ValidationFunctions(
pin_validation=esp32_c5_validate_gpio_pin,
usage_validation=esp32_c5_validate_supports,
),
VARIANT_ESP32C6: ESP32ValidationFunctions(
pin_validation=esp32_c6_validate_gpio_pin,
usage_validation=esp32_c6_validate_supports,

View File

@ -0,0 +1,45 @@
import logging
import esphome.config_validation as cv
from esphome.const import CONF_INPUT, CONF_MODE, CONF_NUMBER
from esphome.pins import check_strapping_pin
_ESP32C5_SPI_PSRAM_PINS = {
16: "SPICS0",
17: "SPIQ",
18: "SPIWP",
19: "VDD_SPI",
20: "SPIHD",
21: "SPICLK",
22: "SPID",
}
_ESP32C5_STRAPPING_PINS = {2, 7, 27, 28}
_LOGGER = logging.getLogger(__name__)
def esp32_c5_validate_gpio_pin(value):
if value < 0 or value > 28:
raise cv.Invalid(f"Invalid pin number: {value} (must be 0-28)")
if value in _ESP32C5_SPI_PSRAM_PINS:
raise cv.Invalid(
f"This pin cannot be used on ESP32-C5s and is already used by the SPI/PSRAM interface (function: {_ESP32C5_SPI_PSRAM_PINS[value]})"
)
return value
def esp32_c5_validate_supports(value):
num = value[CONF_NUMBER]
mode = value[CONF_MODE]
is_input = mode[CONF_INPUT]
if num < 0 or num > 28:
raise cv.Invalid(f"Invalid pin number: {num} (must be 0-28)")
if is_input:
# All ESP32 pins support input mode
pass
check_strapping_pin(value, _ESP32C5_STRAPPING_PINS, _LOGGER)
return value

View File

@ -4,6 +4,7 @@ from esphome.components.esp32 import get_esp32_variant
from esphome.components.esp32.const import (
VARIANT_ESP32,
VARIANT_ESP32C3,
VARIANT_ESP32P4,
VARIANT_ESP32S2,
VARIANT_ESP32S3,
)
@ -74,6 +75,7 @@ I2S_PORTS = {
VARIANT_ESP32S2: 1,
VARIANT_ESP32S3: 2,
VARIANT_ESP32C3: 1,
VARIANT_ESP32P4: 3,
}
i2s_channel_fmt_t = cg.global_ns.enum("i2s_channel_fmt_t")

View File

@ -8,6 +8,7 @@ from esphome.components.esp32.const import (
VARIANT_ESP32,
VARIANT_ESP32C2,
VARIANT_ESP32C3,
VARIANT_ESP32C5,
VARIANT_ESP32C6,
VARIANT_ESP32H2,
VARIANT_ESP32P4,
@ -89,6 +90,7 @@ UART_SELECTION_ESP32 = {
VARIANT_ESP32S3: [UART0, UART1, USB_CDC, USB_SERIAL_JTAG],
VARIANT_ESP32C3: [UART0, UART1, USB_CDC, USB_SERIAL_JTAG],
VARIANT_ESP32C2: [UART0, UART1],
VARIANT_ESP32C5: [UART0, UART1, USB_CDC, USB_SERIAL_JTAG],
VARIANT_ESP32C6: [UART0, UART1, USB_CDC, USB_SERIAL_JTAG],
VARIANT_ESP32H2: [UART0, UART1, USB_CDC, USB_SERIAL_JTAG],
VARIANT_ESP32P4: [UART0, UART1, USB_CDC, USB_SERIAL_JTAG],
@ -207,6 +209,7 @@ CONFIG_SCHEMA = cv.All(
esp32_s3_idf=USB_SERIAL_JTAG,
esp32_c3_arduino=USB_CDC,
esp32_c3_idf=USB_SERIAL_JTAG,
esp32_c5_idf=USB_SERIAL_JTAG,
esp32_c6_arduino=USB_CDC,
esp32_c6_idf=USB_SERIAL_JTAG,
esp32_p4_idf=USB_SERIAL_JTAG,

View File

@ -1,7 +1,7 @@
import esphome.codegen as cg
from esphome.components import switch
import esphome.config_validation as cv
from esphome.const import CONF_ADDRESS, CONF_ID
from esphome.const import CONF_ADDRESS, CONF_ASSUMED_STATE, CONF_ID
from .. import (
MODBUS_REGISTER_TYPE,
@ -36,6 +36,7 @@ CONFIG_SCHEMA = cv.All(
.extend(ModbusItemBaseSchema)
.extend(
{
cv.Optional(CONF_ASSUMED_STATE, default=False): cv.boolean,
cv.Optional(CONF_REGISTER_TYPE): cv.enum(MODBUS_REGISTER_TYPE),
cv.Optional(CONF_USE_WRITE_MULTIPLE, default=False): cv.boolean,
cv.Optional(CONF_WRITE_LAMBDA): cv.returning_lambda,
@ -62,7 +63,10 @@ async def to_code(config):
paren = await cg.get_variable(config[CONF_MODBUS_CONTROLLER_ID])
cg.add(var.set_parent(paren))
cg.add(var.set_use_write_mutiple(config[CONF_USE_WRITE_MULTIPLE]))
cg.add(paren.add_sensor_item(var))
assumed_state = config[CONF_ASSUMED_STATE]
cg.add(var.set_assumed_state(assumed_state))
if not assumed_state:
cg.add(paren.add_sensor_item(var))
if CONF_WRITE_LAMBDA in config:
template_ = await cg.process_lambda(
config[CONF_WRITE_LAMBDA],

View File

@ -19,6 +19,10 @@ void ModbusSwitch::setup() {
}
void ModbusSwitch::dump_config() { LOG_SWITCH(TAG, "Modbus Controller Switch", this); }
void ModbusSwitch::set_assumed_state(bool assumed_state) { this->assumed_state_ = assumed_state; }
bool ModbusSwitch::assumed_state() { return this->assumed_state_; }
void ModbusSwitch::parse_and_publish(const std::vector<uint8_t> &data) {
bool value = false;
switch (this->register_type) {

View File

@ -29,6 +29,7 @@ class ModbusSwitch : public Component, public switch_::Switch, public SensorItem
void setup() override;
void write_state(bool state) override;
void dump_config() override;
void set_assumed_state(bool assumed_state);
void set_state(bool state) { this->state = state; }
void parse_and_publish(const std::vector<uint8_t> &data) override;
void set_parent(ModbusController *parent) { this->parent_ = parent; }
@ -40,10 +41,12 @@ class ModbusSwitch : public Component, public switch_::Switch, public SensorItem
void set_use_write_mutiple(bool use_write_multiple) { this->use_write_multiple_ = use_write_multiple; }
protected:
bool assumed_state() override;
ModbusController *parent_{nullptr};
bool use_write_multiple_{false};
optional<transform_func_t> publish_transform_func_{nullopt};
optional<write_transform_func_t> write_transform_func_{nullopt};
bool assumed_state_{false};
};
} // namespace modbus_controller

View File

@ -1,6 +1,6 @@
from esphome import pins
from esphome import automation, pins
import esphome.codegen as cg
from esphome.components import i2c
from esphome.components import i2c, key_provider
import esphome.config_validation as cv
from esphome.const import (
CONF_ID,
@ -8,13 +8,16 @@ from esphome.const import (
CONF_INVERTED,
CONF_MODE,
CONF_NUMBER,
CONF_ON_KEY,
CONF_OPEN_DRAIN,
CONF_OUTPUT,
CONF_PULLDOWN,
CONF_PULLUP,
CONF_TRIGGER_ID,
)
CONF_KEYPAD = "keypad"
CONF_KEYS = "keys"
CONF_KEY_ROWS = "key_rows"
CONF_KEY_COLUMNS = "key_columns"
CONF_SLEEP_TIME = "sleep_time"
@ -22,22 +25,47 @@ CONF_SCAN_TIME = "scan_time"
CONF_DEBOUNCE_TIME = "debounce_time"
CONF_SX1509_ID = "sx1509_id"
AUTO_LOAD = ["key_provider"]
DEPENDENCIES = ["i2c"]
MULTI_CONF = True
sx1509_ns = cg.esphome_ns.namespace("sx1509")
SX1509Component = sx1509_ns.class_("SX1509Component", cg.Component, i2c.I2CDevice)
SX1509Component = sx1509_ns.class_(
"SX1509Component", cg.Component, i2c.I2CDevice, key_provider.KeyProvider
)
SX1509GPIOPin = sx1509_ns.class_("SX1509GPIOPin", cg.GPIOPin)
SX1509KeyTrigger = sx1509_ns.class_(
"SX1509KeyTrigger", automation.Trigger.template(cg.uint8)
)
KEYPAD_SCHEMA = cv.Schema(
{
cv.Required(CONF_KEY_ROWS): cv.int_range(min=1, max=8),
cv.Required(CONF_KEY_COLUMNS): cv.int_range(min=1, max=8),
cv.Optional(CONF_SLEEP_TIME): cv.int_range(min=128, max=8192),
cv.Optional(CONF_SCAN_TIME): cv.int_range(min=1, max=128),
cv.Optional(CONF_DEBOUNCE_TIME): cv.int_range(min=1, max=64),
}
def check_keys(config):
if CONF_KEYS in config:
if len(config[CONF_KEYS]) != config[CONF_KEY_ROWS] * config[CONF_KEY_COLUMNS]:
raise cv.Invalid(
"The number of key codes must equal the number of rows * columns"
)
return config
KEYPAD_SCHEMA = cv.All(
cv.Schema(
{
cv.Required(CONF_KEY_ROWS): cv.int_range(min=2, max=8),
cv.Required(CONF_KEY_COLUMNS): cv.int_range(min=1, max=8),
cv.Optional(CONF_SLEEP_TIME): cv.int_range(min=128, max=8192),
cv.Optional(CONF_SCAN_TIME): cv.int_range(min=1, max=128),
cv.Optional(CONF_DEBOUNCE_TIME): cv.int_range(min=1, max=64),
cv.Optional(CONF_KEYS): cv.string,
cv.Optional(CONF_ON_KEY): automation.validate_automation(
{
cv.GenerateID(CONF_TRIGGER_ID): cv.declare_id(SX1509KeyTrigger),
}
),
}
),
check_keys,
)
CONFIG_SCHEMA = (
@ -56,17 +84,22 @@ async def to_code(config):
var = cg.new_Pvariable(config[CONF_ID])
await cg.register_component(var, config)
await i2c.register_i2c_device(var, config)
if CONF_KEYPAD in config:
keypad = config[CONF_KEYPAD]
cg.add(var.set_rows_cols(keypad[CONF_KEY_ROWS], keypad[CONF_KEY_COLUMNS]))
if conf := config.get(CONF_KEYPAD):
cg.add(var.set_rows_cols(conf[CONF_KEY_ROWS], conf[CONF_KEY_COLUMNS]))
if (
CONF_SLEEP_TIME in keypad
and CONF_SCAN_TIME in keypad
and CONF_DEBOUNCE_TIME in keypad
CONF_SLEEP_TIME in conf
and CONF_SCAN_TIME in conf
and CONF_DEBOUNCE_TIME in conf
):
cg.add(var.set_sleep_time(keypad[CONF_SLEEP_TIME]))
cg.add(var.set_scan_time(keypad[CONF_SCAN_TIME]))
cg.add(var.set_debounce_time(keypad[CONF_DEBOUNCE_TIME]))
cg.add(var.set_sleep_time(conf[CONF_SLEEP_TIME]))
cg.add(var.set_scan_time(conf[CONF_SCAN_TIME]))
cg.add(var.set_debounce_time(conf[CONF_DEBOUNCE_TIME]))
if keys := conf.get(CONF_KEYS):
cg.add(var.set_keys(keys))
for tconf in conf.get(CONF_ON_KEY, []):
trigger = cg.new_Pvariable(tconf[CONF_TRIGGER_ID])
cg.add(var.register_key_trigger(trigger))
await automation.build_automation(trigger, [(cg.uint8, "x")], tconf)
def validate_mode(value):

View File

@ -48,6 +48,30 @@ void SX1509Component::loop() {
uint16_t key_data = this->read_key_data();
for (auto *binary_sensor : this->keypad_binary_sensors_)
binary_sensor->process(key_data);
if (this->keys_.empty())
return;
if (key_data == 0) {
this->last_key_ = 0;
return;
}
int row, col;
for (row = 0; row < 7; row++) {
if (key_data & (1 << row))
break;
}
for (col = 8; col < 15; col++) {
if (key_data & (1 << col))
break;
}
col -= 8;
uint8_t key = this->keys_[row * this->cols_ + col];
if (key == this->last_key_)
return;
this->last_key_ = key;
ESP_LOGV(TAG, "row %d, col %d, key '%c'", row, col, key);
for (auto &trigger : this->key_triggers_)
trigger->trigger(key);
this->send_key_(key);
}
}
@ -230,9 +254,9 @@ void SX1509Component::setup_keypad_() {
scan_time_bits &= 0b111; // Scan time is bits 2:0
temp_byte = sleep_time_ | scan_time_bits;
this->write_byte(REG_KEY_CONFIG_1, temp_byte);
rows_ = (rows_ - 1) & 0b111; // 0 = off, 0b001 = 2 rows, 0b111 = 8 rows, etc.
cols_ = (cols_ - 1) & 0b111; // 0b000 = 1 column, ob111 = 8 columns, etc.
this->write_byte(REG_KEY_CONFIG_2, (rows_ << 3) | cols_);
temp_byte = ((this->rows_ - 1) & 0b111) << 3; // 0 = off, 0b001 = 2 rows, 0b111 = 8 rows, etc.
temp_byte |= (this->cols_ - 1) & 0b111; // 0b000 = 1 column, ob111 = 8 columns, etc.
this->write_byte(REG_KEY_CONFIG_2, temp_byte);
}
uint16_t SX1509Component::read_key_data() {

View File

@ -1,6 +1,7 @@
#pragma once
#include "esphome/components/i2c/i2c.h"
#include "esphome/components/key_provider/key_provider.h"
#include "esphome/core/component.h"
#include "esphome/core/hal.h"
#include "sx1509_gpio_pin.h"
@ -27,7 +28,9 @@ class SX1509Processor {
virtual void process(uint16_t data){};
};
class SX1509Component : public Component, public i2c::I2CDevice {
class SX1509KeyTrigger : public Trigger<uint8_t> {};
class SX1509Component : public Component, public i2c::I2CDevice, public key_provider::KeyProvider {
public:
SX1509Component() = default;
@ -47,12 +50,14 @@ class SX1509Component : public Component, public i2c::I2CDevice {
this->cols_ = cols;
this->has_keypad_ = true;
};
void set_keys(std::string keys) { this->keys_ = std::move(keys); };
void set_sleep_time(uint16_t sleep_time) { this->sleep_time_ = sleep_time; };
void set_scan_time(uint8_t scan_time) { this->scan_time_ = scan_time; };
void set_debounce_time(uint8_t debounce_time = 1) { this->debounce_time_ = debounce_time; };
void register_keypad_binary_sensor(SX1509Processor *binary_sensor) {
this->keypad_binary_sensors_.push_back(binary_sensor);
}
void register_key_trigger(SX1509KeyTrigger *trig) { this->key_triggers_.push_back(trig); };
void setup_led_driver(uint8_t pin);
protected:
@ -65,10 +70,13 @@ class SX1509Component : public Component, public i2c::I2CDevice {
bool has_keypad_ = false;
uint8_t rows_ = 0;
uint8_t cols_ = 0;
std::string keys_;
uint16_t sleep_time_ = 128;
uint8_t scan_time_ = 1;
uint8_t debounce_time_ = 1;
uint8_t last_key_ = 0;
std::vector<SX1509Processor *> keypad_binary_sensors_;
std::vector<SX1509KeyTrigger *> key_triggers_;
uint32_t last_loop_timestamp_ = 0;
const uint32_t min_loop_period_ = 15; // ms

View File

@ -483,14 +483,16 @@ template<typename... Ts> class WiFiConfigureAction : public Action<Ts...>, publi
// Enable WiFi
global_wifi_component->enable();
// Set timeout for the connection
this->set_timeout("wifi-connect-timeout", this->connection_timeout_.value(x...), [this]() {
this->connecting_ = false;
this->set_timeout("wifi-connect-timeout", this->connection_timeout_.value(x...), [this, x...]() {
// If the timeout is reached, stop connecting and revert to the old AP
global_wifi_component->disable();
global_wifi_component->save_wifi_sta(old_sta_.get_ssid(), old_sta_.get_password());
global_wifi_component->enable();
// Callback to notify the user that the connection failed
this->error_trigger_->trigger();
// Start a timeout for the fallback if the connection to the old AP fails
this->set_timeout("wifi-fallback-timeout", this->connection_timeout_.value(x...), [this]() {
this->connecting_ = false;
this->error_trigger_->trigger();
});
});
}
@ -503,6 +505,7 @@ template<typename... Ts> class WiFiConfigureAction : public Action<Ts...>, publi
if (global_wifi_component->is_connected()) {
// The WiFi is connected, stop the timeout and reset the connecting flag
this->cancel_timeout("wifi-connect-timeout");
this->cancel_timeout("wifi-fallback-timeout");
this->connecting_ = false;
if (global_wifi_component->wifi_ssid() == this->new_sta_.get_ssid()) {
// Callback to notify the user that the connection was successful

View File

@ -1,11 +1,10 @@
import esphome.codegen as cg
from esphome.components import text_sensor, uart
import esphome.config_validation as cv
from esphome.const import ICON_FINGERPRINT
from esphome.const import CONF_RESET, ICON_FINGERPRINT
CODEOWNERS = ["@hobbypunk90"]
DEPENDENCIES = ["uart"]
CONF_RESET = "reset"
wl134_ns = cg.esphome_ns.namespace("wl_134")
Wl134Component = wl134_ns.class_(

View File

@ -735,6 +735,7 @@ CONF_REFRESH = "refresh"
CONF_RELABEL = "relabel"
CONF_REPEAT = "repeat"
CONF_REPOSITORY = "repository"
CONF_RESET = "reset"
CONF_RESET_DURATION = "reset_duration"
CONF_RESET_PIN = "reset_pin"
CONF_RESIZE = "resize"

View File

@ -160,7 +160,8 @@
#if defined(USE_ESP32_VARIANT_ESP32S2)
#define USE_LOGGER_USB_CDC
#elif defined(USE_ESP32_VARIANT_ESP32S3) || defined(USE_ESP32_VARIANT_ESP32C3) || \
defined(USE_ESP32_VARIANT_ESP32C6) || defined(USE_ESP32_VARIANT_ESP32H2) || defined(USE_ESP32_VARIANT_ESP32P4)
defined(USE_ESP32_VARIANT_ESP32C5) || defined(USE_ESP32_VARIANT_ESP32C6) || defined(USE_ESP32_VARIANT_ESP32H2) || \
defined(USE_ESP32_VARIANT_ESP32P4)
#define USE_LOGGER_USB_CDC
#define USE_LOGGER_USB_SERIAL_JTAG
#endif

View File

@ -416,7 +416,9 @@ class LineComment(Statement):
self.value = value
def __str__(self):
parts = re.sub(r"\\\s*\n", r"<cont>\n", self.value, re.MULTILINE).split("\n")
parts = re.sub(r"\\\s*\n", r"<cont>\n", self.value, flags=re.MULTILINE).split(
"\n"
)
parts = [f"// {x}" for x in parts]
return "\n".join(parts)

View File

@ -601,10 +601,12 @@ class DownloadListRequestHandler(BaseHandler):
loop = asyncio.get_running_loop()
try:
downloads_json = await loop.run_in_executor(None, self._get, configuration)
except vol.Invalid:
except vol.Invalid as exc:
_LOGGER.exception("Error while fetching downloads", exc_info=exc)
self.send_error(404)
return
if downloads_json is None:
_LOGGER.error("Configuration %s not found", configuration)
self.send_error(404)
return
self.set_status(200)
@ -618,14 +620,17 @@ class DownloadListRequestHandler(BaseHandler):
if storage_json is None:
return None
config = yaml_util.load_yaml(settings.rel_path(configuration))
try:
config = yaml_util.load_yaml(settings.rel_path(configuration))
if const.CONF_EXTERNAL_COMPONENTS in config:
from esphome.components.external_components import (
do_external_components_pass,
)
if const.CONF_EXTERNAL_COMPONENTS in config:
from esphome.components.external_components import (
do_external_components_pass,
)
do_external_components_pass(config)
do_external_components_pass(config)
except vol.Invalid:
_LOGGER.info("Could not parse `external_components`, skipping")
from esphome.components.esp32 import VARIANTS as ESP32_VARIANTS

View File

@ -1,5 +1,5 @@
[build-system]
requires = ["setuptools==80.8.0", "wheel>=0.43,<0.46"]
requires = ["setuptools==80.9.0", "wheel>=0.43,<0.46"]
build-backend = "setuptools.build_meta"
[project]

View File

@ -16,7 +16,7 @@ esphome-dashboard==20250514.0
aioesphomeapi==31.1.0
zeroconf==0.147.0
puremagic==1.29
ruamel.yaml==0.18.10 # dashboard_import
ruamel.yaml==0.18.11 # dashboard_import
esphome-glyphsets==0.2.0
pillow==10.4.0
cairosvg==2.8.2

View File

@ -1,13 +1,14 @@
pylint==3.3.7
flake8==7.2.0 # also change in .pre-commit-config.yaml when updating
ruff==0.11.11 # also change in .pre-commit-config.yaml when updating
pyupgrade==3.19.1 # also change in .pre-commit-config.yaml when updating
pyupgrade==3.20.0 # also change in .pre-commit-config.yaml when updating
pre-commit
# Unit tests
pytest==8.3.5
pytest-cov==6.1.1
pytest-mock==3.14.0
pytest-mock==3.14.1
pytest-asyncio==0.26.0
pytest-xdist==3.7.0
asyncmock==0.4.2
hypothesis==6.92.1

10
script/integration_test Executable file
View File

@ -0,0 +1,10 @@
#!/usr/bin/env bash
set -e
script_dir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
cd "${script_dir}/.."
set -x
pytest -vvs --no-cov --tb=native -n 0 tests/integration/

View File

@ -0,0 +1,5 @@
substitutions:
scl_pin: GPIO6
sda_pin: GPIO7
<<: !include common.yaml

View File

@ -0,0 +1,5 @@
substitutions:
scl_pin: GPIO16
sda_pin: GPIO17
<<: !include common.yaml

View File

@ -0,0 +1,5 @@
substitutions:
scl_pin: GPIO16
sda_pin: GPIO17
<<: !include common.yaml

View File

@ -0,0 +1,5 @@
substitutions:
scl_pin: GPIO16
sda_pin: GPIO17
<<: !include common.yaml

View File

@ -0,0 +1,5 @@
substitutions:
scl_pin: GPIO5
sda_pin: GPIO4
<<: !include common.yaml

View File

@ -6,6 +6,12 @@ i2c:
sx1509:
- id: sx1509_hub
address: 0x3E
keypad:
key_rows: 2
key_columns: 2
keys: abcd
on_key:
- lambda: ESP_LOGD("test", "got key '%c'", x);
binary_sensor:
- platform: gpio
@ -13,6 +19,11 @@ binary_sensor:
pin:
sx1509: sx1509_hub
number: 3
- platform: sx1509
sx1509_id: sx1509_hub
name: "keypadkey_0"
row: 0
col: 0
switch:
- platform: gpio

View File

@ -53,6 +53,8 @@ async def dashboard() -> DashboardTestHelper:
assert DASHBOARD.settings.on_ha_addon is True
assert DASHBOARD.settings.using_auth is False
task = asyncio.create_task(DASHBOARD.async_run())
# Wait for initial device loading to complete
await DASHBOARD.entries.async_request_update_entries()
client = AsyncHTTPClient()
io_loop = IOLoop(make_current=False)
yield DashboardTestHelper(io_loop, client, port)

View File

@ -0,0 +1,80 @@
# ESPHome Integration Tests
This directory contains end-to-end integration tests for ESPHome, focusing on testing the complete flow from YAML configuration to running devices with API connections.
## Structure
- `conftest.py` - Common fixtures and utilities
- `const.py` - Constants used throughout the integration tests
- `types.py` - Type definitions for fixtures and functions
- `fixtures/` - YAML configuration files for tests
- `test_*.py` - Individual test files
## How it works
### Automatic YAML Loading
The `yaml_config` fixture automatically loads YAML configurations based on the test name:
- It looks for a file named after the test function (e.g., `test_host_mode_basic``fixtures/host_mode_basic.yaml`)
- The fixture file must exist or the test will fail with a clear error message
- The fixture automatically injects a dynamic port number into the API configuration
### Key Fixtures
- `run_compiled` - Combines write, compile, and run operations into a single context manager
- `api_client_connected` - Creates an API client that automatically connects using ReconnectLogic
- `reserved_tcp_port` - Reserves a TCP port by holding the socket open until ESPHome needs it
- `unused_tcp_port` - Provides the reserved port number for each test
### Writing Tests
The simplest way to write a test is to use the `run_compiled` and `api_client_connected` fixtures:
```python
@pytest.mark.asyncio
async def test_my_feature(
yaml_config: str,
run_compiled: RunCompiledFunction,
api_client_connected: APIClientConnectedFactory,
) -> None:
# Write, compile and run the ESPHome device, then connect to API
async with run_compiled(yaml_config), api_client_connected() as client:
# Test your feature using the connected client
device_info = await client.device_info()
assert device_info is not None
```
### Creating YAML Fixtures
Create a YAML file in the `fixtures/` directory with the same name as your test function (without the `test_` prefix):
```yaml
# fixtures/my_feature.yaml
esphome:
name: my-test-device
host:
api: # Port will be automatically injected
logger:
# Add your components here
```
## Running Tests
```bash
# Run all integration tests
script/integration_test
# Run a specific test
pytest -vv tests/integration/test_host_mode_basic.py
# Debug compilation errors or see ESPHome output
pytest -s tests/integration/test_host_mode_basic.py
```
## Implementation Details
- Tests automatically wait for the API port to be available before connecting
- Process cleanup is handled automatically, with graceful shutdown using SIGINT
- Each test gets its own temporary directory and unique port
- Port allocation minimizes race conditions by holding the socket until just before ESPHome starts
- Output from ESPHome processes is displayed for debugging

View File

@ -0,0 +1,3 @@
"""ESPHome integration tests."""
from __future__ import annotations

View File

@ -0,0 +1,426 @@
"""Common fixtures for integration tests."""
from __future__ import annotations
import asyncio
from collections.abc import AsyncGenerator, Generator
from contextlib import AbstractAsyncContextManager, asynccontextmanager
import logging
from pathlib import Path
import platform
import signal
import socket
import tempfile
from aioesphomeapi import APIClient, APIConnectionError, ReconnectLogic
import pytest
import pytest_asyncio
# Skip all integration tests on Windows
if platform.system() == "Windows":
pytest.skip(
"Integration tests are not supported on Windows", allow_module_level=True
)
from .const import (
API_CONNECTION_TIMEOUT,
DEFAULT_API_PORT,
LOCALHOST,
PORT_POLL_INTERVAL,
PORT_WAIT_TIMEOUT,
SIGINT_TIMEOUT,
SIGTERM_TIMEOUT,
)
from .types import (
APIClientConnectedFactory,
APIClientFactory,
CompileFunction,
ConfigWriter,
RunCompiledFunction,
RunFunction,
)
@pytest.fixture(scope="module", autouse=True)
def enable_aioesphomeapi_debug_logging():
"""Enable debug logging for aioesphomeapi to help diagnose connection issues."""
# Get the aioesphomeapi logger
logger = logging.getLogger("aioesphomeapi")
# Save the original level
original_level = logger.level
# Set to DEBUG level
logger.setLevel(logging.DEBUG)
# Also ensure we have a handler that outputs to console
if not logger.handlers:
handler = logging.StreamHandler()
handler.setLevel(logging.DEBUG)
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
handler.setFormatter(formatter)
logger.addHandler(handler)
yield
# Restore original level
logger.setLevel(original_level)
@pytest.fixture
def integration_test_dir() -> Generator[Path]:
"""Create a temporary directory for integration tests."""
with tempfile.TemporaryDirectory() as tmpdir:
yield Path(tmpdir)
@pytest.fixture
def reserved_tcp_port() -> Generator[tuple[int, socket.socket]]:
"""Reserve an unused TCP port by holding the socket open."""
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(("", 0))
port = s.getsockname()[1]
try:
yield port, s
finally:
s.close()
@pytest.fixture
def unused_tcp_port(reserved_tcp_port: tuple[int, socket.socket]) -> int:
"""Get the reserved TCP port number."""
return reserved_tcp_port[0]
@pytest_asyncio.fixture
async def yaml_config(request: pytest.FixtureRequest, unused_tcp_port: int) -> str:
"""Load YAML configuration based on test name."""
# Get the test function name
test_name: str = request.node.name
# Extract the base test name (remove test_ prefix and any parametrization)
base_name = test_name.replace("test_", "").partition("[")[0]
# Load the fixture file
fixture_path = Path(__file__).parent / "fixtures" / f"{base_name}.yaml"
if not fixture_path.exists():
raise FileNotFoundError(f"Fixture file not found: {fixture_path}")
loop = asyncio.get_running_loop()
content = await loop.run_in_executor(None, fixture_path.read_text)
# Replace the port in the config if it contains api section
if "api:" in content:
# Add port configuration after api:
content = content.replace("api:", f"api:\n port: {unused_tcp_port}")
return content
@pytest_asyncio.fixture
async def write_yaml_config(
integration_test_dir: Path, request: pytest.FixtureRequest
) -> AsyncGenerator[ConfigWriter]:
"""Write YAML configuration to a file."""
# Get the test name for default filename
test_name = request.node.name
base_name = test_name.replace("test_", "").split("[")[0]
async def _write_config(content: str, filename: str | None = None) -> Path:
if filename is None:
filename = f"{base_name}.yaml"
config_path = integration_test_dir / filename
loop = asyncio.get_running_loop()
await loop.run_in_executor(None, config_path.write_text, content)
return config_path
yield _write_config
async def _run_esphome_command(
command: str,
config_path: Path,
cwd: Path,
) -> asyncio.subprocess.Process:
"""Run an ESPHome command with the given arguments."""
return await asyncio.create_subprocess_exec(
"esphome",
command,
str(config_path),
cwd=cwd,
stdout=None, # Inherit stdout
stderr=None, # Inherit stderr
stdin=asyncio.subprocess.DEVNULL,
# Start in a new process group to isolate signal handling
start_new_session=True,
)
@pytest_asyncio.fixture
async def compile_esphome(
integration_test_dir: Path,
) -> AsyncGenerator[CompileFunction]:
"""Compile an ESPHome configuration."""
async def _compile(config_path: Path) -> None:
proc = await _run_esphome_command("compile", config_path, integration_test_dir)
await proc.wait()
if proc.returncode != 0:
raise RuntimeError(
f"Failed to compile {config_path}, return code: {proc.returncode}. "
f"Run with 'pytest -s' to see compilation output."
)
yield _compile
@pytest_asyncio.fixture
async def run_esphome_process(
integration_test_dir: Path,
) -> AsyncGenerator[RunFunction]:
"""Run an ESPHome process and manage its lifecycle."""
processes: list[asyncio.subprocess.Process] = []
async def _run(config_path: Path) -> asyncio.subprocess.Process:
process = await _run_esphome_command("run", config_path, integration_test_dir)
processes.append(process)
return process
yield _run
# Cleanup: terminate all "run" processes gracefully
for process in processes:
if process.returncode is None:
# Send SIGINT (Ctrl+C) for graceful shutdown of the running ESPHome instance
process.send_signal(signal.SIGINT)
try:
await asyncio.wait_for(process.wait(), timeout=SIGINT_TIMEOUT)
except asyncio.TimeoutError:
# If SIGINT didn't work, try SIGTERM
process.terminate()
try:
await asyncio.wait_for(process.wait(), timeout=SIGTERM_TIMEOUT)
except asyncio.TimeoutError:
# Last resort: SIGKILL
process.kill()
await process.wait()
@asynccontextmanager
async def create_api_client(
address: str = LOCALHOST,
port: int = DEFAULT_API_PORT,
password: str = "",
noise_psk: str | None = None,
client_info: str = "integration-test",
) -> AsyncGenerator[APIClient]:
"""Create an API client context manager."""
client = APIClient(
address=address,
port=port,
password=password,
noise_psk=noise_psk,
client_info=client_info,
)
try:
yield client
finally:
await client.disconnect()
@pytest_asyncio.fixture
async def api_client_factory(
unused_tcp_port: int,
) -> AsyncGenerator[APIClientFactory]:
"""Factory for creating API client context managers."""
def _create_client(
address: str = LOCALHOST,
port: int | None = None,
password: str = "",
noise_psk: str | None = None,
client_info: str = "integration-test",
) -> AbstractAsyncContextManager[APIClient]:
return create_api_client(
address=address,
port=port if port is not None else unused_tcp_port,
password=password,
noise_psk=noise_psk,
client_info=client_info,
)
yield _create_client
@asynccontextmanager
async def wait_and_connect_api_client(
address: str = LOCALHOST,
port: int = DEFAULT_API_PORT,
password: str = "",
noise_psk: str | None = None,
client_info: str = "integration-test",
timeout: float = API_CONNECTION_TIMEOUT,
) -> AsyncGenerator[APIClient]:
"""Wait for API to be available and connect."""
client = APIClient(
address=address,
port=port,
password=password,
noise_psk=noise_psk,
client_info=client_info,
)
# Create a future to signal when connected
loop = asyncio.get_running_loop()
connected_future: asyncio.Future[None] = loop.create_future()
async def on_connect() -> None:
"""Called when successfully connected."""
if not connected_future.done():
connected_future.set_result(None)
async def on_disconnect(expected_disconnect: bool) -> None:
"""Called when disconnected."""
if not connected_future.done() and not expected_disconnect:
connected_future.set_exception(
APIConnectionError("Disconnected before fully connected")
)
async def on_connect_error(err: Exception) -> None:
"""Called when connection fails."""
if not connected_future.done():
connected_future.set_exception(err)
# Create and start the reconnect logic
reconnect_logic = ReconnectLogic(
client=client,
on_connect=on_connect,
on_disconnect=on_disconnect,
zeroconf_instance=None, # Not using zeroconf for integration tests
name=f"{address}:{port}",
on_connect_error=on_connect_error,
)
try:
# Start the connection
await reconnect_logic.start()
# Wait for connection with timeout
try:
await asyncio.wait_for(connected_future, timeout=timeout)
except asyncio.TimeoutError:
raise TimeoutError(f"Failed to connect to API after {timeout} seconds")
yield client
finally:
# Stop reconnect logic and disconnect
await reconnect_logic.stop()
await client.disconnect()
@pytest_asyncio.fixture
async def api_client_connected(
unused_tcp_port: int,
) -> AsyncGenerator[APIClientConnectedFactory]:
"""Factory for creating connected API client context managers."""
def _connect_client(
address: str = LOCALHOST,
port: int | None = None,
password: str = "",
noise_psk: str | None = None,
client_info: str = "integration-test",
timeout: float = API_CONNECTION_TIMEOUT,
) -> AbstractAsyncContextManager[APIClient]:
return wait_and_connect_api_client(
address=address,
port=port if port is not None else unused_tcp_port,
password=password,
noise_psk=noise_psk,
client_info=client_info,
timeout=timeout,
)
yield _connect_client
async def wait_for_port_open(
host: str, port: int, timeout: float = PORT_WAIT_TIMEOUT
) -> None:
"""Wait for a TCP port to be open and accepting connections."""
loop = asyncio.get_running_loop()
start_time = loop.time()
# Small yield to ensure the process has a chance to start
await asyncio.sleep(0)
while loop.time() - start_time < timeout:
try:
# Try to connect to the port
_, writer = await asyncio.open_connection(host, port)
writer.close()
await writer.wait_closed()
return # Port is open
except (ConnectionRefusedError, OSError):
# Port not open yet, wait a bit and try again
await asyncio.sleep(PORT_POLL_INTERVAL)
raise TimeoutError(f"Port {port} on {host} did not open within {timeout} seconds")
@asynccontextmanager
async def run_compiled_context(
yaml_content: str,
filename: str | None,
write_yaml_config: ConfigWriter,
compile_esphome: CompileFunction,
run_esphome_process: RunFunction,
port: int,
port_socket: socket.socket | None = None,
) -> AsyncGenerator[asyncio.subprocess.Process]:
"""Context manager to write, compile and run an ESPHome configuration."""
# Write the YAML config
config_path = await write_yaml_config(yaml_content, filename)
# Compile the configuration
await compile_esphome(config_path)
# Close the port socket right before running to release the port
if port_socket is not None:
port_socket.close()
# Run the ESPHome device
process = await run_esphome_process(config_path)
assert process.returncode is None, "Process died immediately"
# Wait for the API server to start listening
await wait_for_port_open(LOCALHOST, port, timeout=PORT_WAIT_TIMEOUT)
try:
yield process
finally:
# Process cleanup is handled by run_esphome_process fixture
pass
@pytest_asyncio.fixture
async def run_compiled(
write_yaml_config: ConfigWriter,
compile_esphome: CompileFunction,
run_esphome_process: RunFunction,
reserved_tcp_port: tuple[int, socket.socket],
) -> AsyncGenerator[RunCompiledFunction]:
"""Write, compile and run an ESPHome configuration."""
port, port_socket = reserved_tcp_port
def _run_compiled(
yaml_content: str, filename: str | None = None
) -> AbstractAsyncContextManager[asyncio.subprocess.Process]:
return run_compiled_context(
yaml_content,
filename,
write_yaml_config,
compile_esphome,
run_esphome_process,
port,
port_socket,
)
yield _run_compiled

View File

@ -0,0 +1,14 @@
"""Constants for integration tests."""
# Network constants
DEFAULT_API_PORT = 6053
LOCALHOST = "127.0.0.1"
# Timeout constants
API_CONNECTION_TIMEOUT = 30.0 # seconds
PORT_WAIT_TIMEOUT = 30.0 # seconds
PORT_POLL_INTERVAL = 0.1 # seconds
# Process shutdown timeouts
SIGINT_TIMEOUT = 5.0 # seconds
SIGTERM_TIMEOUT = 2.0 # seconds

View File

@ -0,0 +1,5 @@
esphome:
name: host-test
host:
api:
logger:

View File

@ -0,0 +1,7 @@
esphome:
name: host-noise-test
host:
api:
encryption:
key: N4Yle5YirwZhPiHHsdZLdOA73ndj/84veVaLhTvxCuU=
logger:

View File

@ -0,0 +1,7 @@
esphome:
name: host-noise-test
host:
api:
encryption:
key: N4Yle5YirwZhPiHHsdZLdOA73ndj/84veVaLhTvxCuU=
logger:

View File

@ -0,0 +1,5 @@
esphome:
name: host-reconnect-test
host:
api:
logger:

View File

@ -0,0 +1,12 @@
esphome:
name: host-sensor-test
host:
api:
logger:
sensor:
- platform: template
name: Test Sensor
id: test_sensor
unit_of_measurement: °C
lambda: return 42.0;
update_interval: 0.1s

View File

@ -0,0 +1,22 @@
"""Basic integration test for Host mode."""
from __future__ import annotations
import pytest
from .types import APIClientConnectedFactory, RunCompiledFunction
@pytest.mark.asyncio
async def test_host_mode_basic(
yaml_config: str,
run_compiled: RunCompiledFunction,
api_client_connected: APIClientConnectedFactory,
) -> None:
"""Test basic Host mode functionality with API connection."""
# Write, compile and run the ESPHome device, then connect to API
async with run_compiled(yaml_config), api_client_connected() as client:
# Verify we can get device info
device_info = await client.device_info()
assert device_info is not None
assert device_info.name == "host-test"

View File

@ -0,0 +1,53 @@
"""Integration test for Host mode with noise encryption."""
from __future__ import annotations
from aioesphomeapi import InvalidEncryptionKeyAPIError
import pytest
from .types import APIClientConnectedFactory, RunCompiledFunction
# The API key for noise encryption
NOISE_KEY = "N4Yle5YirwZhPiHHsdZLdOA73ndj/84veVaLhTvxCuU="
@pytest.mark.asyncio
async def test_host_mode_noise_encryption(
yaml_config: str,
run_compiled: RunCompiledFunction,
api_client_connected: APIClientConnectedFactory,
) -> None:
"""Test Host mode with noise encryption enabled."""
# Write, compile and run the ESPHome device, then connect to API
# The API client should handle noise encryption automatically
async with (
run_compiled(yaml_config),
api_client_connected(noise_psk=NOISE_KEY) as client,
):
# If we can get device info, the encryption is working
device_info = await client.device_info()
assert device_info is not None
assert device_info.name == "host-noise-test"
# List entities to ensure the encrypted connection is fully functional
entities = await client.list_entities_services()
assert entities is not None
@pytest.mark.asyncio
async def test_host_mode_noise_encryption_wrong_key(
yaml_config: str,
run_compiled: RunCompiledFunction,
api_client_connected: APIClientConnectedFactory,
) -> None:
"""Test that connection fails with wrong encryption key."""
# Write, compile and run the ESPHome device
async with run_compiled(yaml_config):
# Try to connect with wrong key - should fail with InvalidEncryptionKeyAPIError
with pytest.raises(InvalidEncryptionKeyAPIError):
async with api_client_connected(
noise_psk="wrong_key_that_should_not_work",
timeout=5, # Shorter timeout for expected failure
) as client:
# This should not be reached
await client.device_info()

View File

@ -0,0 +1,28 @@
"""Integration test for Host mode reconnection."""
from __future__ import annotations
import pytest
from .types import APIClientConnectedFactory, RunCompiledFunction
@pytest.mark.asyncio
async def test_host_mode_reconnect(
yaml_config: str,
run_compiled: RunCompiledFunction,
api_client_connected: APIClientConnectedFactory,
) -> None:
"""Test reconnecting to a Host mode device."""
# Write, compile and run the ESPHome device
async with run_compiled(yaml_config):
# First connection
async with api_client_connected() as client:
device_info = await client.device_info()
assert device_info is not None
# Reconnect with a new client
async with api_client_connected() as client2:
device_info2 = await client2.device_info()
assert device_info2 is not None
assert device_info2.name == device_info.name

View File

@ -0,0 +1,49 @@
"""Integration test for Host mode with sensor."""
from __future__ import annotations
import asyncio
from aioesphomeapi import EntityState
import pytest
from .types import APIClientConnectedFactory, RunCompiledFunction
@pytest.mark.asyncio
async def test_host_mode_with_sensor(
yaml_config: str,
run_compiled: RunCompiledFunction,
api_client_connected: APIClientConnectedFactory,
) -> None:
"""Test Host mode with a sensor component."""
# Write, compile and run the ESPHome device, then connect to API
async with run_compiled(yaml_config), api_client_connected() as client:
# Subscribe to state changes
states: dict[int, EntityState] = {}
sensor_future: asyncio.Future[EntityState] = asyncio.Future()
def on_state(state: EntityState) -> None:
states[state.key] = state
# If this is our sensor with value 42.0, resolve the future
if (
hasattr(state, "state")
and state.state == 42.0
and not sensor_future.done()
):
sensor_future.set_result(state)
client.subscribe_states(on_state)
# Wait for sensor with specific value (42.0) with timeout
try:
test_sensor_state = await asyncio.wait_for(sensor_future, timeout=5.0)
except asyncio.TimeoutError:
pytest.fail(
f"Sensor with value 42.0 not received within 5 seconds. "
f"Received states: {list(states.values())}"
)
# Verify the sensor state
assert test_sensor_state.state == 42.0
assert len(states) > 0, "No states received"

View File

@ -0,0 +1,46 @@
"""Type definitions for integration tests."""
from __future__ import annotations
import asyncio
from collections.abc import Awaitable, Callable
from contextlib import AbstractAsyncContextManager
from pathlib import Path
from typing import Protocol
from aioesphomeapi import APIClient
ConfigWriter = Callable[[str, str | None], Awaitable[Path]]
CompileFunction = Callable[[Path], Awaitable[None]]
RunFunction = Callable[[Path], Awaitable[asyncio.subprocess.Process]]
RunCompiledFunction = Callable[
[str, str | None], AbstractAsyncContextManager[asyncio.subprocess.Process]
]
WaitFunction = Callable[[APIClient, float], Awaitable[bool]]
class APIClientFactory(Protocol):
"""Protocol for API client factory."""
def __call__( # noqa: E704
self,
address: str = "localhost",
port: int | None = None,
password: str = "",
noise_psk: str | None = None,
client_info: str = "integration-test",
) -> AbstractAsyncContextManager[APIClient]: ...
class APIClientConnectedFactory(Protocol):
"""Protocol for connected API client factory."""
def __call__( # noqa: E704
self,
address: str = "localhost",
port: int | None = None,
password: str = "",
noise_psk: str | None = None,
client_info: str = "integration-test",
timeout: float = 30,
) -> AbstractAsyncContextManager[APIClient]: ...