Merge remote-tracking branch 'upstream/dev' into integration

This commit is contained in:
J. Nick Koston 2025-07-08 09:08:03 -06:00
commit 591786a787
No known key found for this signature in database
28 changed files with 921 additions and 359 deletions

View File

@ -2,6 +2,7 @@
CODEOWNERS = ["@esphome/core"] CODEOWNERS = ["@esphome/core"]
CONF_BYTE_ORDER = "byte_order"
CONF_DRAW_ROUNDING = "draw_rounding" CONF_DRAW_ROUNDING = "draw_rounding"
CONF_ON_STATE_CHANGE = "on_state_change" CONF_ON_STATE_CHANGE = "on_state_change"
CONF_REQUEST_HEADERS = "request_headers" CONF_REQUEST_HEADERS = "request_headers"

View File

@ -111,8 +111,8 @@ CONFIG_SCHEMA = cv.All(
cv.Optional(CONF_MOISTURE): sensor.sensor_schema( cv.Optional(CONF_MOISTURE): sensor.sensor_schema(
unit_of_measurement=UNIT_INTENSITY, unit_of_measurement=UNIT_INTENSITY,
accuracy_decimals=0, accuracy_decimals=0,
device_class=DEVICE_CLASS_PRECIPITATION_INTENSITY,
state_class=STATE_CLASS_MEASUREMENT, state_class=STATE_CLASS_MEASUREMENT,
icon="mdi:weather-rainy",
), ),
cv.Optional(CONF_TEMPERATURE): sensor.sensor_schema( cv.Optional(CONF_TEMPERATURE): sensor.sensor_schema(
unit_of_measurement=UNIT_CELSIUS, unit_of_measurement=UNIT_CELSIUS,

View File

@ -10,8 +10,10 @@ from PIL import Image, UnidentifiedImageError
from esphome import core, external_files from esphome import core, external_files
import esphome.codegen as cg import esphome.codegen as cg
from esphome.components.const import CONF_BYTE_ORDER
import esphome.config_validation as cv import esphome.config_validation as cv
from esphome.const import ( from esphome.const import (
CONF_DEFAULTS,
CONF_DITHER, CONF_DITHER,
CONF_FILE, CONF_FILE,
CONF_ICON, CONF_ICON,
@ -38,6 +40,7 @@ CONF_OPAQUE = "opaque"
CONF_CHROMA_KEY = "chroma_key" CONF_CHROMA_KEY = "chroma_key"
CONF_ALPHA_CHANNEL = "alpha_channel" CONF_ALPHA_CHANNEL = "alpha_channel"
CONF_INVERT_ALPHA = "invert_alpha" CONF_INVERT_ALPHA = "invert_alpha"
CONF_IMAGES = "images"
TRANSPARENCY_TYPES = ( TRANSPARENCY_TYPES = (
CONF_OPAQUE, CONF_OPAQUE,
@ -188,6 +191,10 @@ class ImageRGB565(ImageEncoder):
dither, dither,
invert_alpha, invert_alpha,
) )
self.big_endian = True
def set_big_endian(self, big_endian: bool) -> None:
self.big_endian = big_endian
def convert(self, image, path): def convert(self, image, path):
return image.convert("RGBA") return image.convert("RGBA")
@ -205,10 +212,16 @@ class ImageRGB565(ImageEncoder):
g = 1 g = 1
b = 0 b = 0
rgb = (r << 11) | (g << 5) | b rgb = (r << 11) | (g << 5) | b
self.data[self.index] = rgb >> 8 if self.big_endian:
self.index += 1 self.data[self.index] = rgb >> 8
self.data[self.index] = rgb & 0xFF self.index += 1
self.index += 1 self.data[self.index] = rgb & 0xFF
self.index += 1
else:
self.data[self.index] = rgb & 0xFF
self.index += 1
self.data[self.index] = rgb >> 8
self.index += 1
if self.transparency == CONF_ALPHA_CHANNEL: if self.transparency == CONF_ALPHA_CHANNEL:
if self.invert_alpha: if self.invert_alpha:
a ^= 0xFF a ^= 0xFF
@ -364,7 +377,7 @@ def validate_file_shorthand(value):
value = cv.string_strict(value) value = cv.string_strict(value)
parts = value.strip().split(":") parts = value.strip().split(":")
if len(parts) == 2 and parts[0] in MDI_SOURCES: if len(parts) == 2 and parts[0] in MDI_SOURCES:
match = re.match(r"[a-zA-Z0-9\-]+", parts[1]) match = re.match(r"^[a-zA-Z0-9\-]+$", parts[1])
if match is None: if match is None:
raise cv.Invalid(f"Could not parse mdi icon name from '{value}'.") raise cv.Invalid(f"Could not parse mdi icon name from '{value}'.")
return download_gh_svg(parts[1], parts[0]) return download_gh_svg(parts[1], parts[0])
@ -434,20 +447,29 @@ def validate_type(image_types):
def validate_settings(value): def validate_settings(value):
type = value[CONF_TYPE] """
Validate the settings for a single image configuration.
"""
conf_type = value[CONF_TYPE]
type_class = IMAGE_TYPE[conf_type]
transparency = value[CONF_TRANSPARENCY].lower() transparency = value[CONF_TRANSPARENCY].lower()
allow_config = IMAGE_TYPE[type].allow_config if transparency not in type_class.allow_config:
if transparency not in allow_config:
raise cv.Invalid( raise cv.Invalid(
f"Image format '{type}' cannot have transparency: {transparency}" f"Image format '{conf_type}' cannot have transparency: {transparency}"
) )
invert_alpha = value.get(CONF_INVERT_ALPHA, False) invert_alpha = value.get(CONF_INVERT_ALPHA, False)
if ( if (
invert_alpha invert_alpha
and transparency != CONF_ALPHA_CHANNEL and transparency != CONF_ALPHA_CHANNEL
and CONF_INVERT_ALPHA not in allow_config and CONF_INVERT_ALPHA not in type_class.allow_config
): ):
raise cv.Invalid("No alpha channel to invert") raise cv.Invalid("No alpha channel to invert")
if value.get(CONF_BYTE_ORDER) is not None and not callable(
getattr(type_class, "set_big_endian", None)
):
raise cv.Invalid(
f"Image format '{conf_type}' does not support byte order configuration"
)
if file := value.get(CONF_FILE): if file := value.get(CONF_FILE):
file = Path(file) file = Path(file)
if is_svg_file(file): if is_svg_file(file):
@ -456,31 +478,82 @@ def validate_settings(value):
try: try:
Image.open(file) Image.open(file)
except UnidentifiedImageError as exc: except UnidentifiedImageError as exc:
raise cv.Invalid(f"File can't be opened as image: {file}") from exc raise cv.Invalid(
f"File can't be opened as image: {file.absolute()}"
) from exc
return value return value
IMAGE_ID_SCHEMA = {
cv.Required(CONF_ID): cv.declare_id(Image_),
cv.Required(CONF_FILE): cv.Any(validate_file_shorthand, TYPED_FILE_SCHEMA),
cv.GenerateID(CONF_RAW_DATA_ID): cv.declare_id(cg.uint8),
}
OPTIONS_SCHEMA = {
cv.Optional(CONF_RESIZE): cv.dimensions,
cv.Optional(CONF_DITHER, default="NONE"): cv.one_of(
"NONE", "FLOYDSTEINBERG", upper=True
),
cv.Optional(CONF_INVERT_ALPHA, default=False): cv.boolean,
cv.Optional(CONF_BYTE_ORDER): cv.one_of("BIG_ENDIAN", "LITTLE_ENDIAN", upper=True),
cv.Optional(CONF_TRANSPARENCY, default=CONF_OPAQUE): validate_transparency(),
cv.Optional(CONF_TYPE): validate_type(IMAGE_TYPE),
}
OPTIONS = [key.schema for key in OPTIONS_SCHEMA]
# image schema with no defaults, used with `CONF_IMAGES` in the config
IMAGE_SCHEMA_NO_DEFAULTS = {
**IMAGE_ID_SCHEMA,
**{cv.Optional(key): OPTIONS_SCHEMA[key] for key in OPTIONS},
}
BASE_SCHEMA = cv.Schema( BASE_SCHEMA = cv.Schema(
{ {
cv.Required(CONF_ID): cv.declare_id(Image_), **IMAGE_ID_SCHEMA,
cv.Required(CONF_FILE): cv.Any(validate_file_shorthand, TYPED_FILE_SCHEMA), **OPTIONS_SCHEMA,
cv.Optional(CONF_RESIZE): cv.dimensions,
cv.Optional(CONF_DITHER, default="NONE"): cv.one_of(
"NONE", "FLOYDSTEINBERG", upper=True
),
cv.Optional(CONF_INVERT_ALPHA, default=False): cv.boolean,
cv.GenerateID(CONF_RAW_DATA_ID): cv.declare_id(cg.uint8),
} }
).add_extra(validate_settings) ).add_extra(validate_settings)
IMAGE_SCHEMA = BASE_SCHEMA.extend( IMAGE_SCHEMA = BASE_SCHEMA.extend(
{ {
cv.Required(CONF_TYPE): validate_type(IMAGE_TYPE), cv.Required(CONF_TYPE): validate_type(IMAGE_TYPE),
cv.Optional(CONF_TRANSPARENCY, default=CONF_OPAQUE): validate_transparency(),
} }
) )
def validate_defaults(value):
"""
Validate the options for images with defaults
"""
defaults = value[CONF_DEFAULTS]
result = []
for index, image in enumerate(value[CONF_IMAGES]):
type = image.get(CONF_TYPE, defaults.get(CONF_TYPE))
if type is None:
raise cv.Invalid(
"Type is required either in the image config or in the defaults",
path=[CONF_IMAGES, index],
)
type_class = IMAGE_TYPE[type]
# A default byte order should be simply ignored if the type does not support it
available_options = [*OPTIONS]
if (
not callable(getattr(type_class, "set_big_endian", None))
and CONF_BYTE_ORDER not in image
):
available_options.remove(CONF_BYTE_ORDER)
config = {
**{key: image.get(key, defaults.get(key)) for key in available_options},
**{key.schema: image[key.schema] for key in IMAGE_ID_SCHEMA},
}
validate_settings(config)
result.append(config)
return result
def typed_image_schema(image_type): def typed_image_schema(image_type):
""" """
Construct a schema for a specific image type, allowing transparency options Construct a schema for a specific image type, allowing transparency options
@ -523,10 +596,33 @@ def typed_image_schema(image_type):
# The config schema can be a (possibly empty) single list of images, # The config schema can be a (possibly empty) single list of images,
# or a dictionary of image types each with a list of images # or a dictionary of image types each with a list of images
CONFIG_SCHEMA = cv.Any( # or a dictionary with keys `defaults:` and `images:`
cv.Schema({cv.Optional(t.lower()): typed_image_schema(t) for t in IMAGE_TYPE}),
cv.ensure_list(IMAGE_SCHEMA),
) def _config_schema(config):
if isinstance(config, list):
return cv.Schema([IMAGE_SCHEMA])(config)
if not isinstance(config, dict):
raise cv.Invalid(
"Badly formed image configuration, expected a list or a dictionary"
)
if CONF_DEFAULTS in config or CONF_IMAGES in config:
return validate_defaults(
cv.Schema(
{
cv.Required(CONF_DEFAULTS): OPTIONS_SCHEMA,
cv.Required(CONF_IMAGES): cv.ensure_list(IMAGE_SCHEMA_NO_DEFAULTS),
}
)(config)
)
if CONF_ID in config or CONF_FILE in config:
return cv.ensure_list(IMAGE_SCHEMA)([config])
return cv.Schema(
{cv.Optional(t.lower()): typed_image_schema(t) for t in IMAGE_TYPE}
)(config)
CONFIG_SCHEMA = _config_schema
async def write_image(config, all_frames=False): async def write_image(config, all_frames=False):
@ -585,6 +681,9 @@ async def write_image(config, all_frames=False):
total_rows = height * frame_count total_rows = height * frame_count
encoder = IMAGE_TYPE[type](width, total_rows, transparency, dither, invert_alpha) encoder = IMAGE_TYPE[type](width, total_rows, transparency, dither, invert_alpha)
if byte_order := config.get(CONF_BYTE_ORDER):
# Check for valid type has already been done in validate_settings
encoder.set_big_endian(byte_order == "BIG_ENDIAN")
for frame_index in range(frame_count): for frame_index in range(frame_count):
image.seek(frame_index) image.seek(frame_index)
pixels = encoder.convert(image.resize((width, height)), path).getdata() pixels = encoder.convert(image.resize((width, height)), path).getdata()

View File

@ -3132,7 +3132,7 @@ void HOT GDEY0583T81::display() {
} else { } else {
// Partial out (PTOUT), makes the display exit partial mode // Partial out (PTOUT), makes the display exit partial mode
this->command(0x92); this->command(0x92);
ESP_LOGD(TAG, "Partial update done, next full update after %d cycles", ESP_LOGD(TAG, "Partial update done, next full update after %" PRIu32 " cycles",
this->full_update_every_ - this->at_update_ - 1); this->full_update_every_ - this->at_update_ - 1);
} }

View File

@ -258,53 +258,60 @@ std::string format_hex(const uint8_t *data, size_t length) {
std::string format_hex(const std::vector<uint8_t> &data) { return format_hex(data.data(), data.size()); } std::string format_hex(const std::vector<uint8_t> &data) { return format_hex(data.data(), data.size()); }
static char format_hex_pretty_char(uint8_t v) { return v >= 10 ? 'A' + (v - 10) : '0' + v; } static char format_hex_pretty_char(uint8_t v) { return v >= 10 ? 'A' + (v - 10) : '0' + v; }
std::string format_hex_pretty(const uint8_t *data, size_t length) { std::string format_hex_pretty(const uint8_t *data, size_t length, char separator, bool show_length) {
if (length == 0) if (data == nullptr || length == 0)
return ""; return "";
std::string ret; std::string ret;
ret.resize(3 * length - 1); uint8_t multiple = separator ? 3 : 2; // 3 if separator is not \0, 2 otherwise
ret.resize(multiple * length - 1);
for (size_t i = 0; i < length; i++) { for (size_t i = 0; i < length; i++) {
ret[3 * i] = format_hex_pretty_char((data[i] & 0xF0) >> 4); ret[multiple * i] = format_hex_pretty_char((data[i] & 0xF0) >> 4);
ret[3 * i + 1] = format_hex_pretty_char(data[i] & 0x0F); ret[multiple * i + 1] = format_hex_pretty_char(data[i] & 0x0F);
if (i != length - 1) if (separator && i != length - 1)
ret[3 * i + 2] = '.'; ret[multiple * i + 2] = separator;
} }
if (length > 4) if (show_length && length > 4)
return ret + " (" + to_string(length) + ")"; return ret + " (" + std::to_string(length) + ")";
return ret; return ret;
} }
std::string format_hex_pretty(const std::vector<uint8_t> &data) { return format_hex_pretty(data.data(), data.size()); } std::string format_hex_pretty(const std::vector<uint8_t> &data, char separator, bool show_length) {
return format_hex_pretty(data.data(), data.size(), separator, show_length);
}
std::string format_hex_pretty(const uint16_t *data, size_t length) { std::string format_hex_pretty(const uint16_t *data, size_t length, char separator, bool show_length) {
if (length == 0) if (data == nullptr || length == 0)
return ""; return "";
std::string ret; std::string ret;
ret.resize(5 * length - 1); uint8_t multiple = separator ? 5 : 4; // 5 if separator is not \0, 4 otherwise
ret.resize(multiple * length - 1);
for (size_t i = 0; i < length; i++) { for (size_t i = 0; i < length; i++) {
ret[5 * i] = format_hex_pretty_char((data[i] & 0xF000) >> 12); ret[multiple * i] = format_hex_pretty_char((data[i] & 0xF000) >> 12);
ret[5 * i + 1] = format_hex_pretty_char((data[i] & 0x0F00) >> 8); ret[multiple * i + 1] = format_hex_pretty_char((data[i] & 0x0F00) >> 8);
ret[5 * i + 2] = format_hex_pretty_char((data[i] & 0x00F0) >> 4); ret[multiple * i + 2] = format_hex_pretty_char((data[i] & 0x00F0) >> 4);
ret[5 * i + 3] = format_hex_pretty_char(data[i] & 0x000F); ret[multiple * i + 3] = format_hex_pretty_char(data[i] & 0x000F);
if (i != length - 1) if (separator && i != length - 1)
ret[5 * i + 2] = '.'; ret[multiple * i + 4] = separator;
} }
if (length > 4) if (show_length && length > 4)
return ret + " (" + to_string(length) + ")"; return ret + " (" + std::to_string(length) + ")";
return ret; return ret;
} }
std::string format_hex_pretty(const std::vector<uint16_t> &data) { return format_hex_pretty(data.data(), data.size()); } std::string format_hex_pretty(const std::vector<uint16_t> &data, char separator, bool show_length) {
std::string format_hex_pretty(const std::string &data) { return format_hex_pretty(data.data(), data.size(), separator, show_length);
}
std::string format_hex_pretty(const std::string &data, char separator, bool show_length) {
if (data.empty()) if (data.empty())
return ""; return "";
std::string ret; std::string ret;
ret.resize(3 * data.length() - 1); uint8_t multiple = separator ? 3 : 2; // 3 if separator is not \0, 2 otherwise
ret.resize(multiple * data.length() - 1);
for (size_t i = 0; i < data.length(); i++) { for (size_t i = 0; i < data.length(); i++) {
ret[3 * i] = format_hex_pretty_char((data[i] & 0xF0) >> 4); ret[multiple * i] = format_hex_pretty_char((data[i] & 0xF0) >> 4);
ret[3 * i + 1] = format_hex_pretty_char(data[i] & 0x0F); ret[multiple * i + 1] = format_hex_pretty_char(data[i] & 0x0F);
if (i != data.length() - 1) if (separator && i != data.length() - 1)
ret[3 * i + 2] = '.'; ret[multiple * i + 2] = separator;
} }
if (data.length() > 4) if (show_length && data.length() > 4)
return ret + " (" + std::to_string(data.length()) + ")"; return ret + " (" + std::to_string(data.length()) + ")";
return ret; return ret;
} }

View File

@ -344,20 +344,149 @@ template<std::size_t N> std::string format_hex(const std::array<uint8_t, N> &dat
return format_hex(data.data(), data.size()); return format_hex(data.data(), data.size());
} }
/// Format the byte array \p data of length \p len in pretty-printed, human-readable hex. /** Format a byte array in pretty-printed, human-readable hex format.
std::string format_hex_pretty(const uint8_t *data, size_t length); *
/// Format the word array \p data of length \p len in pretty-printed, human-readable hex. * Converts binary data to a hexadecimal string representation with customizable formatting.
std::string format_hex_pretty(const uint16_t *data, size_t length); * Each byte is displayed as a two-digit uppercase hex value, separated by the specified separator.
/// Format the vector \p data in pretty-printed, human-readable hex. * Optionally includes the total byte count in parentheses at the end.
std::string format_hex_pretty(const std::vector<uint8_t> &data); *
/// Format the vector \p data in pretty-printed, human-readable hex. * @param data Pointer to the byte array to format.
std::string format_hex_pretty(const std::vector<uint16_t> &data); * @param length Number of bytes in the array.
/// Format the string \p data in pretty-printed, human-readable hex. * @param separator Character to use between hex bytes (default: '.').
std::string format_hex_pretty(const std::string &data); * @param show_length Whether to append the byte count in parentheses (default: true).
/// Format an unsigned integer in pretty-printed, human-readable hex, starting with the most significant byte. * @return Formatted hex string, e.g., "A1.B2.C3.D4.E5 (5)" or "A1:B2:C3" depending on parameters.
template<typename T, enable_if_t<std::is_unsigned<T>::value, int> = 0> std::string format_hex_pretty(T val) { *
* @note Returns empty string if data is nullptr or length is 0.
* @note The length will only be appended if show_length is true AND the length is greater than 4.
*
* Example:
* @code
* uint8_t data[] = {0xA1, 0xB2, 0xC3};
* format_hex_pretty(data, 3); // Returns "A1.B2.C3" (no length shown for <= 4 parts)
* uint8_t data2[] = {0xA1, 0xB2, 0xC3, 0xD4, 0xE5};
* format_hex_pretty(data2, 5); // Returns "A1.B2.C3.D4.E5 (5)"
* format_hex_pretty(data2, 5, ':'); // Returns "A1:B2:C3:D4:E5 (5)"
* format_hex_pretty(data2, 5, '.', false); // Returns "A1.B2.C3.D4.E5"
* @endcode
*/
std::string format_hex_pretty(const uint8_t *data, size_t length, char separator = '.', bool show_length = true);
/** Format a 16-bit word array in pretty-printed, human-readable hex format.
*
* Similar to the byte array version, but formats 16-bit words as 4-digit hex values.
*
* @param data Pointer to the 16-bit word array to format.
* @param length Number of 16-bit words in the array.
* @param separator Character to use between hex words (default: '.').
* @param show_length Whether to append the word count in parentheses (default: true).
* @return Formatted hex string with 4-digit hex values per word.
*
* @note The length will only be appended if show_length is true AND the length is greater than 4.
*
* Example:
* @code
* uint16_t data[] = {0xA1B2, 0xC3D4};
* format_hex_pretty(data, 2); // Returns "A1B2.C3D4" (no length shown for <= 4 parts)
* uint16_t data2[] = {0xA1B2, 0xC3D4, 0xE5F6};
* format_hex_pretty(data2, 3); // Returns "A1B2.C3D4.E5F6 (3)"
* @endcode
*/
std::string format_hex_pretty(const uint16_t *data, size_t length, char separator = '.', bool show_length = true);
/** Format a byte vector in pretty-printed, human-readable hex format.
*
* Convenience overload for std::vector<uint8_t>. Formats each byte as a two-digit
* uppercase hex value with customizable separator.
*
* @param data Vector of bytes to format.
* @param separator Character to use between hex bytes (default: '.').
* @param show_length Whether to append the byte count in parentheses (default: true).
* @return Formatted hex string representation of the vector contents.
*
* @note The length will only be appended if show_length is true AND the vector size is greater than 4.
*
* Example:
* @code
* std::vector<uint8_t> data = {0xDE, 0xAD, 0xBE, 0xEF};
* format_hex_pretty(data); // Returns "DE.AD.BE.EF" (no length shown for <= 4 parts)
* std::vector<uint8_t> data2 = {0xDE, 0xAD, 0xBE, 0xEF, 0xCA};
* format_hex_pretty(data2); // Returns "DE.AD.BE.EF.CA (5)"
* format_hex_pretty(data2, '-'); // Returns "DE-AD-BE-EF-CA (5)"
* @endcode
*/
std::string format_hex_pretty(const std::vector<uint8_t> &data, char separator = '.', bool show_length = true);
/** Format a 16-bit word vector in pretty-printed, human-readable hex format.
*
* Convenience overload for std::vector<uint16_t>. Each 16-bit word is formatted
* as a 4-digit uppercase hex value in big-endian order.
*
* @param data Vector of 16-bit words to format.
* @param separator Character to use between hex words (default: '.').
* @param show_length Whether to append the word count in parentheses (default: true).
* @return Formatted hex string representation of the vector contents.
*
* @note The length will only be appended if show_length is true AND the vector size is greater than 4.
*
* Example:
* @code
* std::vector<uint16_t> data = {0x1234, 0x5678};
* format_hex_pretty(data); // Returns "1234.5678" (no length shown for <= 4 parts)
* std::vector<uint16_t> data2 = {0x1234, 0x5678, 0x9ABC};
* format_hex_pretty(data2); // Returns "1234.5678.9ABC (3)"
* @endcode
*/
std::string format_hex_pretty(const std::vector<uint16_t> &data, char separator = '.', bool show_length = true);
/** Format a string's bytes in pretty-printed, human-readable hex format.
*
* Treats each character in the string as a byte and formats it in hex.
* Useful for debugging binary data stored in std::string containers.
*
* @param data String whose bytes should be formatted as hex.
* @param separator Character to use between hex bytes (default: '.').
* @param show_length Whether to append the byte count in parentheses (default: true).
* @return Formatted hex string representation of the string's byte contents.
*
* @note The length will only be appended if show_length is true AND the string length is greater than 4.
*
* Example:
* @code
* std::string data = "ABC"; // ASCII: 0x41, 0x42, 0x43
* format_hex_pretty(data); // Returns "41.42.43" (no length shown for <= 4 parts)
* std::string data2 = "ABCDE";
* format_hex_pretty(data2); // Returns "41.42.43.44.45 (5)"
* @endcode
*/
std::string format_hex_pretty(const std::string &data, char separator = '.', bool show_length = true);
/** Format an unsigned integer in pretty-printed, human-readable hex format.
*
* Converts the integer to big-endian byte order and formats each byte as hex.
* The most significant byte appears first in the output string.
*
* @tparam T Unsigned integer type (uint8_t, uint16_t, uint32_t, uint64_t, etc.).
* @param val The unsigned integer value to format.
* @param separator Character to use between hex bytes (default: '.').
* @param show_length Whether to append the byte count in parentheses (default: true).
* @return Formatted hex string with most significant byte first.
*
* @note The length will only be appended if show_length is true AND sizeof(T) is greater than 4.
*
* Example:
* @code
* uint32_t value = 0x12345678;
* format_hex_pretty(value); // Returns "12.34.56.78" (no length shown for <= 4 parts)
* uint64_t value2 = 0x123456789ABCDEF0;
* format_hex_pretty(value2); // Returns "12.34.56.78.9A.BC.DE.F0 (8)"
* format_hex_pretty(value2, ':'); // Returns "12:34:56:78:9A:BC:DE:F0 (8)"
* format_hex_pretty<uint16_t>(0x1234); // Returns "12.34"
* @endcode
*/
template<typename T, enable_if_t<std::is_unsigned<T>::value, int> = 0>
std::string format_hex_pretty(T val, char separator = '.', bool show_length = true) {
val = convert_big_endian(val); val = convert_big_endian(val);
return format_hex_pretty(reinterpret_cast<uint8_t *>(&val), sizeof(T)); return format_hex_pretty(reinterpret_cast<uint8_t *>(&val), sizeof(T), separator, show_length);
} }
/// Format the byte array \p data of length \p len in binary. /// Format the byte array \p data of length \p len in binary.

View File

@ -1,29 +1,71 @@
"""Fixtures for component tests.""" """Fixtures for component tests."""
from __future__ import annotations
from collections.abc import Callable, Generator
from pathlib import Path from pathlib import Path
import sys import sys
import pytest
# Add package root to python path # Add package root to python path
here = Path(__file__).parent here = Path(__file__).parent
package_root = here.parent.parent package_root = here.parent.parent
sys.path.insert(0, package_root.as_posix()) sys.path.insert(0, package_root.as_posix())
import pytest # noqa: E402
from esphome.__main__ import generate_cpp_contents # noqa: E402 from esphome.__main__ import generate_cpp_contents # noqa: E402
from esphome.config import read_config # noqa: E402 from esphome.config import read_config # noqa: E402
from esphome.core import CORE # noqa: E402 from esphome.core import CORE # noqa: E402
@pytest.fixture(autouse=True)
def config_path(request: pytest.FixtureRequest) -> Generator[None]:
"""Set CORE.config_path to the component's config directory and reset it after the test."""
original_path = CORE.config_path
config_dir = Path(request.fspath).parent / "config"
# Check if config directory exists, if not use parent directory
if config_dir.exists():
# Set config_path to a dummy yaml file in the config directory
# This ensures CORE.config_dir points to the config directory
CORE.config_path = str(config_dir / "dummy.yaml")
else:
CORE.config_path = str(Path(request.fspath).parent / "dummy.yaml")
yield
CORE.config_path = original_path
@pytest.fixture @pytest.fixture
def generate_main(): def component_fixture_path(request: pytest.FixtureRequest) -> Callable[[str], Path]:
"""Return a function to get absolute paths relative to the component's fixtures directory."""
def _get_path(file_name: str) -> Path:
"""Get the absolute path of a file relative to the component's fixtures directory."""
return (Path(request.fspath).parent / "fixtures" / file_name).absolute()
return _get_path
@pytest.fixture
def component_config_path(request: pytest.FixtureRequest) -> Callable[[str], Path]:
"""Return a function to get absolute paths relative to the component's config directory."""
def _get_path(file_name: str) -> Path:
"""Get the absolute path of a file relative to the component's config directory."""
return (Path(request.fspath).parent / "config" / file_name).absolute()
return _get_path
@pytest.fixture
def generate_main() -> Generator[Callable[[str | Path], str]]:
"""Generates the C++ main.cpp file and returns it in string form.""" """Generates the C++ main.cpp file and returns it in string form."""
def generator(path: str) -> str: def generator(path: str | Path) -> str:
CORE.config_path = path CORE.config_path = str(path)
CORE.config = read_config({}) CORE.config = read_config({})
generate_cpp_contents(CORE.config) generate_cpp_contents(CORE.config)
print(CORE.cpp_main_section)
return CORE.cpp_main_section return CORE.cpp_main_section
yield generator yield generator

Binary file not shown.

After

Width:  |  Height:  |  Size: 685 B

View File

@ -0,0 +1,20 @@
esphome:
name: test
esp32:
board: esp32s3box
image:
- file: image.png
byte_order: little_endian
id: cat_img
type: rgb565
spi:
mosi_pin: 6
clk_pin: 7
display:
- platform: mipi_spi
id: lcd_display
model: s3box

View File

@ -0,0 +1,183 @@
"""Tests for image configuration validation."""
from __future__ import annotations
from collections.abc import Callable
from pathlib import Path
from typing import Any
import pytest
from esphome import config_validation as cv
from esphome.components.image import CONFIG_SCHEMA
@pytest.mark.parametrize(
("config", "error_match"),
[
pytest.param(
"a string",
"Badly formed image configuration, expected a list or a dictionary",
id="invalid_string_config",
),
pytest.param(
{"id": "image_id", "type": "rgb565"},
r"required key not provided @ data\[0\]\['file'\]",
id="missing_file",
),
pytest.param(
{"file": "image.png", "type": "rgb565"},
r"required key not provided @ data\[0\]\['id'\]",
id="missing_id",
),
pytest.param(
{"id": "mdi_id", "file": "mdi:weather-##", "type": "rgb565"},
"Could not parse mdi icon name",
id="invalid_mdi_icon",
),
pytest.param(
{
"id": "image_id",
"file": "image.png",
"type": "binary",
"transparency": "alpha_channel",
},
"Image format 'BINARY' cannot have transparency",
id="binary_with_transparency",
),
pytest.param(
{
"id": "image_id",
"file": "image.png",
"type": "rgb565",
"transparency": "chroma_key",
"invert_alpha": True,
},
"No alpha channel to invert",
id="invert_alpha_without_alpha_channel",
),
pytest.param(
{
"id": "image_id",
"file": "image.png",
"type": "binary",
"byte_order": "big_endian",
},
"Image format 'BINARY' does not support byte order configuration",
id="binary_with_byte_order",
),
pytest.param(
{"id": "image_id", "file": "bad.png", "type": "binary"},
"File can't be opened as image",
id="invalid_image_file",
),
pytest.param(
{"defaults": {}, "images": [{"id": "image_id", "file": "image.png"}]},
"Type is required either in the image config or in the defaults",
id="missing_type_in_defaults",
),
],
)
def test_image_configuration_errors(
config: Any,
error_match: str,
) -> None:
"""Test detection of invalid configuration."""
with pytest.raises(cv.Invalid, match=error_match):
CONFIG_SCHEMA(config)
@pytest.mark.parametrize(
"config",
[
pytest.param(
{
"id": "image_id",
"file": "image.png",
"type": "rgb565",
"transparency": "chroma_key",
"byte_order": "little_endian",
"dither": "FloydSteinberg",
"resize": "100x100",
"invert_alpha": False,
},
id="single_image_all_options",
),
pytest.param(
[
{
"id": "image_id",
"file": "image.png",
"type": "binary",
}
],
id="list_of_images",
),
pytest.param(
{
"defaults": {
"type": "rgb565",
"transparency": "chroma_key",
"byte_order": "little_endian",
"dither": "FloydSteinberg",
"resize": "100x100",
"invert_alpha": False,
},
"images": [
{
"id": "image_id",
"file": "image.png",
}
],
},
id="images_with_defaults",
),
pytest.param(
{
"rgb565": {
"alpha_channel": [
{
"id": "image_id",
"file": "image.png",
"transparency": "alpha_channel",
"byte_order": "little_endian",
"dither": "FloydSteinberg",
"resize": "100x100",
"invert_alpha": False,
}
]
},
"binary": [
{
"id": "image_id",
"file": "image.png",
"transparency": "opaque",
"dither": "FloydSteinberg",
"resize": "100x100",
"invert_alpha": False,
}
],
},
id="type_based_organization",
),
],
)
def test_image_configuration_success(
config: dict[str, Any] | list[dict[str, Any]],
) -> None:
"""Test successful configuration validation."""
CONFIG_SCHEMA(config)
def test_image_generation(
generate_main: Callable[[str | Path], str],
component_config_path: Callable[[str], Path],
) -> None:
"""Test image generation configuration."""
main_cpp = generate_main(component_config_path("image_test.yaml"))
assert "uint8_t_id[] PROGMEM = {0x24, 0x21, 0x24, 0x21" in main_cpp
assert (
"cat_img = new image::Image(uint8_t_id, 32, 24, image::IMAGE_TYPE_RGB565, image::TRANSPARENCY_OPAQUE);"
in main_cpp
)

View File

@ -1,17 +0,0 @@
spi:
- id: spi_main_lcd
clk_pin: 16
mosi_pin: 17
miso_pin: 32
display:
- platform: ili9xxx
id: main_lcd
model: ili9342
cs_pin: 14
dc_pin: 13
reset_pin: 21
invert_colors: true
<<: !include common.yaml

View File

@ -1,16 +0,0 @@
spi:
- id: spi_main_lcd
clk_pin: 6
mosi_pin: 7
miso_pin: 5
display:
- platform: ili9xxx
id: main_lcd
model: ili9342
cs_pin: 3
dc_pin: 11
reset_pin: 10
invert_colors: true
<<: !include common.yaml

View File

@ -1,16 +0,0 @@
spi:
- id: spi_main_lcd
clk_pin: 6
mosi_pin: 7
miso_pin: 5
display:
- platform: ili9xxx
id: main_lcd
model: ili9342
cs_pin: 3
dc_pin: 11
reset_pin: 10
invert_colors: true
<<: !include common.yaml

View File

@ -13,4 +13,13 @@ display:
reset_pin: 16 reset_pin: 16
invert_colors: true invert_colors: true
<<: !include common.yaml image:
defaults:
type: rgb565
transparency: opaque
byte_order: little_endian
resize: 50x50
dither: FloydSteinberg
images:
- id: test_image
file: ../../pnglogo.png

View File

@ -78,3 +78,268 @@ pytest -s tests/integration/test_host_mode_basic.py
- Each test gets its own temporary directory and unique port - Each test gets its own temporary directory and unique port
- Port allocation minimizes race conditions by holding the socket until just before ESPHome starts - Port allocation minimizes race conditions by holding the socket until just before ESPHome starts
- Output from ESPHome processes is displayed for debugging - Output from ESPHome processes is displayed for debugging
## Integration Test Writing Guide
### Test Patterns and Best Practices
#### 1. Test File Naming Convention
- Use descriptive names: `test_{category}_{feature}.py`
- Common categories: `host_mode`, `api`, `scheduler`, `light`, `areas_and_devices`
- Examples:
- `test_host_mode_basic.py` - Basic host mode functionality
- `test_api_message_batching.py` - API message batching
- `test_scheduler_stress.py` - Scheduler stress testing
#### 2. Essential Imports
```python
from __future__ import annotations
import asyncio
from typing import Any
import pytest
from aioesphomeapi import EntityState, SensorState
from .types import APIClientConnectedFactory, RunCompiledFunction
```
#### 3. Common Test Patterns
##### Basic Entity Test
```python
@pytest.mark.asyncio
async def test_my_sensor(
yaml_config: str,
run_compiled: RunCompiledFunction,
api_client_connected: APIClientConnectedFactory,
) -> None:
"""Test sensor functionality."""
async with run_compiled(yaml_config), api_client_connected() as client:
# Get entity list
entities, services = await client.list_entities_services()
# Find specific entity
sensor = next((e for e in entities if e.object_id == "my_sensor"), None)
assert sensor is not None
```
##### State Subscription Pattern
```python
# Track state changes with futures
loop = asyncio.get_running_loop()
states: dict[int, EntityState] = {}
state_future: asyncio.Future[EntityState] = loop.create_future()
def on_state(state: EntityState) -> None:
states[state.key] = state
# Check for specific condition using isinstance
if isinstance(state, SensorState) and state.state == expected_value:
if not state_future.done():
state_future.set_result(state)
client.subscribe_states(on_state)
# Wait for state with timeout
try:
result = await asyncio.wait_for(state_future, timeout=5.0)
except asyncio.TimeoutError:
pytest.fail(f"Expected state not received. Got: {list(states.values())}")
```
##### Service Execution Pattern
```python
# Find and execute service
entities, services = await client.list_entities_services()
my_service = next((s for s in services if s.name == "my_service"), None)
assert my_service is not None
# Execute with parameters
client.execute_service(my_service, {"param1": "value1", "param2": 42})
```
##### Multiple Entity Tracking
```python
# For tests with many entities
loop = asyncio.get_running_loop()
entity_count = 50
received_states: set[int] = set()
all_states_future: asyncio.Future[bool] = loop.create_future()
def on_state(state: EntityState) -> None:
received_states.add(state.key)
if len(received_states) >= entity_count and not all_states_future.done():
all_states_future.set_result(True)
client.subscribe_states(on_state)
await asyncio.wait_for(all_states_future, timeout=10.0)
```
#### 4. YAML Fixture Guidelines
##### Naming Convention
- Match test function name: `test_my_feature``fixtures/my_feature.yaml`
- Note: Remove `test_` prefix for fixture filename
##### Basic Structure
```yaml
esphome:
name: test-name # Use kebab-case
# Optional: areas, devices, platformio_options
host: # Always use host platform for integration tests
api: # Port injected automatically
logger:
level: DEBUG # Optional: Set log level
# Component configurations
sensor:
- platform: template
name: "My Sensor"
id: my_sensor
lambda: return 42.0;
update_interval: 0.1s # Fast updates for testing
```
##### Advanced Features
```yaml
# External components for custom test code
external_components:
- source:
type: local
path: EXTERNAL_COMPONENT_PATH # Replaced by test framework
components: [my_test_component]
# Areas and devices
esphome:
name: test-device
areas:
- id: living_room
name: "Living Room"
- id: kitchen
name: "Kitchen"
parent_id: living_room
devices:
- id: my_device
name: "Test Device"
area_id: living_room
# API services
api:
services:
- service: test_service
variables:
my_param: string
then:
- logger.log:
format: "Service called with: %s"
args: [my_param.c_str()]
```
#### 5. Testing Complex Scenarios
##### External Components
Create C++ components in `fixtures/external_components/` for:
- Stress testing
- Custom entity behaviors
- Scheduler testing
- Memory management tests
##### Log Line Monitoring
```python
log_lines: list[str] = []
def on_log_line(line: str) -> None:
log_lines.append(line)
if "expected message" in line:
# Handle specific log messages
async with run_compiled(yaml_config, line_callback=on_log_line):
# Test implementation
```
Example using futures for specific log patterns:
```python
import re
loop = asyncio.get_running_loop()
connected_future = loop.create_future()
service_future = loop.create_future()
# Patterns to match
connected_pattern = re.compile(r"Client .* connected from")
service_pattern = re.compile(r"Service called")
def check_output(line: str) -> None:
"""Check log output for expected messages."""
if not connected_future.done() and connected_pattern.search(line):
connected_future.set_result(True)
elif not service_future.done() and service_pattern.search(line):
service_future.set_result(True)
async with run_compiled(yaml_config, line_callback=check_output):
async with api_client_connected() as client:
# Wait for specific log message
await asyncio.wait_for(connected_future, timeout=5.0)
# Do test actions...
# Wait for service log
await asyncio.wait_for(service_future, timeout=5.0)
```
**Note**: Tests that monitor log messages typically have fewer race conditions compared to state-based testing, making them more reliable. However, be aware that the host platform currently does not have a thread-safe logger, so logging from threads will not work correctly.
##### Timeout Handling
```python
# Always use timeouts for async operations
try:
result = await asyncio.wait_for(some_future, timeout=5.0)
except asyncio.TimeoutError:
pytest.fail("Operation timed out - check test expectations")
```
#### 6. Common Assertions
```python
# Device info
assert device_info.name == "expected-name"
assert device_info.compilation_time is not None
# Entity properties
assert sensor.accuracy_decimals == 2
assert sensor.state_class == 1 # measurement
assert sensor.force_update is True
# Service availability
assert len(services) > 0
assert any(s.name == "expected_service" for s in services)
# State values
assert state.state == expected_value
assert state.missing_state is False
```
#### 7. Debugging Tips
- Use `pytest -s` to see ESPHome output during tests
- Add descriptive failure messages to assertions
- Use `pytest.fail()` with detailed error info for timeouts
- Check `log_lines` for compilation or runtime errors
- Enable debug logging in YAML fixtures when needed
#### 8. Performance Considerations
- Use short update intervals (0.1s) for faster tests
- Set reasonable timeouts (5-10s for most operations)
- Batch multiple assertions when possible
- Clean up resources properly using context managers
#### 9. Test Categories
- **Basic Tests**: Minimal functionality verification
- **Entity Tests**: Sensor, switch, light behavior
- **API Tests**: Message batching, services, events
- **Scheduler Tests**: Timing, defer operations, stress
- **Memory Tests**: Conditional compilation, optimization
- **Integration Tests**: Areas, devices, complex interactions

View File

@ -165,6 +165,19 @@ async def compile_esphome(
"""Compile an ESPHome configuration and return the binary path.""" """Compile an ESPHome configuration and return the binary path."""
async def _compile(config_path: Path) -> Path: async def _compile(config_path: Path) -> Path:
# Create a unique PlatformIO directory for this test to avoid race conditions
platformio_dir = integration_test_dir / ".platformio"
platformio_dir.mkdir(parents=True, exist_ok=True)
# Create cache directory as well
platformio_cache_dir = platformio_dir / ".cache"
platformio_cache_dir.mkdir(parents=True, exist_ok=True)
# Set up environment with isolated PlatformIO directories
env = os.environ.copy()
env["PLATFORMIO_CORE_DIR"] = str(platformio_dir)
env["PLATFORMIO_CACHE_DIR"] = str(platformio_cache_dir)
# Retry compilation up to 3 times if we get a segfault # Retry compilation up to 3 times if we get a segfault
max_retries = 3 max_retries = 3
for attempt in range(max_retries): for attempt in range(max_retries):
@ -179,6 +192,7 @@ async def compile_esphome(
stdin=asyncio.subprocess.DEVNULL, stdin=asyncio.subprocess.DEVNULL,
# Start in a new process group to isolate signal handling # Start in a new process group to isolate signal handling
start_new_session=True, start_new_session=True,
env=env,
) )
await proc.wait() await proc.wait()

View File

@ -2,14 +2,10 @@ esphome:
name: api-conditional-memory-test name: api-conditional-memory-test
host: host:
api: api:
batch_delay: 0ms
actions: actions:
- action: test_simple_service - action: test_simple_service
then: then:
- logger.log: "Simple service called" - logger.log: "Simple service called"
- binary_sensor.template.publish:
id: service_called_sensor
state: ON
- action: test_service_with_args - action: test_service_with_args
variables: variables:
arg_string: string arg_string: string
@ -20,53 +16,14 @@ api:
- logger.log: - logger.log:
format: "Service called with: %s, %d, %d, %.2f" format: "Service called with: %s, %d, %d, %.2f"
args: [arg_string.c_str(), arg_int, arg_bool, arg_float] args: [arg_string.c_str(), arg_int, arg_bool, arg_float]
- sensor.template.publish:
id: service_arg_sensor
state: !lambda 'return arg_float;'
on_client_connected: on_client_connected:
- logger.log: - logger.log:
format: "Client %s connected from %s" format: "Client %s connected from %s"
args: [client_info.c_str(), client_address.c_str()] args: [client_info.c_str(), client_address.c_str()]
- binary_sensor.template.publish:
id: client_connected
state: ON
- text_sensor.template.publish:
id: last_client_info
state: !lambda 'return client_info;'
on_client_disconnected: on_client_disconnected:
- logger.log: - logger.log:
format: "Client %s disconnected from %s" format: "Client %s disconnected from %s"
args: [client_info.c_str(), client_address.c_str()] args: [client_info.c_str(), client_address.c_str()]
- binary_sensor.template.publish:
id: client_connected
state: OFF
- binary_sensor.template.publish:
id: client_disconnected_event
state: ON
logger: logger:
level: DEBUG level: DEBUG
binary_sensor:
- platform: template
name: "Client Connected"
id: client_connected
device_class: connectivity
- platform: template
name: "Client Disconnected Event"
id: client_disconnected_event
- platform: template
name: "Service Called"
id: service_called_sensor
sensor:
- platform: template
name: "Service Argument Value"
id: service_arg_sensor
unit_of_measurement: ""
accuracy_decimals: 2
text_sensor:
- platform: template
name: "Last Client Info"
id: last_client_info

View File

@ -26,7 +26,6 @@ void SchedulerStringLifetimeComponent::run_string_lifetime_test() {
// Schedule final check // Schedule final check
this->set_timeout("final_check", 200, [this]() { this->set_timeout("final_check", 200, [this]() {
ESP_LOGI(TAG, "String lifetime tests complete");
ESP_LOGI(TAG, "Tests passed: %d", this->tests_passed_); ESP_LOGI(TAG, "Tests passed: %d", this->tests_passed_);
ESP_LOGI(TAG, "Tests failed: %d", this->tests_failed_); ESP_LOGI(TAG, "Tests failed: %d", this->tests_failed_);
@ -35,6 +34,7 @@ void SchedulerStringLifetimeComponent::run_string_lifetime_test() {
} else { } else {
ESP_LOGE(TAG, "FAILURE: %d string lifetime tests failed!", this->tests_failed_); ESP_LOGE(TAG, "FAILURE: %d string lifetime tests failed!", this->tests_failed_);
} }
ESP_LOGI(TAG, "String lifetime tests complete");
}); });
} }

View File

@ -3,15 +3,9 @@
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
import re
from aioesphomeapi import ( from aioesphomeapi import UserService, UserServiceArgType
BinarySensorInfo,
EntityState,
SensorInfo,
TextSensorInfo,
UserService,
UserServiceArgType,
)
import pytest import pytest
from .types import APIClientConnectedFactory, RunCompiledFunction from .types import APIClientConnectedFactory, RunCompiledFunction
@ -25,50 +19,45 @@ async def test_api_conditional_memory(
) -> None: ) -> None:
"""Test API triggers and services work correctly with conditional compilation.""" """Test API triggers and services work correctly with conditional compilation."""
loop = asyncio.get_running_loop() loop = asyncio.get_running_loop()
# Keep ESPHome process running throughout the test
async with run_compiled(yaml_config): # Track log messages
# First connection connected_future = loop.create_future()
disconnected_future = loop.create_future()
service_simple_future = loop.create_future()
service_args_future = loop.create_future()
# Patterns to match in logs
connected_pattern = re.compile(r"Client .* connected from")
disconnected_pattern = re.compile(r"Client .* disconnected from")
service_simple_pattern = re.compile(r"Simple service called")
service_args_pattern = re.compile(
r"Service called with: test_string, 123, 1, 42\.50"
)
def check_output(line: str) -> None:
"""Check log output for expected messages."""
if not connected_future.done() and connected_pattern.search(line):
connected_future.set_result(True)
elif not disconnected_future.done() and disconnected_pattern.search(line):
disconnected_future.set_result(True)
elif not service_simple_future.done() and service_simple_pattern.search(line):
service_simple_future.set_result(True)
elif not service_args_future.done() and service_args_pattern.search(line):
service_args_future.set_result(True)
# Run with log monitoring
async with run_compiled(yaml_config, line_callback=check_output):
async with api_client_connected() as client: async with api_client_connected() as client:
# Verify device info # Verify device info
device_info = await client.device_info() device_info = await client.device_info()
assert device_info is not None assert device_info is not None
assert device_info.name == "api-conditional-memory-test" assert device_info.name == "api-conditional-memory-test"
# List entities and services # Wait for connection log
entity_info, services = await asyncio.wait_for( await asyncio.wait_for(connected_future, timeout=5.0)
client.list_entities_services(), timeout=5.0
)
# Find our entities # List services
client_connected: BinarySensorInfo | None = None _, services = await client.list_entities_services()
client_disconnected_event: BinarySensorInfo | None = None
service_called_sensor: BinarySensorInfo | None = None
service_arg_sensor: SensorInfo | None = None
last_client_info: TextSensorInfo | None = None
for entity in entity_info:
if isinstance(entity, BinarySensorInfo):
if entity.object_id == "client_connected":
client_connected = entity
elif entity.object_id == "client_disconnected_event":
client_disconnected_event = entity
elif entity.object_id == "service_called":
service_called_sensor = entity
elif isinstance(entity, SensorInfo):
if entity.object_id == "service_argument_value":
service_arg_sensor = entity
elif isinstance(entity, TextSensorInfo):
if entity.object_id == "last_client_info":
last_client_info = entity
# Verify all entities exist
assert client_connected is not None, "client_connected sensor not found"
assert client_disconnected_event is not None, (
"client_disconnected_event sensor not found"
)
assert service_called_sensor is not None, "service_called sensor not found"
assert service_arg_sensor is not None, "service_arg_sensor not found"
assert last_client_info is not None, "last_client_info sensor not found"
# Verify services exist # Verify services exist
assert len(services) == 2, f"Expected 2 services, found {len(services)}" assert len(services) == 2, f"Expected 2 services, found {len(services)}"
@ -98,66 +87,11 @@ async def test_api_conditional_memory(
assert arg_types["arg_bool"] == UserServiceArgType.BOOL assert arg_types["arg_bool"] == UserServiceArgType.BOOL
assert arg_types["arg_float"] == UserServiceArgType.FLOAT assert arg_types["arg_float"] == UserServiceArgType.FLOAT
# Track state changes
states: dict[int, EntityState] = {}
states_future: asyncio.Future[None] = loop.create_future()
def on_state(state: EntityState) -> None:
states[state.key] = state
# Check if we have initial states for connection sensors
if (
client_connected.key in states
and last_client_info.key in states
and not states_future.done()
):
states_future.set_result(None)
client.subscribe_states(on_state)
# Wait for initial states
await asyncio.wait_for(states_future, timeout=5.0)
# Verify on_client_connected trigger fired
connected_state = states.get(client_connected.key)
assert connected_state is not None
assert connected_state.state is True, "Client should be connected"
# Verify client info was captured
client_info_state = states.get(last_client_info.key)
assert client_info_state is not None
assert isinstance(client_info_state.state, str)
assert len(client_info_state.state) > 0, "Client info should not be empty"
# Test simple service
service_future: asyncio.Future[None] = loop.create_future()
def check_service_called(state: EntityState) -> None:
if state.key == service_called_sensor.key and state.state is True:
if not service_future.done():
service_future.set_result(None)
# Update callback to check for service execution
client.subscribe_states(check_service_called)
# Call simple service # Call simple service
client.execute_service(simple_service, {}) client.execute_service(simple_service, {})
# Wait for service to execute # Wait for service log
await asyncio.wait_for(service_future, timeout=5.0) await asyncio.wait_for(service_simple_future, timeout=5.0)
# Test service with arguments
arg_future: asyncio.Future[None] = loop.create_future()
expected_float = 42.5
def check_arg_sensor(state: EntityState) -> None:
if (
state.key == service_arg_sensor.key
and abs(state.state - expected_float) < 0.01
):
if not arg_future.done():
arg_future.set_result(None)
client.subscribe_states(check_arg_sensor)
# Call service with arguments # Call service with arguments
client.execute_service( client.execute_service(
@ -166,43 +100,12 @@ async def test_api_conditional_memory(
"arg_string": "test_string", "arg_string": "test_string",
"arg_int": 123, "arg_int": 123,
"arg_bool": True, "arg_bool": True,
"arg_float": expected_float, "arg_float": 42.5,
}, },
) )
# Wait for service with args to execute # Wait for service with args log
await asyncio.wait_for(arg_future, timeout=5.0) await asyncio.wait_for(service_args_future, timeout=5.0)
# After disconnecting first client, reconnect and verify triggers work # Client disconnected here, wait for disconnect log
async with api_client_connected() as client2: await asyncio.wait_for(disconnected_future, timeout=5.0)
# Subscribe to states with new client
states2: dict[int, EntityState] = {}
states_ready_future: asyncio.Future[None] = loop.create_future()
def on_state2(state: EntityState) -> None:
states2[state.key] = state
# Check if we have received both required states
if (
client_connected.key in states2
and client_disconnected_event.key in states2
and not states_ready_future.done()
):
states_ready_future.set_result(None)
client2.subscribe_states(on_state2)
# Wait for both connected and disconnected event states
await asyncio.wait_for(states_ready_future, timeout=5.0)
# Verify client is connected again (on_client_connected fired)
assert states2[client_connected.key].state is True, (
"Client should be reconnected"
)
# The client_disconnected_event should be ON from when we disconnected
# (it was set ON by on_client_disconnected trigger)
disconnected_state = states2.get(client_disconnected_event.key)
assert disconnected_state is not None
assert disconnected_state.state is True, (
"Disconnect event should be ON from previous disconnect"
)

View File

@ -5,7 +5,7 @@ from __future__ import annotations
import asyncio import asyncio
from typing import Any from typing import Any
from aioesphomeapi import LogLevel from aioesphomeapi import LogLevel, SensorInfo
import pytest import pytest
from .types import APIClientConnectedFactory, RunCompiledFunction from .types import APIClientConnectedFactory, RunCompiledFunction
@ -63,7 +63,7 @@ async def test_api_vv_logging(
entity_info, _ = await client.list_entities_services() entity_info, _ = await client.list_entities_services()
# Count sensors # Count sensors
sensor_count = sum(1 for e in entity_info if hasattr(e, "unit_of_measurement")) sensor_count = sum(1 for e in entity_info if isinstance(e, SensorInfo))
assert sensor_count >= 10, f"Expected at least 10 sensors, got {sensor_count}" assert sensor_count >= 10, f"Expected at least 10 sensors, got {sensor_count}"
# Wait for sensor updates to flow with VV logging active # Wait for sensor updates to flow with VV logging active

View File

@ -76,8 +76,8 @@ async def test_areas_and_devices(
# Get entity list to verify device_id mapping # Get entity list to verify device_id mapping
entities = await client.list_entities_services() entities = await client.list_entities_services()
# Collect sensor entities # Collect sensor entities (all entities have device_id)
sensor_entities = [e for e in entities[0] if hasattr(e, "device_id")] sensor_entities = entities[0]
assert len(sensor_entities) >= 4, ( assert len(sensor_entities) >= 4, (
f"Expected at least 4 sensor entities, got {len(sensor_entities)}" f"Expected at least 4 sensor entities, got {len(sensor_entities)}"
) )

View File

@ -4,7 +4,7 @@ from __future__ import annotations
import asyncio import asyncio
from aioesphomeapi import EntityState from aioesphomeapi import BinarySensorState, EntityState, SensorState, TextSensorState
import pytest import pytest
from .types import APIClientConnectedFactory, RunCompiledFunction from .types import APIClientConnectedFactory, RunCompiledFunction
@ -40,28 +40,22 @@ async def test_device_id_in_state(
entity_device_mapping: dict[int, int] = {} entity_device_mapping: dict[int, int] = {}
for entity in all_entities: for entity in all_entities:
if hasattr(entity, "name") and hasattr(entity, "key"): # All entities have name and key attributes
if entity.name == "Temperature": if entity.name == "Temperature":
entity_device_mapping[entity.key] = device_ids[ entity_device_mapping[entity.key] = device_ids["Temperature Monitor"]
"Temperature Monitor" elif entity.name == "Humidity":
] entity_device_mapping[entity.key] = device_ids["Humidity Monitor"]
elif entity.name == "Humidity": elif entity.name == "Motion Detected":
entity_device_mapping[entity.key] = device_ids["Humidity Monitor"] entity_device_mapping[entity.key] = device_ids["Motion Sensor"]
elif entity.name == "Motion Detected": elif entity.name == "Temperature Monitor Power":
entity_device_mapping[entity.key] = device_ids["Motion Sensor"] entity_device_mapping[entity.key] = device_ids["Temperature Monitor"]
elif entity.name == "Temperature Monitor Power": elif entity.name == "Temperature Status":
entity_device_mapping[entity.key] = device_ids[ entity_device_mapping[entity.key] = device_ids["Temperature Monitor"]
"Temperature Monitor" elif entity.name == "Motion Light":
] entity_device_mapping[entity.key] = device_ids["Motion Sensor"]
elif entity.name == "Temperature Status": elif entity.name == "No Device Sensor":
entity_device_mapping[entity.key] = device_ids[ # Entity without device_id should have device_id 0
"Temperature Monitor" entity_device_mapping[entity.key] = 0
]
elif entity.name == "Motion Light":
entity_device_mapping[entity.key] = device_ids["Motion Sensor"]
elif entity.name == "No Device Sensor":
# Entity without device_id should have device_id 0
entity_device_mapping[entity.key] = 0
assert len(entity_device_mapping) >= 6, ( assert len(entity_device_mapping) >= 6, (
f"Expected at least 6 mapped entities, got {len(entity_device_mapping)}" f"Expected at least 6 mapped entities, got {len(entity_device_mapping)}"
@ -111,7 +105,7 @@ async def test_device_id_in_state(
( (
s s
for s in states.values() for s in states.values()
if hasattr(s, "state") if isinstance(s, SensorState)
and isinstance(s.state, float) and isinstance(s.state, float)
and s.device_id != 0 and s.device_id != 0
), ),
@ -122,11 +116,7 @@ async def test_device_id_in_state(
# Find a binary sensor state # Find a binary sensor state
binary_sensor_state = next( binary_sensor_state = next(
( (s for s in states.values() if isinstance(s, BinarySensorState)),
s
for s in states.values()
if hasattr(s, "state") and isinstance(s.state, bool)
),
None, None,
) )
assert binary_sensor_state is not None, "No binary sensor state found" assert binary_sensor_state is not None, "No binary sensor state found"
@ -136,11 +126,7 @@ async def test_device_id_in_state(
# Find a text sensor state # Find a text sensor state
text_sensor_state = next( text_sensor_state = next(
( (s for s in states.values() if isinstance(s, TextSensorState)),
s
for s in states.values()
if hasattr(s, "state") and isinstance(s.state, str)
),
None, None,
) )
assert text_sensor_state is not None, "No text sensor state found" assert text_sensor_state is not None, "No text sensor state found"

View File

@ -51,9 +51,6 @@ async def test_entity_icon(
entity = entity_map[entity_name] entity = entity_map[entity_name]
# Check icon field # Check icon field
assert hasattr(entity, "icon"), (
f"{entity_name}: Entity should have icon attribute"
)
assert entity.icon == expected_icon, ( assert entity.icon == expected_icon, (
f"{entity_name}: icon mismatch - " f"{entity_name}: icon mismatch - "
f"expected '{expected_icon}', got '{entity.icon}'" f"expected '{expected_icon}', got '{entity.icon}'"
@ -67,9 +64,6 @@ async def test_entity_icon(
entity = entity_map[entity_name] entity = entity_map[entity_name]
# Check icon field is empty # Check icon field is empty
assert hasattr(entity, "icon"), (
f"{entity_name}: Entity should have icon attribute"
)
assert entity.icon == "", ( assert entity.icon == "", (
f"{entity_name}: icon should be empty string for entities without icons, " f"{entity_name}: icon should be empty string for entities without icons, "
f"got '{entity.icon}'" f"got '{entity.icon}'"

View File

@ -25,8 +25,8 @@ async def test_host_mode_entity_fields(
# Create a map of entity names to entity info # Create a map of entity names to entity info
entity_map = {} entity_map = {}
for entity in entities[0]: for entity in entities[0]:
if hasattr(entity, "name"): # All entities should have a name attribute
entity_map[entity.name] = entity entity_map[entity.name] = entity
# Test entities that should be visible via API (non-internal) # Test entities that should be visible via API (non-internal)
visible_test_cases = [ visible_test_cases = [

View File

@ -4,7 +4,7 @@ from __future__ import annotations
import asyncio import asyncio
from aioesphomeapi import EntityState from aioesphomeapi import EntityState, SensorState
import pytest import pytest
from .types import APIClientConnectedFactory, RunCompiledFunction from .types import APIClientConnectedFactory, RunCompiledFunction
@ -30,7 +30,7 @@ async def test_host_mode_many_entities(
sensor_states = [ sensor_states = [
s s
for s in states.values() for s in states.values()
if hasattr(s, "state") and isinstance(s.state, float) if isinstance(s, SensorState) and isinstance(s.state, float)
] ]
# When we have received states from at least 50 sensors, resolve the future # When we have received states from at least 50 sensors, resolve the future
if len(sensor_states) >= 50 and not sensor_count_future.done(): if len(sensor_states) >= 50 and not sensor_count_future.done():
@ -45,7 +45,7 @@ async def test_host_mode_many_entities(
sensor_states = [ sensor_states = [
s s
for s in states.values() for s in states.values()
if hasattr(s, "state") and isinstance(s.state, float) if isinstance(s, SensorState) and isinstance(s.state, float)
] ]
pytest.fail( pytest.fail(
f"Did not receive states from at least 50 sensors within 10 seconds. " f"Did not receive states from at least 50 sensors within 10 seconds. "
@ -61,7 +61,7 @@ async def test_host_mode_many_entities(
sensor_states = [ sensor_states = [
s s
for s in states.values() for s in states.values()
if hasattr(s, "state") and isinstance(s.state, float) if isinstance(s, SensorState) and isinstance(s.state, float)
] ]
assert sensor_count >= 50, ( assert sensor_count >= 50, (

View File

@ -19,16 +19,17 @@ async def test_host_mode_with_sensor(
) -> None: ) -> None:
"""Test Host mode with a sensor component.""" """Test Host mode with a sensor component."""
# Write, compile and run the ESPHome device, then connect to API # Write, compile and run the ESPHome device, then connect to API
loop = asyncio.get_running_loop()
async with run_compiled(yaml_config), api_client_connected() as client: async with run_compiled(yaml_config), api_client_connected() as client:
# Subscribe to state changes # Subscribe to state changes
states: dict[int, EntityState] = {} states: dict[int, EntityState] = {}
sensor_future: asyncio.Future[EntityState] = asyncio.Future() sensor_future: asyncio.Future[EntityState] = loop.create_future()
def on_state(state: EntityState) -> None: def on_state(state: EntityState) -> None:
states[state.key] = state states[state.key] = state
# If this is our sensor with value 42.0, resolve the future # If this is our sensor with value 42.0, resolve the future
if ( if (
hasattr(state, "state") isinstance(state, aioesphomeapi.SensorState)
and state.state == 42.0 and state.state == 42.0
and not sensor_future.done() and not sensor_future.done()
): ):

View File

@ -7,6 +7,7 @@ including RGB, color temperature, effects, transitions, and flash.
import asyncio import asyncio
from typing import Any from typing import Any
from aioesphomeapi import LightState
import pytest import pytest
from .types import APIClientConnectedFactory, RunCompiledFunction from .types import APIClientConnectedFactory, RunCompiledFunction
@ -76,7 +77,7 @@ async def test_light_calls(
client.light_command(key=rgbcw_light.key, white=0.6) client.light_command(key=rgbcw_light.key, white=0.6)
state = await wait_for_state_change(rgbcw_light.key) state = await wait_for_state_change(rgbcw_light.key)
# White might need more tolerance or might not be directly settable # White might need more tolerance or might not be directly settable
if hasattr(state, "white"): if isinstance(state, LightState) and state.white is not None:
assert state.white == pytest.approx(0.6, abs=0.1) assert state.white == pytest.approx(0.6, abs=0.1)
# Test 8: color_temperature only # Test 8: color_temperature only