mirror of
https://github.com/esphome/esphome.git
synced 2025-09-07 12:15:36 +00:00
Compare commits
23 Commits
esp32_gpio
...
optimize_p
Author | SHA1 | Date | |
---|---|---|---|
![]() |
a63927a5f6 | ||
![]() |
e843f1759b | ||
![]() |
ef50033766 | ||
![]() |
5b0d1fb30e | ||
![]() |
ee090c7c38 | ||
![]() |
e866ae0f50 | ||
![]() |
4885819881 | ||
![]() |
784d547294 | ||
![]() |
1d91bf5759 | ||
![]() |
c7ee727af4 | ||
![]() |
c5b2a9e24b | ||
![]() |
101d553df9 | ||
![]() |
8fb6420b1c | ||
![]() |
c03d978b46 | ||
![]() |
2d3cdf60ba | ||
![]() |
a29fef166b | ||
![]() |
9fe94f1201 | ||
![]() |
1b8978a89a | ||
![]() |
6f188d1284 | ||
![]() |
a1a336783e | ||
![]() |
c55bc93f70 | ||
![]() |
de998f2f39 | ||
![]() |
950299e52b |
@@ -54,13 +54,13 @@ struct ISRPinArg {
|
||||
|
||||
ISRInternalGPIOPin ESP32InternalGPIOPin::to_isr() const {
|
||||
auto *arg = new ISRPinArg{}; // NOLINT(cppcoreguidelines-owning-memory)
|
||||
arg->pin = this->get_pin_num();
|
||||
arg->pin = this->pin_;
|
||||
arg->flags = gpio::FLAG_NONE;
|
||||
arg->inverted = pin_flags_.inverted;
|
||||
arg->inverted = inverted_;
|
||||
#if defined(USE_ESP32_VARIANT_ESP32)
|
||||
arg->use_rtc = rtc_gpio_is_valid_gpio(this->get_pin_num());
|
||||
arg->use_rtc = rtc_gpio_is_valid_gpio(this->pin_);
|
||||
if (arg->use_rtc)
|
||||
arg->rtc_pin = rtc_io_number_get(this->get_pin_num());
|
||||
arg->rtc_pin = rtc_io_number_get(this->pin_);
|
||||
#endif
|
||||
return ISRInternalGPIOPin((void *) arg);
|
||||
}
|
||||
@@ -69,23 +69,23 @@ void ESP32InternalGPIOPin::attach_interrupt(void (*func)(void *), void *arg, gpi
|
||||
gpio_int_type_t idf_type = GPIO_INTR_ANYEDGE;
|
||||
switch (type) {
|
||||
case gpio::INTERRUPT_RISING_EDGE:
|
||||
idf_type = pin_flags_.inverted ? GPIO_INTR_NEGEDGE : GPIO_INTR_POSEDGE;
|
||||
idf_type = inverted_ ? GPIO_INTR_NEGEDGE : GPIO_INTR_POSEDGE;
|
||||
break;
|
||||
case gpio::INTERRUPT_FALLING_EDGE:
|
||||
idf_type = pin_flags_.inverted ? GPIO_INTR_POSEDGE : GPIO_INTR_NEGEDGE;
|
||||
idf_type = inverted_ ? GPIO_INTR_POSEDGE : GPIO_INTR_NEGEDGE;
|
||||
break;
|
||||
case gpio::INTERRUPT_ANY_EDGE:
|
||||
idf_type = GPIO_INTR_ANYEDGE;
|
||||
break;
|
||||
case gpio::INTERRUPT_LOW_LEVEL:
|
||||
idf_type = pin_flags_.inverted ? GPIO_INTR_HIGH_LEVEL : GPIO_INTR_LOW_LEVEL;
|
||||
idf_type = inverted_ ? GPIO_INTR_HIGH_LEVEL : GPIO_INTR_LOW_LEVEL;
|
||||
break;
|
||||
case gpio::INTERRUPT_HIGH_LEVEL:
|
||||
idf_type = pin_flags_.inverted ? GPIO_INTR_LOW_LEVEL : GPIO_INTR_HIGH_LEVEL;
|
||||
idf_type = inverted_ ? GPIO_INTR_LOW_LEVEL : GPIO_INTR_HIGH_LEVEL;
|
||||
break;
|
||||
}
|
||||
gpio_set_intr_type(get_pin_num(), idf_type);
|
||||
gpio_intr_enable(get_pin_num());
|
||||
gpio_set_intr_type(pin_, idf_type);
|
||||
gpio_intr_enable(pin_);
|
||||
if (!isr_service_installed) {
|
||||
auto res = gpio_install_isr_service(ESP_INTR_FLAG_LEVEL3);
|
||||
if (res != ESP_OK) {
|
||||
@@ -94,7 +94,7 @@ void ESP32InternalGPIOPin::attach_interrupt(void (*func)(void *), void *arg, gpi
|
||||
}
|
||||
isr_service_installed = true;
|
||||
}
|
||||
gpio_isr_handler_add(get_pin_num(), func, arg);
|
||||
gpio_isr_handler_add(pin_, func, arg);
|
||||
}
|
||||
|
||||
std::string ESP32InternalGPIOPin::dump_summary() const {
|
||||
@@ -112,13 +112,13 @@ void ESP32InternalGPIOPin::setup() {
|
||||
conf.intr_type = GPIO_INTR_DISABLE;
|
||||
gpio_config(&conf);
|
||||
if (flags_ & gpio::FLAG_OUTPUT) {
|
||||
gpio_set_drive_capability(get_pin_num(), get_drive_strength());
|
||||
gpio_set_drive_capability(pin_, drive_strength_);
|
||||
}
|
||||
}
|
||||
|
||||
void ESP32InternalGPIOPin::pin_mode(gpio::Flags flags) {
|
||||
// can't call gpio_config here because that logs in esp-idf which may cause issues
|
||||
gpio_set_direction(get_pin_num(), flags_to_mode(flags));
|
||||
gpio_set_direction(pin_, flags_to_mode(flags));
|
||||
gpio_pull_mode_t pull_mode = GPIO_FLOATING;
|
||||
if ((flags & gpio::FLAG_PULLUP) && (flags & gpio::FLAG_PULLDOWN)) {
|
||||
pull_mode = GPIO_PULLUP_PULLDOWN;
|
||||
@@ -127,14 +127,12 @@ void ESP32InternalGPIOPin::pin_mode(gpio::Flags flags) {
|
||||
} else if (flags & gpio::FLAG_PULLDOWN) {
|
||||
pull_mode = GPIO_PULLDOWN_ONLY;
|
||||
}
|
||||
gpio_set_pull_mode(get_pin_num(), pull_mode);
|
||||
gpio_set_pull_mode(pin_, pull_mode);
|
||||
}
|
||||
|
||||
bool ESP32InternalGPIOPin::digital_read() { return bool(gpio_get_level(get_pin_num())) != pin_flags_.inverted; }
|
||||
void ESP32InternalGPIOPin::digital_write(bool value) {
|
||||
gpio_set_level(get_pin_num(), value != pin_flags_.inverted ? 1 : 0);
|
||||
}
|
||||
void ESP32InternalGPIOPin::detach_interrupt() const { gpio_intr_disable(get_pin_num()); }
|
||||
bool ESP32InternalGPIOPin::digital_read() { return bool(gpio_get_level(pin_)) != inverted_; }
|
||||
void ESP32InternalGPIOPin::digital_write(bool value) { gpio_set_level(pin_, value != inverted_ ? 1 : 0); }
|
||||
void ESP32InternalGPIOPin::detach_interrupt() const { gpio_intr_disable(pin_); }
|
||||
|
||||
} // namespace esp32
|
||||
|
||||
|
@@ -7,17 +7,11 @@
|
||||
namespace esphome {
|
||||
namespace esp32 {
|
||||
|
||||
// Static assertions to ensure our bit-packed fields can hold the enum values
|
||||
static_assert(GPIO_NUM_MAX <= 256, "gpio_num_t has too many values for uint8_t");
|
||||
static_assert(GPIO_DRIVE_CAP_MAX <= 4, "gpio_drive_cap_t has too many values for 2-bit field");
|
||||
|
||||
class ESP32InternalGPIOPin : public InternalGPIOPin {
|
||||
public:
|
||||
void set_pin(gpio_num_t pin) { pin_ = static_cast<uint8_t>(pin); }
|
||||
void set_inverted(bool inverted) { pin_flags_.inverted = inverted; }
|
||||
void set_drive_strength(gpio_drive_cap_t drive_strength) {
|
||||
pin_flags_.drive_strength = static_cast<uint8_t>(drive_strength);
|
||||
}
|
||||
void set_pin(gpio_num_t pin) { pin_ = pin; }
|
||||
void set_inverted(bool inverted) { inverted_ = inverted; }
|
||||
void set_drive_strength(gpio_drive_cap_t drive_strength) { drive_strength_ = drive_strength; }
|
||||
void set_flags(gpio::Flags flags) { flags_ = flags; }
|
||||
|
||||
void setup() override;
|
||||
@@ -27,26 +21,17 @@ class ESP32InternalGPIOPin : public InternalGPIOPin {
|
||||
std::string dump_summary() const override;
|
||||
void detach_interrupt() const override;
|
||||
ISRInternalGPIOPin to_isr() const override;
|
||||
uint8_t get_pin() const override { return pin_; }
|
||||
uint8_t get_pin() const override { return (uint8_t) pin_; }
|
||||
gpio::Flags get_flags() const override { return flags_; }
|
||||
bool is_inverted() const override { return pin_flags_.inverted; }
|
||||
gpio_num_t get_pin_num() const { return static_cast<gpio_num_t>(pin_); }
|
||||
gpio_drive_cap_t get_drive_strength() const { return static_cast<gpio_drive_cap_t>(pin_flags_.drive_strength); }
|
||||
bool is_inverted() const override { return inverted_; }
|
||||
|
||||
protected:
|
||||
void attach_interrupt(void (*func)(void *), void *arg, gpio::InterruptType type) const override;
|
||||
|
||||
// Memory layout: 8 bytes total on 32-bit systems
|
||||
// - 3 bytes for members below
|
||||
// - 1 byte padding for alignment
|
||||
// - 4 bytes for vtable pointer
|
||||
uint8_t pin_; // GPIO pin number (0-255, actual max ~48 on ESP32)
|
||||
gpio::Flags flags_; // GPIO flags (1 byte)
|
||||
struct PinFlags {
|
||||
uint8_t inverted : 1; // Invert pin logic (1 bit)
|
||||
uint8_t drive_strength : 2; // Drive strength 0-3 (2 bits)
|
||||
uint8_t reserved : 5; // Reserved for future use (5 bits)
|
||||
} pin_flags_; // Total: 1 byte
|
||||
gpio_num_t pin_;
|
||||
gpio_drive_cap_t drive_strength_;
|
||||
gpio::Flags flags_;
|
||||
bool inverted_;
|
||||
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
|
||||
static bool isr_service_installed;
|
||||
};
|
||||
|
@@ -58,8 +58,8 @@ extern "C" void resetPins() { // NOLINT
|
||||
|
||||
#ifdef USE_ESP8266_EARLY_PIN_INIT
|
||||
for (int i = 0; i < 16; i++) {
|
||||
uint8_t mode = ESPHOME_ESP8266_GPIO_INITIAL_MODE[i];
|
||||
uint8_t level = ESPHOME_ESP8266_GPIO_INITIAL_LEVEL[i];
|
||||
uint8_t mode = progmem_read_byte(&ESPHOME_ESP8266_GPIO_INITIAL_MODE[i]);
|
||||
uint8_t level = progmem_read_byte(&ESPHOME_ESP8266_GPIO_INITIAL_LEVEL[i]);
|
||||
if (mode != 255)
|
||||
pinMode(i, mode); // NOLINT
|
||||
if (level != 255)
|
||||
|
@@ -199,11 +199,11 @@ async def add_pin_initial_states_array():
|
||||
|
||||
cg.add_global(
|
||||
cg.RawExpression(
|
||||
f"const uint8_t ESPHOME_ESP8266_GPIO_INITIAL_MODE[16] = {{{initial_modes_s}}}"
|
||||
f"const uint8_t ESPHOME_ESP8266_GPIO_INITIAL_MODE[16] PROGMEM = {{{initial_modes_s}}}"
|
||||
)
|
||||
)
|
||||
cg.add_global(
|
||||
cg.RawExpression(
|
||||
f"const uint8_t ESPHOME_ESP8266_GPIO_INITIAL_LEVEL[16] = {{{initial_levels_s}}}"
|
||||
f"const uint8_t ESPHOME_ESP8266_GPIO_INITIAL_LEVEL[16] PROGMEM = {{{initial_levels_s}}}"
|
||||
)
|
||||
)
|
||||
|
@@ -1,6 +1,7 @@
|
||||
#ifdef USE_ESP8266
|
||||
|
||||
#include <c_types.h>
|
||||
#include <cinttypes>
|
||||
extern "C" {
|
||||
#include "spi_flash.h"
|
||||
}
|
||||
@@ -119,16 +120,16 @@ static bool load_from_rtc(size_t offset, uint32_t *data, size_t len) {
|
||||
|
||||
class ESP8266PreferenceBackend : public ESPPreferenceBackend {
|
||||
public:
|
||||
size_t offset = 0;
|
||||
uint32_t type = 0;
|
||||
uint16_t offset = 0;
|
||||
uint8_t length_words = 0; // Max 255 words (1020 bytes of data)
|
||||
bool in_flash = false;
|
||||
size_t length_words = 0;
|
||||
|
||||
bool save(const uint8_t *data, size_t len) override {
|
||||
if (bytes_to_words(len) != length_words) {
|
||||
return false;
|
||||
}
|
||||
size_t buffer_size = length_words + 1;
|
||||
size_t buffer_size = static_cast<size_t>(length_words) + 1;
|
||||
std::unique_ptr<uint32_t[]> buffer(new uint32_t[buffer_size]()); // Note the () for zero-initialization
|
||||
memcpy(buffer.get(), data, len);
|
||||
buffer[length_words] = calculate_crc(buffer.get(), buffer.get() + length_words, type);
|
||||
@@ -142,7 +143,7 @@ class ESP8266PreferenceBackend : public ESPPreferenceBackend {
|
||||
if (bytes_to_words(len) != length_words) {
|
||||
return false;
|
||||
}
|
||||
size_t buffer_size = length_words + 1;
|
||||
size_t buffer_size = static_cast<size_t>(length_words) + 1;
|
||||
std::unique_ptr<uint32_t[]> buffer(new uint32_t[buffer_size]());
|
||||
bool ret = in_flash ? load_from_flash(offset, buffer.get(), buffer_size)
|
||||
: load_from_rtc(offset, buffer.get(), buffer_size);
|
||||
@@ -176,15 +177,19 @@ class ESP8266Preferences : public ESPPreferences {
|
||||
|
||||
ESPPreferenceObject make_preference(size_t length, uint32_t type, bool in_flash) override {
|
||||
uint32_t length_words = bytes_to_words(length);
|
||||
if (length_words > 255) {
|
||||
ESP_LOGE(TAG, "Preference too large: %" PRIu32 " words > 255", length_words);
|
||||
return {};
|
||||
}
|
||||
if (in_flash) {
|
||||
uint32_t start = current_flash_offset;
|
||||
uint32_t end = start + length_words + 1;
|
||||
if (end > ESP8266_FLASH_STORAGE_SIZE)
|
||||
return {};
|
||||
auto *pref = new ESP8266PreferenceBackend(); // NOLINT(cppcoreguidelines-owning-memory)
|
||||
pref->offset = start;
|
||||
pref->offset = static_cast<uint16_t>(start);
|
||||
pref->type = type;
|
||||
pref->length_words = length_words;
|
||||
pref->length_words = static_cast<uint8_t>(length_words);
|
||||
pref->in_flash = true;
|
||||
current_flash_offset = end;
|
||||
return {pref};
|
||||
@@ -210,9 +215,9 @@ class ESP8266Preferences : public ESPPreferences {
|
||||
uint32_t rtc_offset = in_normal ? start + 32 : start - 96;
|
||||
|
||||
auto *pref = new ESP8266PreferenceBackend(); // NOLINT(cppcoreguidelines-owning-memory)
|
||||
pref->offset = rtc_offset;
|
||||
pref->offset = static_cast<uint16_t>(rtc_offset);
|
||||
pref->type = type;
|
||||
pref->length_words = length_words;
|
||||
pref->length_words = static_cast<uint8_t>(length_words);
|
||||
pref->in_flash = false;
|
||||
current_offset += length_words + 1;
|
||||
return pref;
|
||||
|
@@ -11,17 +11,22 @@ namespace esphome::gpio_expander {
|
||||
/// @brief A class to cache the read state of a GPIO expander.
|
||||
/// This class caches reads between GPIO Pins which are on the same bank.
|
||||
/// This means that for reading whole Port (ex. 8 pins) component needs only one
|
||||
/// I2C/SPI read per main loop call. It assumes, that one bit in byte identifies one GPIO pin
|
||||
/// I2C/SPI read per main loop call. It assumes that one bit in byte identifies one GPIO pin.
|
||||
///
|
||||
/// Template parameters:
|
||||
/// T - Type which represents internal register. Could be uint8_t or uint16_t. Adjust to
|
||||
/// match size of your internal GPIO bank register.
|
||||
/// N - Number of pins
|
||||
template<typename T, T N> class CachedGpioExpander {
|
||||
/// T - Type which represents internal bank register. Could be uint8_t or uint16_t.
|
||||
/// Choose based on how your I/O expander reads pins:
|
||||
/// * uint8_t: For chips that read banks separately (8 pins at a time)
|
||||
/// Examples: MCP23017 (2x8-bit banks), TCA9555 (2x8-bit banks)
|
||||
/// * uint16_t: For chips that read all pins at once (up to 16 pins)
|
||||
/// Examples: PCF8574/8575 (8/16 pins), PCA9554/9555 (8/16 pins)
|
||||
/// N - Total number of pins (as uint8_t)
|
||||
template<typename T, uint8_t N> class CachedGpioExpander {
|
||||
public:
|
||||
/// @brief Read the state of the given pin. This will invalidate the cache for the given pin number.
|
||||
/// @param pin Pin number to read
|
||||
/// @return Pin state
|
||||
bool digital_read(T pin) {
|
||||
bool digital_read(uint8_t pin) {
|
||||
const uint8_t bank = pin / BANK_SIZE;
|
||||
const T pin_mask = (1 << (pin % BANK_SIZE));
|
||||
// Check if specific pin cache is valid
|
||||
@@ -38,15 +43,25 @@ template<typename T, T N> class CachedGpioExpander {
|
||||
return this->digital_read_cache(pin);
|
||||
}
|
||||
|
||||
void digital_write(T pin, bool value) { this->digital_write_hw(pin, value); }
|
||||
void digital_write(uint8_t pin, bool value) { this->digital_write_hw(pin, value); }
|
||||
|
||||
protected:
|
||||
/// @brief Call component low level function to read GPIO state from device
|
||||
virtual bool digital_read_hw(T pin) = 0;
|
||||
/// @brief Call component read function from internal cache.
|
||||
virtual bool digital_read_cache(T pin) = 0;
|
||||
/// @brief Call component low level function to write GPIO state to device
|
||||
virtual void digital_write_hw(T pin, bool value) = 0;
|
||||
/// @brief Read GPIO bank from hardware into internal state
|
||||
/// @param pin Pin number (used to determine which bank to read)
|
||||
/// @return true if read succeeded, false on communication error
|
||||
/// @note This does NOT return the pin state. It returns whether the read operation succeeded.
|
||||
/// The actual pin state should be returned by digital_read_cache().
|
||||
virtual bool digital_read_hw(uint8_t pin) = 0;
|
||||
|
||||
/// @brief Get cached pin value from internal state
|
||||
/// @param pin Pin number to read
|
||||
/// @return Pin state (true = HIGH, false = LOW)
|
||||
virtual bool digital_read_cache(uint8_t pin) = 0;
|
||||
|
||||
/// @brief Write GPIO state to hardware
|
||||
/// @param pin Pin number to write
|
||||
/// @param value Pin state to write (true = HIGH, false = LOW)
|
||||
virtual void digital_write_hw(uint8_t pin, bool value) = 0;
|
||||
|
||||
/// @brief Invalidate cache. This function should be called in component loop().
|
||||
void reset_pin_cache_() { memset(this->read_cache_valid_, 0x00, CACHE_SIZE_BYTES); }
|
||||
|
@@ -11,6 +11,7 @@ from esphome.const import (
|
||||
CONF_OUTPUT,
|
||||
)
|
||||
|
||||
AUTO_LOAD = ["gpio_expander"]
|
||||
DEPENDENCIES = ["i2c"]
|
||||
MULTI_CONF = True
|
||||
|
||||
|
@@ -16,6 +16,10 @@ void PCF8574Component::setup() {
|
||||
this->write_gpio_();
|
||||
this->read_gpio_();
|
||||
}
|
||||
void PCF8574Component::loop() {
|
||||
// Invalidate the cache at the start of each loop
|
||||
this->reset_pin_cache_();
|
||||
}
|
||||
void PCF8574Component::dump_config() {
|
||||
ESP_LOGCONFIG(TAG, "PCF8574:");
|
||||
LOG_I2C_DEVICE(this)
|
||||
@@ -24,17 +28,19 @@ void PCF8574Component::dump_config() {
|
||||
ESP_LOGE(TAG, ESP_LOG_MSG_COMM_FAIL);
|
||||
}
|
||||
}
|
||||
bool PCF8574Component::digital_read(uint8_t pin) {
|
||||
this->read_gpio_();
|
||||
return this->input_mask_ & (1 << pin);
|
||||
bool PCF8574Component::digital_read_hw(uint8_t pin) {
|
||||
// Read all pins from hardware into input_mask_
|
||||
return this->read_gpio_(); // Return true if I2C read succeeded, false on error
|
||||
}
|
||||
void PCF8574Component::digital_write(uint8_t pin, bool value) {
|
||||
|
||||
bool PCF8574Component::digital_read_cache(uint8_t pin) { return this->input_mask_ & (1 << pin); }
|
||||
|
||||
void PCF8574Component::digital_write_hw(uint8_t pin, bool value) {
|
||||
if (value) {
|
||||
this->output_mask_ |= (1 << pin);
|
||||
} else {
|
||||
this->output_mask_ &= ~(1 << pin);
|
||||
}
|
||||
|
||||
this->write_gpio_();
|
||||
}
|
||||
void PCF8574Component::pin_mode(uint8_t pin, gpio::Flags flags) {
|
||||
@@ -91,6 +97,9 @@ bool PCF8574Component::write_gpio_() {
|
||||
}
|
||||
float PCF8574Component::get_setup_priority() const { return setup_priority::IO; }
|
||||
|
||||
// Run our loop() method early to invalidate cache before any other components access the pins
|
||||
float PCF8574Component::get_loop_priority() const { return 9.0f; } // Just after WIFI
|
||||
|
||||
void PCF8574GPIOPin::setup() { pin_mode(flags_); }
|
||||
void PCF8574GPIOPin::pin_mode(gpio::Flags flags) { this->parent_->pin_mode(this->pin_, flags); }
|
||||
bool PCF8574GPIOPin::digital_read() { return this->parent_->digital_read(this->pin_) != this->inverted_; }
|
||||
|
@@ -3,11 +3,16 @@
|
||||
#include "esphome/core/component.h"
|
||||
#include "esphome/core/hal.h"
|
||||
#include "esphome/components/i2c/i2c.h"
|
||||
#include "esphome/components/gpio_expander/cached_gpio.h"
|
||||
|
||||
namespace esphome {
|
||||
namespace pcf8574 {
|
||||
|
||||
class PCF8574Component : public Component, public i2c::I2CDevice {
|
||||
// PCF8574(8 pins)/PCF8575(16 pins) always read/write all pins in a single I2C transaction
|
||||
// so we use uint16_t as bank type to ensure all pins are in one bank and cached together
|
||||
class PCF8574Component : public Component,
|
||||
public i2c::I2CDevice,
|
||||
public gpio_expander::CachedGpioExpander<uint16_t, 16> {
|
||||
public:
|
||||
PCF8574Component() = default;
|
||||
|
||||
@@ -15,20 +20,22 @@ class PCF8574Component : public Component, public i2c::I2CDevice {
|
||||
|
||||
/// Check i2c availability and setup masks
|
||||
void setup() override;
|
||||
/// Helper function to read the value of a pin.
|
||||
bool digital_read(uint8_t pin);
|
||||
/// Helper function to write the value of a pin.
|
||||
void digital_write(uint8_t pin, bool value);
|
||||
/// Invalidate cache at start of each loop
|
||||
void loop() override;
|
||||
/// Helper function to set the pin mode of a pin.
|
||||
void pin_mode(uint8_t pin, gpio::Flags flags);
|
||||
|
||||
float get_setup_priority() const override;
|
||||
float get_loop_priority() const override;
|
||||
|
||||
void dump_config() override;
|
||||
|
||||
protected:
|
||||
bool read_gpio_();
|
||||
bool digital_read_hw(uint8_t pin) override;
|
||||
bool digital_read_cache(uint8_t pin) override;
|
||||
void digital_write_hw(uint8_t pin, bool value) override;
|
||||
|
||||
bool read_gpio_();
|
||||
bool write_gpio_();
|
||||
|
||||
/// Mask for the pin mode - 1 means output, 0 means input
|
||||
|
@@ -2,6 +2,7 @@ from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import base64
|
||||
import binascii
|
||||
from collections.abc import Callable, Iterable
|
||||
import datetime
|
||||
import functools
|
||||
@@ -490,7 +491,17 @@ class WizardRequestHandler(BaseHandler):
|
||||
kwargs = {
|
||||
k: v
|
||||
for k, v in json.loads(self.request.body.decode()).items()
|
||||
if k in ("name", "platform", "board", "ssid", "psk", "password")
|
||||
if k
|
||||
in (
|
||||
"type",
|
||||
"name",
|
||||
"platform",
|
||||
"board",
|
||||
"ssid",
|
||||
"psk",
|
||||
"password",
|
||||
"file_content",
|
||||
)
|
||||
}
|
||||
if not kwargs["name"]:
|
||||
self.set_status(422)
|
||||
@@ -498,19 +509,65 @@ class WizardRequestHandler(BaseHandler):
|
||||
self.write(json.dumps({"error": "Name is required"}))
|
||||
return
|
||||
|
||||
if "type" not in kwargs:
|
||||
# Default to basic wizard type for backwards compatibility
|
||||
kwargs["type"] = "basic"
|
||||
|
||||
kwargs["friendly_name"] = kwargs["name"]
|
||||
kwargs["name"] = friendly_name_slugify(kwargs["friendly_name"])
|
||||
|
||||
kwargs["ota_password"] = secrets.token_hex(16)
|
||||
noise_psk = secrets.token_bytes(32)
|
||||
kwargs["api_encryption_key"] = base64.b64encode(noise_psk).decode()
|
||||
if kwargs["type"] == "basic":
|
||||
kwargs["ota_password"] = secrets.token_hex(16)
|
||||
noise_psk = secrets.token_bytes(32)
|
||||
kwargs["api_encryption_key"] = base64.b64encode(noise_psk).decode()
|
||||
elif kwargs["type"] == "upload":
|
||||
try:
|
||||
kwargs["file_text"] = base64.b64decode(kwargs["file_content"]).decode(
|
||||
"utf-8"
|
||||
)
|
||||
except (binascii.Error, UnicodeDecodeError):
|
||||
self.set_status(422)
|
||||
self.set_header("content-type", "application/json")
|
||||
self.write(
|
||||
json.dumps({"error": "The uploaded file is not correctly encoded."})
|
||||
)
|
||||
return
|
||||
elif kwargs["type"] != "empty":
|
||||
self.set_status(422)
|
||||
self.set_header("content-type", "application/json")
|
||||
self.write(
|
||||
json.dumps(
|
||||
{"error": f"Invalid wizard type specified: {kwargs['type']}"}
|
||||
)
|
||||
)
|
||||
return
|
||||
filename = f"{kwargs['name']}.yaml"
|
||||
destination = settings.rel_path(filename)
|
||||
wizard.wizard_write(path=destination, **kwargs)
|
||||
self.set_status(200)
|
||||
self.set_header("content-type", "application/json")
|
||||
self.write(json.dumps({"configuration": filename}))
|
||||
self.finish()
|
||||
|
||||
# Check if destination file already exists
|
||||
if os.path.exists(destination):
|
||||
self.set_status(409) # Conflict status code
|
||||
self.set_header("content-type", "application/json")
|
||||
self.write(
|
||||
json.dumps({"error": f"Configuration file '{filename}' already exists"})
|
||||
)
|
||||
self.finish()
|
||||
return
|
||||
|
||||
success = wizard.wizard_write(path=destination, **kwargs)
|
||||
if success:
|
||||
self.set_status(200)
|
||||
self.set_header("content-type", "application/json")
|
||||
self.write(json.dumps({"configuration": filename}))
|
||||
self.finish()
|
||||
else:
|
||||
self.set_status(500)
|
||||
self.set_header("content-type", "application/json")
|
||||
self.write(
|
||||
json.dumps(
|
||||
{"error": "Failed to write configuration, see logs for details"}
|
||||
)
|
||||
)
|
||||
self.finish()
|
||||
|
||||
|
||||
class ImportRequestHandler(BaseHandler):
|
||||
|
@@ -189,32 +189,45 @@ def wizard_write(path, **kwargs):
|
||||
from esphome.components.rtl87xx import boards as rtl87xx_boards
|
||||
|
||||
name = kwargs["name"]
|
||||
board = kwargs["board"]
|
||||
if kwargs["type"] == "empty":
|
||||
file_text = ""
|
||||
# Will be updated later after editing the file
|
||||
hardware = "UNKNOWN"
|
||||
elif kwargs["type"] == "upload":
|
||||
file_text = kwargs["file_text"]
|
||||
hardware = "UNKNOWN"
|
||||
else: # "basic"
|
||||
board = kwargs["board"]
|
||||
|
||||
for key in ("ssid", "psk", "password", "ota_password"):
|
||||
if key in kwargs:
|
||||
kwargs[key] = sanitize_double_quotes(kwargs[key])
|
||||
for key in ("ssid", "psk", "password", "ota_password"):
|
||||
if key in kwargs:
|
||||
kwargs[key] = sanitize_double_quotes(kwargs[key])
|
||||
if "platform" not in kwargs:
|
||||
if board in esp8266_boards.BOARDS:
|
||||
platform = "ESP8266"
|
||||
elif board in esp32_boards.BOARDS:
|
||||
platform = "ESP32"
|
||||
elif board in rp2040_boards.BOARDS:
|
||||
platform = "RP2040"
|
||||
elif board in bk72xx_boards.BOARDS:
|
||||
platform = "BK72XX"
|
||||
elif board in ln882x_boards.BOARDS:
|
||||
platform = "LN882X"
|
||||
elif board in rtl87xx_boards.BOARDS:
|
||||
platform = "RTL87XX"
|
||||
else:
|
||||
safe_print(color(AnsiFore.RED, f'The board "{board}" is unknown.'))
|
||||
return False
|
||||
kwargs["platform"] = platform
|
||||
hardware = kwargs["platform"]
|
||||
file_text = wizard_file(**kwargs)
|
||||
|
||||
if "platform" not in kwargs:
|
||||
if board in esp8266_boards.BOARDS:
|
||||
platform = "ESP8266"
|
||||
elif board in esp32_boards.BOARDS:
|
||||
platform = "ESP32"
|
||||
elif board in rp2040_boards.BOARDS:
|
||||
platform = "RP2040"
|
||||
elif board in bk72xx_boards.BOARDS:
|
||||
platform = "BK72XX"
|
||||
elif board in ln882x_boards.BOARDS:
|
||||
platform = "LN882X"
|
||||
elif board in rtl87xx_boards.BOARDS:
|
||||
platform = "RTL87XX"
|
||||
else:
|
||||
safe_print(color(AnsiFore.RED, f'The board "{board}" is unknown.'))
|
||||
return False
|
||||
kwargs["platform"] = platform
|
||||
hardware = kwargs["platform"]
|
||||
# Check if file already exists to prevent overwriting
|
||||
if os.path.exists(path) and os.path.isfile(path):
|
||||
safe_print(color(AnsiFore.RED, f'The file "{path}" already exists.'))
|
||||
return False
|
||||
|
||||
write_file(path, wizard_file(**kwargs))
|
||||
write_file(path, file_text)
|
||||
storage = StorageJSON.from_wizard(name, name, f"{name}.local", hardware)
|
||||
storage_path = ext_storage_path(os.path.basename(path))
|
||||
storage.save(storage_path)
|
||||
|
@@ -27,11 +27,13 @@ void GPIOExpanderTestComponent::setup() {
|
||||
|
||||
bool GPIOExpanderTestComponent::digital_read_hw(uint8_t pin) {
|
||||
ESP_LOGD(TAG, "digital_read_hw pin=%d", pin);
|
||||
// Return true to indicate successful read operation
|
||||
return true;
|
||||
}
|
||||
|
||||
bool GPIOExpanderTestComponent::digital_read_cache(uint8_t pin) {
|
||||
ESP_LOGD(TAG, "digital_read_cache pin=%d", pin);
|
||||
// Return the pin state (always HIGH for testing)
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@@ -0,0 +1,24 @@
|
||||
import esphome.codegen as cg
|
||||
import esphome.config_validation as cv
|
||||
from esphome.const import CONF_ID
|
||||
|
||||
AUTO_LOAD = ["gpio_expander"]
|
||||
|
||||
gpio_expander_test_component_uint16_ns = cg.esphome_ns.namespace(
|
||||
"gpio_expander_test_component_uint16"
|
||||
)
|
||||
|
||||
GPIOExpanderTestUint16Component = gpio_expander_test_component_uint16_ns.class_(
|
||||
"GPIOExpanderTestUint16Component", cg.Component
|
||||
)
|
||||
|
||||
CONFIG_SCHEMA = cv.Schema(
|
||||
{
|
||||
cv.GenerateID(): cv.declare_id(GPIOExpanderTestUint16Component),
|
||||
}
|
||||
).extend(cv.COMPONENT_SCHEMA)
|
||||
|
||||
|
||||
async def to_code(config):
|
||||
var = cg.new_Pvariable(config[CONF_ID])
|
||||
await cg.register_component(var, config)
|
@@ -0,0 +1,43 @@
|
||||
#include "gpio_expander_test_component_uint16.h"
|
||||
#include "esphome/core/log.h"
|
||||
|
||||
namespace esphome::gpio_expander_test_component_uint16 {
|
||||
|
||||
static const char *const TAG = "gpio_expander_test_uint16";
|
||||
|
||||
void GPIOExpanderTestUint16Component::setup() {
|
||||
ESP_LOGD(TAG, "Testing uint16_t bank (single 16-pin bank)");
|
||||
|
||||
// Test reading all 16 pins - first should trigger hw read, rest use cache
|
||||
for (uint8_t pin = 0; pin < 16; pin++) {
|
||||
this->digital_read(pin);
|
||||
}
|
||||
|
||||
// Reset cache and test specific reads
|
||||
ESP_LOGD(TAG, "Resetting cache for uint16_t test");
|
||||
this->reset_pin_cache_();
|
||||
|
||||
// First read triggers hw for entire bank
|
||||
this->digital_read(5);
|
||||
// These should all use cache since they're in the same bank
|
||||
this->digital_read(10);
|
||||
this->digital_read(15);
|
||||
this->digital_read(0);
|
||||
|
||||
ESP_LOGD(TAG, "DONE_UINT16");
|
||||
}
|
||||
|
||||
bool GPIOExpanderTestUint16Component::digital_read_hw(uint8_t pin) {
|
||||
ESP_LOGD(TAG, "uint16_digital_read_hw pin=%d", pin);
|
||||
// In a real component, this would read from I2C/SPI into internal state
|
||||
// For testing, we just return true to indicate successful read
|
||||
return true; // Return true to indicate successful read
|
||||
}
|
||||
|
||||
bool GPIOExpanderTestUint16Component::digital_read_cache(uint8_t pin) {
|
||||
ESP_LOGD(TAG, "uint16_digital_read_cache pin=%d", pin);
|
||||
// Return the actual pin state from our test pattern
|
||||
return (this->test_state_ >> pin) & 1;
|
||||
}
|
||||
|
||||
} // namespace esphome::gpio_expander_test_component_uint16
|
@@ -0,0 +1,23 @@
|
||||
#pragma once
|
||||
|
||||
#include "esphome/components/gpio_expander/cached_gpio.h"
|
||||
#include "esphome/core/component.h"
|
||||
|
||||
namespace esphome::gpio_expander_test_component_uint16 {
|
||||
|
||||
// Test component using uint16_t bank type (single 16-pin bank)
|
||||
class GPIOExpanderTestUint16Component : public Component,
|
||||
public esphome::gpio_expander::CachedGpioExpander<uint16_t, 16> {
|
||||
public:
|
||||
void setup() override;
|
||||
|
||||
protected:
|
||||
bool digital_read_hw(uint8_t pin) override;
|
||||
bool digital_read_cache(uint8_t pin) override;
|
||||
void digital_write_hw(uint8_t pin, bool value) override{};
|
||||
|
||||
private:
|
||||
uint16_t test_state_{0xAAAA}; // Test pattern: alternating bits
|
||||
};
|
||||
|
||||
} // namespace esphome::gpio_expander_test_component_uint16
|
@@ -12,6 +12,10 @@ external_components:
|
||||
- source:
|
||||
type: local
|
||||
path: EXTERNAL_COMPONENT_PATH
|
||||
components: [gpio_expander_test_component]
|
||||
components: [gpio_expander_test_component, gpio_expander_test_component_uint16]
|
||||
|
||||
# Test with uint8_t (multiple banks)
|
||||
gpio_expander_test_component:
|
||||
|
||||
# Test with uint16_t (single bank)
|
||||
gpio_expander_test_component_uint16:
|
||||
|
@@ -30,9 +30,15 @@ async def test_gpio_expander_cache(
|
||||
|
||||
logs_done = asyncio.Event()
|
||||
|
||||
# Patterns to match in logs
|
||||
digital_read_hw_pattern = re.compile(r"digital_read_hw pin=(\d+)")
|
||||
digital_read_cache_pattern = re.compile(r"digital_read_cache pin=(\d+)")
|
||||
# Patterns to match in logs - match any variation of digital_read
|
||||
read_hw_pattern = re.compile(r"(?:uint16_)?digital_read_hw pin=(\d+)")
|
||||
read_cache_pattern = re.compile(r"(?:uint16_)?digital_read_cache pin=(\d+)")
|
||||
|
||||
# Keep specific patterns for building the expected order
|
||||
digital_read_hw_pattern = re.compile(r"^digital_read_hw pin=(\d+)")
|
||||
digital_read_cache_pattern = re.compile(r"^digital_read_cache pin=(\d+)")
|
||||
uint16_read_hw_pattern = re.compile(r"^uint16_digital_read_hw pin=(\d+)")
|
||||
uint16_read_cache_pattern = re.compile(r"^uint16_digital_read_cache pin=(\d+)")
|
||||
|
||||
# ensure logs are in the expected order
|
||||
log_order = [
|
||||
@@ -59,6 +65,17 @@ async def test_gpio_expander_cache(
|
||||
(digital_read_cache_pattern, 14),
|
||||
(digital_read_hw_pattern, 14),
|
||||
(digital_read_cache_pattern, 14),
|
||||
# uint16_t component tests (single bank of 16 pins)
|
||||
(uint16_read_hw_pattern, 0), # First pin triggers hw read
|
||||
[
|
||||
(uint16_read_cache_pattern, i) for i in range(0, 16)
|
||||
], # All 16 pins return via cache
|
||||
# After cache reset
|
||||
(uint16_read_hw_pattern, 5), # First read after reset triggers hw
|
||||
(uint16_read_cache_pattern, 5),
|
||||
(uint16_read_cache_pattern, 10), # These use cache (same bank)
|
||||
(uint16_read_cache_pattern, 15),
|
||||
(uint16_read_cache_pattern, 0),
|
||||
]
|
||||
# Flatten the log order for easier processing
|
||||
log_order: list[tuple[re.Pattern, int]] = [
|
||||
@@ -77,17 +94,22 @@ async def test_gpio_expander_cache(
|
||||
|
||||
clean_line = re.sub(r"\x1b\[[0-9;]*m", "", line)
|
||||
|
||||
if "digital_read" in clean_line:
|
||||
# Extract just the log message part (after the log level)
|
||||
msg = clean_line.split(": ", 1)[-1] if ": " in clean_line else clean_line
|
||||
|
||||
# Check if this line contains a read operation we're tracking
|
||||
if read_hw_pattern.search(msg) or read_cache_pattern.search(msg):
|
||||
if index >= len(log_order):
|
||||
print(f"Received unexpected log line: {clean_line}")
|
||||
print(f"Received unexpected log line: {msg}")
|
||||
logs_done.set()
|
||||
return
|
||||
|
||||
pattern, expected_pin = log_order[index]
|
||||
match = pattern.search(clean_line)
|
||||
match = pattern.search(msg)
|
||||
|
||||
if not match:
|
||||
print(f"Log line did not match next expected pattern: {clean_line}")
|
||||
print(f"Log line did not match next expected pattern: {msg}")
|
||||
print(f"Expected pattern: {pattern.pattern}")
|
||||
logs_done.set()
|
||||
return
|
||||
|
||||
@@ -99,9 +121,10 @@ async def test_gpio_expander_cache(
|
||||
|
||||
index += 1
|
||||
|
||||
elif "DONE" in clean_line:
|
||||
# Check if we reached the end of the expected log entries
|
||||
logs_done.set()
|
||||
elif "DONE_UINT16" in clean_line:
|
||||
# uint16 component is done, check if we've seen all expected logs
|
||||
if index == len(log_order):
|
||||
logs_done.set()
|
||||
|
||||
# Run with log monitoring
|
||||
async with (
|
||||
|
@@ -17,6 +17,7 @@ import esphome.wizard as wz
|
||||
@pytest.fixture
|
||||
def default_config():
|
||||
return {
|
||||
"type": "basic",
|
||||
"name": "test-name",
|
||||
"platform": "ESP8266",
|
||||
"board": "esp01_1m",
|
||||
@@ -125,6 +126,47 @@ def test_wizard_write_sets_platform(default_config, tmp_path, monkeypatch):
|
||||
assert "esp8266:" in generated_config
|
||||
|
||||
|
||||
def test_wizard_empty_config(tmp_path, monkeypatch):
|
||||
"""
|
||||
The wizard should be able to create an empty configuration
|
||||
"""
|
||||
# Given
|
||||
empty_config = {
|
||||
"type": "empty",
|
||||
"name": "test-empty",
|
||||
}
|
||||
monkeypatch.setattr(wz, "write_file", MagicMock())
|
||||
monkeypatch.setattr(CORE, "config_path", os.path.dirname(tmp_path))
|
||||
|
||||
# When
|
||||
wz.wizard_write(tmp_path, **empty_config)
|
||||
|
||||
# Then
|
||||
generated_config = wz.write_file.call_args.args[1]
|
||||
assert generated_config == ""
|
||||
|
||||
|
||||
def test_wizard_upload_config(tmp_path, monkeypatch):
|
||||
"""
|
||||
The wizard should be able to import an base64 encoded configuration
|
||||
"""
|
||||
# Given
|
||||
empty_config = {
|
||||
"type": "upload",
|
||||
"name": "test-upload",
|
||||
"file_text": "# imported file 📁\n\n",
|
||||
}
|
||||
monkeypatch.setattr(wz, "write_file", MagicMock())
|
||||
monkeypatch.setattr(CORE, "config_path", os.path.dirname(tmp_path))
|
||||
|
||||
# When
|
||||
wz.wizard_write(tmp_path, **empty_config)
|
||||
|
||||
# Then
|
||||
generated_config = wz.write_file.call_args.args[1]
|
||||
assert generated_config == "# imported file 📁\n\n"
|
||||
|
||||
|
||||
def test_wizard_write_defaults_platform_from_board_esp8266(
|
||||
default_config, tmp_path, monkeypatch
|
||||
):
|
||||
@@ -471,3 +513,22 @@ def test_wizard_requires_valid_ssid(tmpdir, monkeypatch, wizard_answers):
|
||||
|
||||
# Then
|
||||
assert retval == 0
|
||||
|
||||
|
||||
def test_wizard_write_protects_existing_config(tmpdir, default_config, monkeypatch):
|
||||
"""
|
||||
The wizard_write function should not overwrite existing config files and return False
|
||||
"""
|
||||
# Given
|
||||
config_file = tmpdir.join("test.yaml")
|
||||
original_content = "# Original config content\n"
|
||||
config_file.write(original_content)
|
||||
|
||||
monkeypatch.setattr(CORE, "config_path", str(tmpdir))
|
||||
|
||||
# When
|
||||
result = wz.wizard_write(str(config_file), **default_config)
|
||||
|
||||
# Then
|
||||
assert result is False # Should return False when file exists
|
||||
assert config_file.read() == original_content
|
||||
|
Reference in New Issue
Block a user