Compare commits

..

23 Commits

Author SHA1 Message Date
J. Nick Koston
a63927a5f6 Merge branch 'gpio_expander_fix_more_than_8_pins_in_bank' into optimize_pcf8574 2025-09-04 13:51:05 -05:00
J. Nick Koston
e843f1759b update docs 2025-09-04 13:49:19 -05:00
J. Nick Koston
ef50033766 [gpio_expander] Fix CachedGpioExpander template to support >8 pins per bank 2025-09-04 13:16:48 -05:00
J. Nick Koston
5b0d1fb30e cleanup 2025-09-04 12:57:44 -05:00
J. Nick Koston
ee090c7c38 cleanup 2025-09-04 12:32:52 -05:00
J. Nick Koston
e866ae0f50 handle 16 pins 2025-09-04 12:28:23 -05:00
J. Nick Koston
4885819881 use helper 2025-09-04 12:21:48 -05:00
J. Nick Koston
784d547294 correctness 2025-09-04 11:19:30 -05:00
J. Nick Koston
1d91bf5759 [pcf8574] Add lazy input caching to reduce I2C bus usage 2025-09-04 11:02:23 -05:00
Jesse Hills
c7ee727af4 Merge branch 'release' into dev 2025-09-04 22:10:21 +12:00
Jesse Hills
c5b2a9e24b Merge pull request #10558 from esphome/bump-2025.8.3
2025.8.3
2025-09-04 22:09:37 +12:00
J. Nick Koston
101d553df9 [esp8266] Reduce preference memory usage by 40% through field optimization (#10557) 2025-09-04 02:46:50 -05:00
J. Nick Koston
8fb6420b1c [esp8266] Store GPIO initialization arrays in PROGMEM to save RAM (#10560) 2025-09-04 02:44:12 -05:00
Maxim Raznatovski
c03d978b46 [wizard] extend the wizard dashboard API to allow upload and empty config options (#10203) 2025-09-04 14:02:49 +12:00
Jesse Hills
2d3cdf60ba Bump version to 2025.8.3 2025-09-04 09:06:00 +12:00
J. Nick Koston
a29fef166b [api] Fix VERY_VERBOSE logging compilation error with bool arrays (#10539) 2025-09-04 09:06:00 +12:00
Jonathan Swoboda
9fe94f1201 [esp32] Clear IDF environment variables (#10527)
Co-authored-by: J. Nick Koston <nick+github@koston.org>
2025-09-04 09:06:00 +12:00
Anton Viktorov
1b8978a89a [i2c] Fix bug write_register16 (#10547) 2025-09-04 09:06:00 +12:00
Jonathan Swoboda
6f188d1284 [esp32] Rebuild when idf_component.yml changes (#10540) 2025-09-04 09:06:00 +12:00
Clyde Stubbs
a1a336783e [mcp4461] Fix read transaction (#10465)
Co-authored-by: Jesse Hills <3060199+jesserockz@users.noreply.github.com>
2025-09-04 09:06:00 +12:00
Clyde Stubbs
c55bc93f70 [mipi_dsi] Fix config for Guition screen (#10464) 2025-09-04 09:06:00 +12:00
J. Nick Koston
de998f2f39 Fix incorrect entity count due to undefined execution order with globals (#10497) 2025-09-04 09:06:00 +12:00
Oliver Kleinecke
950299e52b Update mcp4461.cpp (#10479) 2025-09-04 09:06:00 +12:00
18 changed files with 393 additions and 123 deletions

View File

@@ -54,13 +54,13 @@ struct ISRPinArg {
ISRInternalGPIOPin ESP32InternalGPIOPin::to_isr() const {
auto *arg = new ISRPinArg{}; // NOLINT(cppcoreguidelines-owning-memory)
arg->pin = this->get_pin_num();
arg->pin = this->pin_;
arg->flags = gpio::FLAG_NONE;
arg->inverted = pin_flags_.inverted;
arg->inverted = inverted_;
#if defined(USE_ESP32_VARIANT_ESP32)
arg->use_rtc = rtc_gpio_is_valid_gpio(this->get_pin_num());
arg->use_rtc = rtc_gpio_is_valid_gpio(this->pin_);
if (arg->use_rtc)
arg->rtc_pin = rtc_io_number_get(this->get_pin_num());
arg->rtc_pin = rtc_io_number_get(this->pin_);
#endif
return ISRInternalGPIOPin((void *) arg);
}
@@ -69,23 +69,23 @@ void ESP32InternalGPIOPin::attach_interrupt(void (*func)(void *), void *arg, gpi
gpio_int_type_t idf_type = GPIO_INTR_ANYEDGE;
switch (type) {
case gpio::INTERRUPT_RISING_EDGE:
idf_type = pin_flags_.inverted ? GPIO_INTR_NEGEDGE : GPIO_INTR_POSEDGE;
idf_type = inverted_ ? GPIO_INTR_NEGEDGE : GPIO_INTR_POSEDGE;
break;
case gpio::INTERRUPT_FALLING_EDGE:
idf_type = pin_flags_.inverted ? GPIO_INTR_POSEDGE : GPIO_INTR_NEGEDGE;
idf_type = inverted_ ? GPIO_INTR_POSEDGE : GPIO_INTR_NEGEDGE;
break;
case gpio::INTERRUPT_ANY_EDGE:
idf_type = GPIO_INTR_ANYEDGE;
break;
case gpio::INTERRUPT_LOW_LEVEL:
idf_type = pin_flags_.inverted ? GPIO_INTR_HIGH_LEVEL : GPIO_INTR_LOW_LEVEL;
idf_type = inverted_ ? GPIO_INTR_HIGH_LEVEL : GPIO_INTR_LOW_LEVEL;
break;
case gpio::INTERRUPT_HIGH_LEVEL:
idf_type = pin_flags_.inverted ? GPIO_INTR_LOW_LEVEL : GPIO_INTR_HIGH_LEVEL;
idf_type = inverted_ ? GPIO_INTR_LOW_LEVEL : GPIO_INTR_HIGH_LEVEL;
break;
}
gpio_set_intr_type(get_pin_num(), idf_type);
gpio_intr_enable(get_pin_num());
gpio_set_intr_type(pin_, idf_type);
gpio_intr_enable(pin_);
if (!isr_service_installed) {
auto res = gpio_install_isr_service(ESP_INTR_FLAG_LEVEL3);
if (res != ESP_OK) {
@@ -94,7 +94,7 @@ void ESP32InternalGPIOPin::attach_interrupt(void (*func)(void *), void *arg, gpi
}
isr_service_installed = true;
}
gpio_isr_handler_add(get_pin_num(), func, arg);
gpio_isr_handler_add(pin_, func, arg);
}
std::string ESP32InternalGPIOPin::dump_summary() const {
@@ -112,13 +112,13 @@ void ESP32InternalGPIOPin::setup() {
conf.intr_type = GPIO_INTR_DISABLE;
gpio_config(&conf);
if (flags_ & gpio::FLAG_OUTPUT) {
gpio_set_drive_capability(get_pin_num(), get_drive_strength());
gpio_set_drive_capability(pin_, drive_strength_);
}
}
void ESP32InternalGPIOPin::pin_mode(gpio::Flags flags) {
// can't call gpio_config here because that logs in esp-idf which may cause issues
gpio_set_direction(get_pin_num(), flags_to_mode(flags));
gpio_set_direction(pin_, flags_to_mode(flags));
gpio_pull_mode_t pull_mode = GPIO_FLOATING;
if ((flags & gpio::FLAG_PULLUP) && (flags & gpio::FLAG_PULLDOWN)) {
pull_mode = GPIO_PULLUP_PULLDOWN;
@@ -127,14 +127,12 @@ void ESP32InternalGPIOPin::pin_mode(gpio::Flags flags) {
} else if (flags & gpio::FLAG_PULLDOWN) {
pull_mode = GPIO_PULLDOWN_ONLY;
}
gpio_set_pull_mode(get_pin_num(), pull_mode);
gpio_set_pull_mode(pin_, pull_mode);
}
bool ESP32InternalGPIOPin::digital_read() { return bool(gpio_get_level(get_pin_num())) != pin_flags_.inverted; }
void ESP32InternalGPIOPin::digital_write(bool value) {
gpio_set_level(get_pin_num(), value != pin_flags_.inverted ? 1 : 0);
}
void ESP32InternalGPIOPin::detach_interrupt() const { gpio_intr_disable(get_pin_num()); }
bool ESP32InternalGPIOPin::digital_read() { return bool(gpio_get_level(pin_)) != inverted_; }
void ESP32InternalGPIOPin::digital_write(bool value) { gpio_set_level(pin_, value != inverted_ ? 1 : 0); }
void ESP32InternalGPIOPin::detach_interrupt() const { gpio_intr_disable(pin_); }
} // namespace esp32

View File

@@ -7,17 +7,11 @@
namespace esphome {
namespace esp32 {
// Static assertions to ensure our bit-packed fields can hold the enum values
static_assert(GPIO_NUM_MAX <= 256, "gpio_num_t has too many values for uint8_t");
static_assert(GPIO_DRIVE_CAP_MAX <= 4, "gpio_drive_cap_t has too many values for 2-bit field");
class ESP32InternalGPIOPin : public InternalGPIOPin {
public:
void set_pin(gpio_num_t pin) { pin_ = static_cast<uint8_t>(pin); }
void set_inverted(bool inverted) { pin_flags_.inverted = inverted; }
void set_drive_strength(gpio_drive_cap_t drive_strength) {
pin_flags_.drive_strength = static_cast<uint8_t>(drive_strength);
}
void set_pin(gpio_num_t pin) { pin_ = pin; }
void set_inverted(bool inverted) { inverted_ = inverted; }
void set_drive_strength(gpio_drive_cap_t drive_strength) { drive_strength_ = drive_strength; }
void set_flags(gpio::Flags flags) { flags_ = flags; }
void setup() override;
@@ -27,26 +21,17 @@ class ESP32InternalGPIOPin : public InternalGPIOPin {
std::string dump_summary() const override;
void detach_interrupt() const override;
ISRInternalGPIOPin to_isr() const override;
uint8_t get_pin() const override { return pin_; }
uint8_t get_pin() const override { return (uint8_t) pin_; }
gpio::Flags get_flags() const override { return flags_; }
bool is_inverted() const override { return pin_flags_.inverted; }
gpio_num_t get_pin_num() const { return static_cast<gpio_num_t>(pin_); }
gpio_drive_cap_t get_drive_strength() const { return static_cast<gpio_drive_cap_t>(pin_flags_.drive_strength); }
bool is_inverted() const override { return inverted_; }
protected:
void attach_interrupt(void (*func)(void *), void *arg, gpio::InterruptType type) const override;
// Memory layout: 8 bytes total on 32-bit systems
// - 3 bytes for members below
// - 1 byte padding for alignment
// - 4 bytes for vtable pointer
uint8_t pin_; // GPIO pin number (0-255, actual max ~48 on ESP32)
gpio::Flags flags_; // GPIO flags (1 byte)
struct PinFlags {
uint8_t inverted : 1; // Invert pin logic (1 bit)
uint8_t drive_strength : 2; // Drive strength 0-3 (2 bits)
uint8_t reserved : 5; // Reserved for future use (5 bits)
} pin_flags_; // Total: 1 byte
gpio_num_t pin_;
gpio_drive_cap_t drive_strength_;
gpio::Flags flags_;
bool inverted_;
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
static bool isr_service_installed;
};

View File

@@ -58,8 +58,8 @@ extern "C" void resetPins() { // NOLINT
#ifdef USE_ESP8266_EARLY_PIN_INIT
for (int i = 0; i < 16; i++) {
uint8_t mode = ESPHOME_ESP8266_GPIO_INITIAL_MODE[i];
uint8_t level = ESPHOME_ESP8266_GPIO_INITIAL_LEVEL[i];
uint8_t mode = progmem_read_byte(&ESPHOME_ESP8266_GPIO_INITIAL_MODE[i]);
uint8_t level = progmem_read_byte(&ESPHOME_ESP8266_GPIO_INITIAL_LEVEL[i]);
if (mode != 255)
pinMode(i, mode); // NOLINT
if (level != 255)

View File

@@ -199,11 +199,11 @@ async def add_pin_initial_states_array():
cg.add_global(
cg.RawExpression(
f"const uint8_t ESPHOME_ESP8266_GPIO_INITIAL_MODE[16] = {{{initial_modes_s}}}"
f"const uint8_t ESPHOME_ESP8266_GPIO_INITIAL_MODE[16] PROGMEM = {{{initial_modes_s}}}"
)
)
cg.add_global(
cg.RawExpression(
f"const uint8_t ESPHOME_ESP8266_GPIO_INITIAL_LEVEL[16] = {{{initial_levels_s}}}"
f"const uint8_t ESPHOME_ESP8266_GPIO_INITIAL_LEVEL[16] PROGMEM = {{{initial_levels_s}}}"
)
)

View File

@@ -1,6 +1,7 @@
#ifdef USE_ESP8266
#include <c_types.h>
#include <cinttypes>
extern "C" {
#include "spi_flash.h"
}
@@ -119,16 +120,16 @@ static bool load_from_rtc(size_t offset, uint32_t *data, size_t len) {
class ESP8266PreferenceBackend : public ESPPreferenceBackend {
public:
size_t offset = 0;
uint32_t type = 0;
uint16_t offset = 0;
uint8_t length_words = 0; // Max 255 words (1020 bytes of data)
bool in_flash = false;
size_t length_words = 0;
bool save(const uint8_t *data, size_t len) override {
if (bytes_to_words(len) != length_words) {
return false;
}
size_t buffer_size = length_words + 1;
size_t buffer_size = static_cast<size_t>(length_words) + 1;
std::unique_ptr<uint32_t[]> buffer(new uint32_t[buffer_size]()); // Note the () for zero-initialization
memcpy(buffer.get(), data, len);
buffer[length_words] = calculate_crc(buffer.get(), buffer.get() + length_words, type);
@@ -142,7 +143,7 @@ class ESP8266PreferenceBackend : public ESPPreferenceBackend {
if (bytes_to_words(len) != length_words) {
return false;
}
size_t buffer_size = length_words + 1;
size_t buffer_size = static_cast<size_t>(length_words) + 1;
std::unique_ptr<uint32_t[]> buffer(new uint32_t[buffer_size]());
bool ret = in_flash ? load_from_flash(offset, buffer.get(), buffer_size)
: load_from_rtc(offset, buffer.get(), buffer_size);
@@ -176,15 +177,19 @@ class ESP8266Preferences : public ESPPreferences {
ESPPreferenceObject make_preference(size_t length, uint32_t type, bool in_flash) override {
uint32_t length_words = bytes_to_words(length);
if (length_words > 255) {
ESP_LOGE(TAG, "Preference too large: %" PRIu32 " words > 255", length_words);
return {};
}
if (in_flash) {
uint32_t start = current_flash_offset;
uint32_t end = start + length_words + 1;
if (end > ESP8266_FLASH_STORAGE_SIZE)
return {};
auto *pref = new ESP8266PreferenceBackend(); // NOLINT(cppcoreguidelines-owning-memory)
pref->offset = start;
pref->offset = static_cast<uint16_t>(start);
pref->type = type;
pref->length_words = length_words;
pref->length_words = static_cast<uint8_t>(length_words);
pref->in_flash = true;
current_flash_offset = end;
return {pref};
@@ -210,9 +215,9 @@ class ESP8266Preferences : public ESPPreferences {
uint32_t rtc_offset = in_normal ? start + 32 : start - 96;
auto *pref = new ESP8266PreferenceBackend(); // NOLINT(cppcoreguidelines-owning-memory)
pref->offset = rtc_offset;
pref->offset = static_cast<uint16_t>(rtc_offset);
pref->type = type;
pref->length_words = length_words;
pref->length_words = static_cast<uint8_t>(length_words);
pref->in_flash = false;
current_offset += length_words + 1;
return pref;

View File

@@ -11,17 +11,22 @@ namespace esphome::gpio_expander {
/// @brief A class to cache the read state of a GPIO expander.
/// This class caches reads between GPIO Pins which are on the same bank.
/// This means that for reading whole Port (ex. 8 pins) component needs only one
/// I2C/SPI read per main loop call. It assumes, that one bit in byte identifies one GPIO pin
/// I2C/SPI read per main loop call. It assumes that one bit in byte identifies one GPIO pin.
///
/// Template parameters:
/// T - Type which represents internal register. Could be uint8_t or uint16_t. Adjust to
/// match size of your internal GPIO bank register.
/// N - Number of pins
template<typename T, T N> class CachedGpioExpander {
/// T - Type which represents internal bank register. Could be uint8_t or uint16_t.
/// Choose based on how your I/O expander reads pins:
/// * uint8_t: For chips that read banks separately (8 pins at a time)
/// Examples: MCP23017 (2x8-bit banks), TCA9555 (2x8-bit banks)
/// * uint16_t: For chips that read all pins at once (up to 16 pins)
/// Examples: PCF8574/8575 (8/16 pins), PCA9554/9555 (8/16 pins)
/// N - Total number of pins (as uint8_t)
template<typename T, uint8_t N> class CachedGpioExpander {
public:
/// @brief Read the state of the given pin. This will invalidate the cache for the given pin number.
/// @param pin Pin number to read
/// @return Pin state
bool digital_read(T pin) {
bool digital_read(uint8_t pin) {
const uint8_t bank = pin / BANK_SIZE;
const T pin_mask = (1 << (pin % BANK_SIZE));
// Check if specific pin cache is valid
@@ -38,15 +43,25 @@ template<typename T, T N> class CachedGpioExpander {
return this->digital_read_cache(pin);
}
void digital_write(T pin, bool value) { this->digital_write_hw(pin, value); }
void digital_write(uint8_t pin, bool value) { this->digital_write_hw(pin, value); }
protected:
/// @brief Call component low level function to read GPIO state from device
virtual bool digital_read_hw(T pin) = 0;
/// @brief Call component read function from internal cache.
virtual bool digital_read_cache(T pin) = 0;
/// @brief Call component low level function to write GPIO state to device
virtual void digital_write_hw(T pin, bool value) = 0;
/// @brief Read GPIO bank from hardware into internal state
/// @param pin Pin number (used to determine which bank to read)
/// @return true if read succeeded, false on communication error
/// @note This does NOT return the pin state. It returns whether the read operation succeeded.
/// The actual pin state should be returned by digital_read_cache().
virtual bool digital_read_hw(uint8_t pin) = 0;
/// @brief Get cached pin value from internal state
/// @param pin Pin number to read
/// @return Pin state (true = HIGH, false = LOW)
virtual bool digital_read_cache(uint8_t pin) = 0;
/// @brief Write GPIO state to hardware
/// @param pin Pin number to write
/// @param value Pin state to write (true = HIGH, false = LOW)
virtual void digital_write_hw(uint8_t pin, bool value) = 0;
/// @brief Invalidate cache. This function should be called in component loop().
void reset_pin_cache_() { memset(this->read_cache_valid_, 0x00, CACHE_SIZE_BYTES); }

View File

@@ -11,6 +11,7 @@ from esphome.const import (
CONF_OUTPUT,
)
AUTO_LOAD = ["gpio_expander"]
DEPENDENCIES = ["i2c"]
MULTI_CONF = True

View File

@@ -16,6 +16,10 @@ void PCF8574Component::setup() {
this->write_gpio_();
this->read_gpio_();
}
void PCF8574Component::loop() {
// Invalidate the cache at the start of each loop
this->reset_pin_cache_();
}
void PCF8574Component::dump_config() {
ESP_LOGCONFIG(TAG, "PCF8574:");
LOG_I2C_DEVICE(this)
@@ -24,17 +28,19 @@ void PCF8574Component::dump_config() {
ESP_LOGE(TAG, ESP_LOG_MSG_COMM_FAIL);
}
}
bool PCF8574Component::digital_read(uint8_t pin) {
this->read_gpio_();
return this->input_mask_ & (1 << pin);
bool PCF8574Component::digital_read_hw(uint8_t pin) {
// Read all pins from hardware into input_mask_
return this->read_gpio_(); // Return true if I2C read succeeded, false on error
}
void PCF8574Component::digital_write(uint8_t pin, bool value) {
bool PCF8574Component::digital_read_cache(uint8_t pin) { return this->input_mask_ & (1 << pin); }
void PCF8574Component::digital_write_hw(uint8_t pin, bool value) {
if (value) {
this->output_mask_ |= (1 << pin);
} else {
this->output_mask_ &= ~(1 << pin);
}
this->write_gpio_();
}
void PCF8574Component::pin_mode(uint8_t pin, gpio::Flags flags) {
@@ -91,6 +97,9 @@ bool PCF8574Component::write_gpio_() {
}
float PCF8574Component::get_setup_priority() const { return setup_priority::IO; }
// Run our loop() method early to invalidate cache before any other components access the pins
float PCF8574Component::get_loop_priority() const { return 9.0f; } // Just after WIFI
void PCF8574GPIOPin::setup() { pin_mode(flags_); }
void PCF8574GPIOPin::pin_mode(gpio::Flags flags) { this->parent_->pin_mode(this->pin_, flags); }
bool PCF8574GPIOPin::digital_read() { return this->parent_->digital_read(this->pin_) != this->inverted_; }

View File

@@ -3,11 +3,16 @@
#include "esphome/core/component.h"
#include "esphome/core/hal.h"
#include "esphome/components/i2c/i2c.h"
#include "esphome/components/gpio_expander/cached_gpio.h"
namespace esphome {
namespace pcf8574 {
class PCF8574Component : public Component, public i2c::I2CDevice {
// PCF8574(8 pins)/PCF8575(16 pins) always read/write all pins in a single I2C transaction
// so we use uint16_t as bank type to ensure all pins are in one bank and cached together
class PCF8574Component : public Component,
public i2c::I2CDevice,
public gpio_expander::CachedGpioExpander<uint16_t, 16> {
public:
PCF8574Component() = default;
@@ -15,20 +20,22 @@ class PCF8574Component : public Component, public i2c::I2CDevice {
/// Check i2c availability and setup masks
void setup() override;
/// Helper function to read the value of a pin.
bool digital_read(uint8_t pin);
/// Helper function to write the value of a pin.
void digital_write(uint8_t pin, bool value);
/// Invalidate cache at start of each loop
void loop() override;
/// Helper function to set the pin mode of a pin.
void pin_mode(uint8_t pin, gpio::Flags flags);
float get_setup_priority() const override;
float get_loop_priority() const override;
void dump_config() override;
protected:
bool read_gpio_();
bool digital_read_hw(uint8_t pin) override;
bool digital_read_cache(uint8_t pin) override;
void digital_write_hw(uint8_t pin, bool value) override;
bool read_gpio_();
bool write_gpio_();
/// Mask for the pin mode - 1 means output, 0 means input

View File

@@ -2,6 +2,7 @@ from __future__ import annotations
import asyncio
import base64
import binascii
from collections.abc import Callable, Iterable
import datetime
import functools
@@ -490,7 +491,17 @@ class WizardRequestHandler(BaseHandler):
kwargs = {
k: v
for k, v in json.loads(self.request.body.decode()).items()
if k in ("name", "platform", "board", "ssid", "psk", "password")
if k
in (
"type",
"name",
"platform",
"board",
"ssid",
"psk",
"password",
"file_content",
)
}
if not kwargs["name"]:
self.set_status(422)
@@ -498,19 +509,65 @@ class WizardRequestHandler(BaseHandler):
self.write(json.dumps({"error": "Name is required"}))
return
if "type" not in kwargs:
# Default to basic wizard type for backwards compatibility
kwargs["type"] = "basic"
kwargs["friendly_name"] = kwargs["name"]
kwargs["name"] = friendly_name_slugify(kwargs["friendly_name"])
kwargs["ota_password"] = secrets.token_hex(16)
noise_psk = secrets.token_bytes(32)
kwargs["api_encryption_key"] = base64.b64encode(noise_psk).decode()
if kwargs["type"] == "basic":
kwargs["ota_password"] = secrets.token_hex(16)
noise_psk = secrets.token_bytes(32)
kwargs["api_encryption_key"] = base64.b64encode(noise_psk).decode()
elif kwargs["type"] == "upload":
try:
kwargs["file_text"] = base64.b64decode(kwargs["file_content"]).decode(
"utf-8"
)
except (binascii.Error, UnicodeDecodeError):
self.set_status(422)
self.set_header("content-type", "application/json")
self.write(
json.dumps({"error": "The uploaded file is not correctly encoded."})
)
return
elif kwargs["type"] != "empty":
self.set_status(422)
self.set_header("content-type", "application/json")
self.write(
json.dumps(
{"error": f"Invalid wizard type specified: {kwargs['type']}"}
)
)
return
filename = f"{kwargs['name']}.yaml"
destination = settings.rel_path(filename)
wizard.wizard_write(path=destination, **kwargs)
self.set_status(200)
self.set_header("content-type", "application/json")
self.write(json.dumps({"configuration": filename}))
self.finish()
# Check if destination file already exists
if os.path.exists(destination):
self.set_status(409) # Conflict status code
self.set_header("content-type", "application/json")
self.write(
json.dumps({"error": f"Configuration file '{filename}' already exists"})
)
self.finish()
return
success = wizard.wizard_write(path=destination, **kwargs)
if success:
self.set_status(200)
self.set_header("content-type", "application/json")
self.write(json.dumps({"configuration": filename}))
self.finish()
else:
self.set_status(500)
self.set_header("content-type", "application/json")
self.write(
json.dumps(
{"error": "Failed to write configuration, see logs for details"}
)
)
self.finish()
class ImportRequestHandler(BaseHandler):

View File

@@ -189,32 +189,45 @@ def wizard_write(path, **kwargs):
from esphome.components.rtl87xx import boards as rtl87xx_boards
name = kwargs["name"]
board = kwargs["board"]
if kwargs["type"] == "empty":
file_text = ""
# Will be updated later after editing the file
hardware = "UNKNOWN"
elif kwargs["type"] == "upload":
file_text = kwargs["file_text"]
hardware = "UNKNOWN"
else: # "basic"
board = kwargs["board"]
for key in ("ssid", "psk", "password", "ota_password"):
if key in kwargs:
kwargs[key] = sanitize_double_quotes(kwargs[key])
for key in ("ssid", "psk", "password", "ota_password"):
if key in kwargs:
kwargs[key] = sanitize_double_quotes(kwargs[key])
if "platform" not in kwargs:
if board in esp8266_boards.BOARDS:
platform = "ESP8266"
elif board in esp32_boards.BOARDS:
platform = "ESP32"
elif board in rp2040_boards.BOARDS:
platform = "RP2040"
elif board in bk72xx_boards.BOARDS:
platform = "BK72XX"
elif board in ln882x_boards.BOARDS:
platform = "LN882X"
elif board in rtl87xx_boards.BOARDS:
platform = "RTL87XX"
else:
safe_print(color(AnsiFore.RED, f'The board "{board}" is unknown.'))
return False
kwargs["platform"] = platform
hardware = kwargs["platform"]
file_text = wizard_file(**kwargs)
if "platform" not in kwargs:
if board in esp8266_boards.BOARDS:
platform = "ESP8266"
elif board in esp32_boards.BOARDS:
platform = "ESP32"
elif board in rp2040_boards.BOARDS:
platform = "RP2040"
elif board in bk72xx_boards.BOARDS:
platform = "BK72XX"
elif board in ln882x_boards.BOARDS:
platform = "LN882X"
elif board in rtl87xx_boards.BOARDS:
platform = "RTL87XX"
else:
safe_print(color(AnsiFore.RED, f'The board "{board}" is unknown.'))
return False
kwargs["platform"] = platform
hardware = kwargs["platform"]
# Check if file already exists to prevent overwriting
if os.path.exists(path) and os.path.isfile(path):
safe_print(color(AnsiFore.RED, f'The file "{path}" already exists.'))
return False
write_file(path, wizard_file(**kwargs))
write_file(path, file_text)
storage = StorageJSON.from_wizard(name, name, f"{name}.local", hardware)
storage_path = ext_storage_path(os.path.basename(path))
storage.save(storage_path)

View File

@@ -27,11 +27,13 @@ void GPIOExpanderTestComponent::setup() {
bool GPIOExpanderTestComponent::digital_read_hw(uint8_t pin) {
ESP_LOGD(TAG, "digital_read_hw pin=%d", pin);
// Return true to indicate successful read operation
return true;
}
bool GPIOExpanderTestComponent::digital_read_cache(uint8_t pin) {
ESP_LOGD(TAG, "digital_read_cache pin=%d", pin);
// Return the pin state (always HIGH for testing)
return true;
}

View File

@@ -0,0 +1,24 @@
import esphome.codegen as cg
import esphome.config_validation as cv
from esphome.const import CONF_ID
AUTO_LOAD = ["gpio_expander"]
gpio_expander_test_component_uint16_ns = cg.esphome_ns.namespace(
"gpio_expander_test_component_uint16"
)
GPIOExpanderTestUint16Component = gpio_expander_test_component_uint16_ns.class_(
"GPIOExpanderTestUint16Component", cg.Component
)
CONFIG_SCHEMA = cv.Schema(
{
cv.GenerateID(): cv.declare_id(GPIOExpanderTestUint16Component),
}
).extend(cv.COMPONENT_SCHEMA)
async def to_code(config):
var = cg.new_Pvariable(config[CONF_ID])
await cg.register_component(var, config)

View File

@@ -0,0 +1,43 @@
#include "gpio_expander_test_component_uint16.h"
#include "esphome/core/log.h"
namespace esphome::gpio_expander_test_component_uint16 {
static const char *const TAG = "gpio_expander_test_uint16";
void GPIOExpanderTestUint16Component::setup() {
ESP_LOGD(TAG, "Testing uint16_t bank (single 16-pin bank)");
// Test reading all 16 pins - first should trigger hw read, rest use cache
for (uint8_t pin = 0; pin < 16; pin++) {
this->digital_read(pin);
}
// Reset cache and test specific reads
ESP_LOGD(TAG, "Resetting cache for uint16_t test");
this->reset_pin_cache_();
// First read triggers hw for entire bank
this->digital_read(5);
// These should all use cache since they're in the same bank
this->digital_read(10);
this->digital_read(15);
this->digital_read(0);
ESP_LOGD(TAG, "DONE_UINT16");
}
bool GPIOExpanderTestUint16Component::digital_read_hw(uint8_t pin) {
ESP_LOGD(TAG, "uint16_digital_read_hw pin=%d", pin);
// In a real component, this would read from I2C/SPI into internal state
// For testing, we just return true to indicate successful read
return true; // Return true to indicate successful read
}
bool GPIOExpanderTestUint16Component::digital_read_cache(uint8_t pin) {
ESP_LOGD(TAG, "uint16_digital_read_cache pin=%d", pin);
// Return the actual pin state from our test pattern
return (this->test_state_ >> pin) & 1;
}
} // namespace esphome::gpio_expander_test_component_uint16

View File

@@ -0,0 +1,23 @@
#pragma once
#include "esphome/components/gpio_expander/cached_gpio.h"
#include "esphome/core/component.h"
namespace esphome::gpio_expander_test_component_uint16 {
// Test component using uint16_t bank type (single 16-pin bank)
class GPIOExpanderTestUint16Component : public Component,
public esphome::gpio_expander::CachedGpioExpander<uint16_t, 16> {
public:
void setup() override;
protected:
bool digital_read_hw(uint8_t pin) override;
bool digital_read_cache(uint8_t pin) override;
void digital_write_hw(uint8_t pin, bool value) override{};
private:
uint16_t test_state_{0xAAAA}; // Test pattern: alternating bits
};
} // namespace esphome::gpio_expander_test_component_uint16

View File

@@ -12,6 +12,10 @@ external_components:
- source:
type: local
path: EXTERNAL_COMPONENT_PATH
components: [gpio_expander_test_component]
components: [gpio_expander_test_component, gpio_expander_test_component_uint16]
# Test with uint8_t (multiple banks)
gpio_expander_test_component:
# Test with uint16_t (single bank)
gpio_expander_test_component_uint16:

View File

@@ -30,9 +30,15 @@ async def test_gpio_expander_cache(
logs_done = asyncio.Event()
# Patterns to match in logs
digital_read_hw_pattern = re.compile(r"digital_read_hw pin=(\d+)")
digital_read_cache_pattern = re.compile(r"digital_read_cache pin=(\d+)")
# Patterns to match in logs - match any variation of digital_read
read_hw_pattern = re.compile(r"(?:uint16_)?digital_read_hw pin=(\d+)")
read_cache_pattern = re.compile(r"(?:uint16_)?digital_read_cache pin=(\d+)")
# Keep specific patterns for building the expected order
digital_read_hw_pattern = re.compile(r"^digital_read_hw pin=(\d+)")
digital_read_cache_pattern = re.compile(r"^digital_read_cache pin=(\d+)")
uint16_read_hw_pattern = re.compile(r"^uint16_digital_read_hw pin=(\d+)")
uint16_read_cache_pattern = re.compile(r"^uint16_digital_read_cache pin=(\d+)")
# ensure logs are in the expected order
log_order = [
@@ -59,6 +65,17 @@ async def test_gpio_expander_cache(
(digital_read_cache_pattern, 14),
(digital_read_hw_pattern, 14),
(digital_read_cache_pattern, 14),
# uint16_t component tests (single bank of 16 pins)
(uint16_read_hw_pattern, 0), # First pin triggers hw read
[
(uint16_read_cache_pattern, i) for i in range(0, 16)
], # All 16 pins return via cache
# After cache reset
(uint16_read_hw_pattern, 5), # First read after reset triggers hw
(uint16_read_cache_pattern, 5),
(uint16_read_cache_pattern, 10), # These use cache (same bank)
(uint16_read_cache_pattern, 15),
(uint16_read_cache_pattern, 0),
]
# Flatten the log order for easier processing
log_order: list[tuple[re.Pattern, int]] = [
@@ -77,17 +94,22 @@ async def test_gpio_expander_cache(
clean_line = re.sub(r"\x1b\[[0-9;]*m", "", line)
if "digital_read" in clean_line:
# Extract just the log message part (after the log level)
msg = clean_line.split(": ", 1)[-1] if ": " in clean_line else clean_line
# Check if this line contains a read operation we're tracking
if read_hw_pattern.search(msg) or read_cache_pattern.search(msg):
if index >= len(log_order):
print(f"Received unexpected log line: {clean_line}")
print(f"Received unexpected log line: {msg}")
logs_done.set()
return
pattern, expected_pin = log_order[index]
match = pattern.search(clean_line)
match = pattern.search(msg)
if not match:
print(f"Log line did not match next expected pattern: {clean_line}")
print(f"Log line did not match next expected pattern: {msg}")
print(f"Expected pattern: {pattern.pattern}")
logs_done.set()
return
@@ -99,9 +121,10 @@ async def test_gpio_expander_cache(
index += 1
elif "DONE" in clean_line:
# Check if we reached the end of the expected log entries
logs_done.set()
elif "DONE_UINT16" in clean_line:
# uint16 component is done, check if we've seen all expected logs
if index == len(log_order):
logs_done.set()
# Run with log monitoring
async with (

View File

@@ -17,6 +17,7 @@ import esphome.wizard as wz
@pytest.fixture
def default_config():
return {
"type": "basic",
"name": "test-name",
"platform": "ESP8266",
"board": "esp01_1m",
@@ -125,6 +126,47 @@ def test_wizard_write_sets_platform(default_config, tmp_path, monkeypatch):
assert "esp8266:" in generated_config
def test_wizard_empty_config(tmp_path, monkeypatch):
"""
The wizard should be able to create an empty configuration
"""
# Given
empty_config = {
"type": "empty",
"name": "test-empty",
}
monkeypatch.setattr(wz, "write_file", MagicMock())
monkeypatch.setattr(CORE, "config_path", os.path.dirname(tmp_path))
# When
wz.wizard_write(tmp_path, **empty_config)
# Then
generated_config = wz.write_file.call_args.args[1]
assert generated_config == ""
def test_wizard_upload_config(tmp_path, monkeypatch):
"""
The wizard should be able to import an base64 encoded configuration
"""
# Given
empty_config = {
"type": "upload",
"name": "test-upload",
"file_text": "# imported file 📁\n\n",
}
monkeypatch.setattr(wz, "write_file", MagicMock())
monkeypatch.setattr(CORE, "config_path", os.path.dirname(tmp_path))
# When
wz.wizard_write(tmp_path, **empty_config)
# Then
generated_config = wz.write_file.call_args.args[1]
assert generated_config == "# imported file 📁\n\n"
def test_wizard_write_defaults_platform_from_board_esp8266(
default_config, tmp_path, monkeypatch
):
@@ -471,3 +513,22 @@ def test_wizard_requires_valid_ssid(tmpdir, monkeypatch, wizard_answers):
# Then
assert retval == 0
def test_wizard_write_protects_existing_config(tmpdir, default_config, monkeypatch):
"""
The wizard_write function should not overwrite existing config files and return False
"""
# Given
config_file = tmpdir.join("test.yaml")
original_content = "# Original config content\n"
config_file.write(original_content)
monkeypatch.setattr(CORE, "config_path", str(tmpdir))
# When
result = wz.wizard_write(str(config_file), **default_config)
# Then
assert result is False # Should return False when file exists
assert config_file.read() == original_content