Compare commits

..

24 Commits

Author SHA1 Message Date
J. Nick Koston
2560ffb25f Merge branch 'platformio_cache_tests' into platformio_cache_tests_api 2025-10-19 16:37:29 -10:00
J. Nick Koston
c064bb7299 Merge branch 'dev' into platformio_cache_tests 2025-10-19 16:14:16 -10:00
tomaszduda23
c15f1a9be8 [nrf52] add missing defines for tests (#11384)
Co-authored-by: J. Nick Koston <nick@koston.org>
2025-10-19 16:11:44 -10:00
J. Nick Koston
20f336ee6e Merge branch 'platformio_cache_tests' into platformio_cache_tests_api 2025-10-19 16:00:39 -10:00
J. Nick Koston
ebde648305 cleanup 2025-10-19 15:59:33 -10:00
J. Nick Koston
11b53096a6 [ci] Fix fork PR workflow failing to find PRs from forks (#11396) 2025-10-19 15:58:05 -10:00
J. Nick Koston
6a18367949 [cli] Add analyze-memory command (#11395)
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2025-10-20 14:26:37 +13:00
Javier Peletier
a59b1494d8 [substitutions] Recursive substitutions and better jinja error handling and debug help (#10806) 2025-10-20 14:17:16 +13:00
J. Nick Koston
3182e6caa9 DNM: Test api platformio cache 2025-10-19 15:08:35 -10:00
J. Nick Koston
1106350114 merge 2025-10-19 15:05:42 -10:00
Jesse Hills
e6ce5c58d1 Merge branch 'release' into dev 2025-10-20 13:43:31 +13:00
Jesse Hills
ebc0f5f7c9 Merge pull request #11387 from esphome/bump-2025.10.2
2025.10.2
2025-10-20 13:42:48 +13:00
J. Nick Koston
87ca8784ef [openthread] Backport address resolution support to prevent OTA crash (#11312)
Co-authored-by: Daniel Stiner <danstiner@gmail.com>
2025-10-20 10:12:56 +13:00
Jesse Hills
a186c1062f Bump version to 2025.10.2 2025-10-20 10:06:43 +13:00
Jonathan Swoboda
ea38237f29 [esp32] Fix OTA rollback (#11300)
Co-authored-by: J. Nick Koston <nick@koston.org>
2025-10-20 10:06:43 +13:00
J. Nick Koston
6aff1394ad [core] Fix IndexError when OTA devices cannot be resolved (#11311) 2025-10-20 10:06:43 +13:00
Spectre5
0e34d1b64d Change all temperature offsets to temperature_delta (#11347) 2025-10-20 10:06:43 +13:00
tomaszduda23
1483cee0fb [dashboard] fix migration to Path (#11342)
Co-authored-by: J. Nick Koston <nick@home-assistant.io>
2025-10-20 10:06:43 +13:00
J. Nick Koston
8c1bd2fd85 [dashboard] Fix binary download with packages using secrets after Path migration (#11313) 2025-10-20 10:06:43 +13:00
Daniel Stiner
ea609dc0f6 [const] Add CONF_OPENTHREAD (#11318) 2025-10-20 10:06:42 +13:00
Jonathan Swoboda
913095f6be [esp32] Reduce tx power on Arduino (#11304) 2025-10-20 10:06:42 +13:00
Jonathan Swoboda
bb24ad4a30 [htu21d] Revert register address change (#11291) 2025-10-20 10:06:42 +13:00
Jonathan Swoboda
0d612fecfc [core] Add ESP32 ROM functions to reserved ids (#11293) 2025-10-20 10:06:42 +13:00
J. Nick Koston
9c235b4140 [datetime] Fix DateTimeStateTrigger compilation when time component is not used (#11287) 2025-10-20 10:06:42 +13:00
18 changed files with 511 additions and 1073 deletions

View File

@@ -28,20 +28,23 @@ jobs:
run: |
# Get PR details by searching for PR with matching head SHA
# The workflow_run.pull_requests field is often empty for forks
# Use paginate to handle repos with many open PRs
head_sha="${{ github.event.workflow_run.head_sha }}"
pr_data=$(gh api "/repos/${{ github.repository }}/commits/$head_sha/pulls" \
--jq '.[0] | {number: .number, base_ref: .base.ref}')
if [ -z "$pr_data" ] || [ "$pr_data" == "null" ]; then
pr_data=$(gh api --paginate "/repos/${{ github.repository }}/pulls" \
--jq ".[] | select(.head.sha == \"$head_sha\") | {number: .number, base_ref: .base.ref}" \
| head -n 1)
if [ -z "$pr_data" ]; then
echo "No PR found for SHA $head_sha, skipping"
echo "skip=true" >> $GITHUB_OUTPUT
echo "skip=true" >> "$GITHUB_OUTPUT"
exit 0
fi
pr_number=$(echo "$pr_data" | jq -r '.number')
base_ref=$(echo "$pr_data" | jq -r '.base_ref')
echo "pr_number=$pr_number" >> $GITHUB_OUTPUT
echo "base_ref=$base_ref" >> $GITHUB_OUTPUT
echo "pr_number=$pr_number" >> "$GITHUB_OUTPUT"
echo "base_ref=$base_ref" >> "$GITHUB_OUTPUT"
echo "Found PR #$pr_number targeting base branch: $base_ref"
- name: Check out code from base repository
@@ -87,9 +90,9 @@ jobs:
if: steps.pr.outputs.skip != 'true'
run: |
if [ -f ./memory-analysis/memory-analysis-target.json ] && [ -f ./memory-analysis/memory-analysis-pr.json ]; then
echo "found=true" >> $GITHUB_OUTPUT
echo "found=true" >> "$GITHUB_OUTPUT"
else
echo "found=false" >> $GITHUB_OUTPUT
echo "found=false" >> "$GITHUB_OUTPUT"
echo "Memory analysis artifacts not found, skipping comment"
fi

View File

@@ -433,19 +433,15 @@ jobs:
python-version: ${{ env.DEFAULT_PYTHON }}
cache-key: ${{ needs.common.outputs.cache-key }}
- name: Cache platformio
if: github.ref == 'refs/heads/dev'
# Cache PlatformIO packages to speed up test builds
# Note: Caches are repository-scoped, PRs from forks cannot restore from the main repo cache
- name: Cache PlatformIO
uses: actions/cache@0057852bfaa89a56745cba8c7296529d2fc39830 # v4.3.0
with:
path: ~/.platformio
key: platformio-test-${{ hashFiles('platformio.ini') }}
- name: Cache platformio
if: github.ref != 'refs/heads/dev'
uses: actions/cache/restore@0057852bfaa89a56745cba8c7296529d2fc39830 # v4.3.0
with:
path: ~/.platformio
key: platformio-test-${{ hashFiles('platformio.ini') }}
key: platformio-test-${{ runner.os }}-${{ env.DEFAULT_PYTHON }}-${{ hashFiles('platformio.ini') }}
restore-keys: |
platformio-test-${{ runner.os }}-${{ env.DEFAULT_PYTHON }}-
- name: Validate and compile components with intelligent grouping
run: |

View File

@@ -62,6 +62,40 @@ from esphome.util import (
_LOGGER = logging.getLogger(__name__)
# Special non-component keys that appear in configs
_NON_COMPONENT_KEYS = frozenset(
{
CONF_ESPHOME,
"substitutions",
"packages",
"globals",
"external_components",
"<<",
}
)
def detect_external_components(config: ConfigType) -> set[str]:
"""Detect external/custom components in the configuration.
External components are those that appear in the config but are not
part of ESPHome's built-in components and are not special config keys.
Args:
config: The ESPHome configuration dictionary
Returns:
A set of external component names
"""
from esphome.analyze_memory.helpers import get_esphome_components
builtin_components = get_esphome_components()
return {
key
for key in config
if key not in builtin_components and key not in _NON_COMPONENT_KEYS
}
class ArgsProtocol(Protocol):
device: list[str] | None
@@ -892,6 +926,54 @@ def command_idedata(args: ArgsProtocol, config: ConfigType) -> int:
return 0
def command_analyze_memory(args: ArgsProtocol, config: ConfigType) -> int:
"""Analyze memory usage by component.
This command compiles the configuration and performs memory analysis.
Compilation is fast if sources haven't changed (just relinking).
"""
from esphome import platformio_api
from esphome.analyze_memory.cli import MemoryAnalyzerCLI
# Always compile to ensure fresh data (fast if no changes - just relinks)
exit_code = write_cpp(config)
if exit_code != 0:
return exit_code
exit_code = compile_program(args, config)
if exit_code != 0:
return exit_code
_LOGGER.info("Successfully compiled program.")
# Get idedata for analysis
idedata = platformio_api.get_idedata(config)
if idedata is None:
_LOGGER.error("Failed to get IDE data for memory analysis")
return 1
firmware_elf = Path(idedata.firmware_elf_path)
# Extract external components from config
external_components = detect_external_components(config)
_LOGGER.debug("Detected external components: %s", external_components)
# Perform memory analysis
_LOGGER.info("Analyzing memory usage...")
analyzer = MemoryAnalyzerCLI(
str(firmware_elf),
idedata.objdump_path,
idedata.readelf_path,
external_components,
)
analyzer.analyze()
# Generate and display report
report = analyzer.generate_report()
print()
print(report)
return 0
def command_rename(args: ArgsProtocol, config: ConfigType) -> int | None:
new_name = args.name
for c in new_name:
@@ -1007,6 +1089,7 @@ POST_CONFIG_ACTIONS = {
"idedata": command_idedata,
"rename": command_rename,
"discover": command_discover,
"analyze-memory": command_analyze_memory,
}
SIMPLE_CONFIG_ACTIONS = [
@@ -1292,6 +1375,14 @@ def parse_args(argv):
)
parser_rename.add_argument("name", help="The new name for the device.", type=str)
parser_analyze_memory = subparsers.add_parser(
"analyze-memory",
help="Analyze memory usage by component.",
)
parser_analyze_memory.add_argument(
"configuration", help="Your YAML configuration file(s).", nargs="+"
)
# Keep backward compatibility with the old command line format of
# esphome <config> <command>.
#

View File

@@ -1,3 +1,4 @@
// TEST
#include "api_connection.h"
#ifdef USE_API
#ifdef USE_API_NOISE

View File

@@ -805,6 +805,7 @@ async def to_code(config):
add_idf_sdkconfig_option("CONFIG_AUTOSTART_ARDUINO", True)
add_idf_sdkconfig_option("CONFIG_MBEDTLS_PSK_MODES", True)
add_idf_sdkconfig_option("CONFIG_MBEDTLS_CERTIFICATE_BUNDLE", True)
add_idf_sdkconfig_option("CONFIG_ESP_PHY_REDUCE_TX_POWER", True)
cg.add_build_flag("-Wno-nonnull-compare")

View File

@@ -6,7 +6,7 @@ import esphome.config_validation as cv
from esphome.const import CONF_SUBSTITUTIONS, VALID_SUBSTITUTIONS_CHARACTERS
from esphome.yaml_util import ESPHomeDataBase, ESPLiteralValue, make_data_base
from .jinja import Jinja, JinjaStr, TemplateError, TemplateRuntimeError, has_jinja
from .jinja import Jinja, JinjaError, JinjaStr, has_jinja
CODEOWNERS = ["@esphome/core"]
_LOGGER = logging.getLogger(__name__)
@@ -57,17 +57,12 @@ def _expand_jinja(value, orig_value, path, jinja, ignore_missing):
"->".join(str(x) for x in path),
err.message,
)
except (
TemplateError,
TemplateRuntimeError,
RuntimeError,
ArithmeticError,
AttributeError,
TypeError,
) as err:
except JinjaError as err:
raise cv.Invalid(
f"{type(err).__name__} Error evaluating jinja expression '{value}': {str(err)}."
f" See {'->'.join(str(x) for x in path)}",
f"{err.error_name()} Error evaluating jinja expression '{value}': {str(err.parent())}."
f"\nEvaluation stack: (most recent evaluation last)\n{err.stack_trace_str()}"
f"\nRelevant context:\n{err.context_trace_str()}"
f"\nSee {'->'.join(str(x) for x in path)}",
path,
)
return value

View File

@@ -6,6 +6,8 @@ import re
import jinja2 as jinja
from jinja2.sandbox import SandboxedEnvironment
from esphome.yaml_util import ESPLiteralValue
TemplateError = jinja.TemplateError
TemplateSyntaxError = jinja.TemplateSyntaxError
TemplateRuntimeError = jinja.TemplateRuntimeError
@@ -26,18 +28,20 @@ def has_jinja(st):
return detect_jinja_re.search(st) is not None
# SAFE_GLOBAL_FUNCTIONS defines a allowlist of built-in functions that are considered safe to expose
# SAFE_GLOBALS defines a allowlist of built-in functions or modules that are considered safe to expose
# in Jinja templates or other sandboxed evaluation contexts. Only functions that do not allow
# arbitrary code execution, file access, or other security risks are included.
#
# The following functions are considered safe:
# - math: The entire math module is injected, allowing access to mathematical functions like sin, cos, sqrt, etc.
# - ord: Converts a character to its Unicode code point integer.
# - chr: Converts an integer to its corresponding Unicode character.
# - len: Returns the length of a sequence or collection.
#
# These functions were chosen because they are pure, have no side effects, and do not provide access
# to the file system, environment, or other potentially sensitive resources.
SAFE_GLOBAL_FUNCTIONS = {
SAFE_GLOBALS = {
"math": math, # Inject entire math module
"ord": ord,
"chr": chr,
"len": len,
@@ -56,22 +60,62 @@ class JinjaStr(str):
later in the main substitutions pass.
"""
Undefined = object()
def __new__(cls, value: str, upvalues=None):
obj = super().__new__(cls, value)
obj.upvalues = upvalues or {}
if isinstance(value, JinjaStr):
base = str(value)
merged = {**value.upvalues, **(upvalues or {})}
else:
base = value
merged = dict(upvalues or {})
obj = super().__new__(cls, base)
obj.upvalues = merged
obj.result = JinjaStr.Undefined
return obj
def __init__(self, value: str, upvalues=None):
self.upvalues = upvalues or {}
class JinjaError(Exception):
def __init__(self, context_trace: dict, expr: str):
self.context_trace = context_trace
self.eval_stack = [expr]
def parent(self):
return self.__context__
def error_name(self):
return type(self.parent()).__name__
def context_trace_str(self):
return "\n".join(
f" {k} = {repr(v)} ({type(v).__name__})"
for k, v in self.context_trace.items()
)
def stack_trace_str(self):
return "\n".join(
f" {len(self.eval_stack) - i}: {expr}{i == 0 and ' <-- ' + self.error_name() or ''}"
for i, expr in enumerate(self.eval_stack)
)
class Jinja:
class TrackerContext(jinja.runtime.Context):
def resolve_or_missing(self, key):
val = super().resolve_or_missing(key)
if isinstance(val, JinjaStr):
self.environment.context_trace[key] = val
val, _ = self.environment.expand(val)
self.environment.context_trace[key] = val
return val
class Jinja(SandboxedEnvironment):
"""
Wraps a Jinja environment
"""
def __init__(self, context_vars):
self.env = SandboxedEnvironment(
super().__init__(
trim_blocks=True,
lstrip_blocks=True,
block_start_string="<%",
@@ -82,13 +126,20 @@ class Jinja:
variable_end_string="}",
undefined=jinja.StrictUndefined,
)
self.env.add_extension("jinja2.ext.do")
self.env.globals["math"] = math # Inject entire math module
self.context_class = TrackerContext
self.add_extension("jinja2.ext.do")
self.context_trace = {}
self.context_vars = {**context_vars}
self.env.globals = {
**self.env.globals,
for k, v in self.context_vars.items():
if isinstance(v, ESPLiteralValue):
continue
if isinstance(v, str) and not isinstance(v, JinjaStr) and has_jinja(v):
self.context_vars[k] = JinjaStr(v, self.context_vars)
self.globals = {
**self.globals,
**self.context_vars,
**SAFE_GLOBAL_FUNCTIONS,
**SAFE_GLOBALS,
}
def safe_eval(self, expr):
@@ -110,23 +161,43 @@ class Jinja:
result = None
override_vars = {}
if isinstance(content_str, JinjaStr):
if content_str.result is not JinjaStr.Undefined:
return content_str.result, None
# If `value` is already a JinjaStr, it means we are trying to evaluate it again
# in a parent pass.
# Hopefully, all required variables are visible now.
override_vars = content_str.upvalues
old_trace = self.context_trace
self.context_trace = {}
try:
template = self.env.from_string(content_str)
template = self.from_string(content_str)
result = self.safe_eval(template.render(override_vars))
if isinstance(result, Undefined):
# This happens when the expression is simply an undefined variable. Jinja does not
# raise an exception, instead we get "Undefined".
# Trigger an UndefinedError exception so we skip to below.
print("" + result)
print("" + result) # force a UndefinedError exception
except (TemplateSyntaxError, UndefinedError) as err:
# `content_str` contains a Jinja expression that refers to a variable that is undefined
# in this scope. Perhaps it refers to a root substitution that is not visible yet.
# Therefore, return the original `content_str` as a JinjaStr, which contains the variables
# Therefore, return `content_str` as a JinjaStr, which contains the variables
# that are actually visible to it at this point to postpone evaluation.
return JinjaStr(content_str, {**self.context_vars, **override_vars}), err
except JinjaError as err:
err.context_trace = {**self.context_trace, **err.context_trace}
err.eval_stack.append(content_str)
raise err
except (
TemplateError,
TemplateRuntimeError,
RuntimeError,
ArithmeticError,
AttributeError,
TypeError,
) as err:
raise JinjaError(self.context_trace, content_str) from err
finally:
self.context_trace = old_trace
if isinstance(content_str, JinjaStr):
content_str.result = result
return result, None

View File

@@ -11,6 +11,7 @@ from esphome.const import (
CONF_COMMENT,
CONF_ESPHOME,
CONF_ETHERNET,
CONF_OPENTHREAD,
CONF_PORT,
CONF_USE_ADDRESS,
CONF_WEB_SERVER,
@@ -641,6 +642,9 @@ class EsphomeCore:
if CONF_ETHERNET in self.config:
return self.config[CONF_ETHERNET][CONF_USE_ADDRESS]
if CONF_OPENTHREAD in self.config:
return f"{self.name}.local"
return None
@property

View File

@@ -273,6 +273,8 @@
#ifdef USE_NRF52
#define USE_NRF52_DFU
#define USE_SOFTDEVICE_ID 7
#define USE_SOFTDEVICE_VERSION 1
#endif
// Disabled feature flags

View File

@@ -1,362 +0,0 @@
"""GitHub download cache for ESPHome.
This module provides caching functionality for GitHub release downloads
to avoid redundant network I/O when switching between platforms.
"""
from __future__ import annotations
import hashlib
import json
import logging
from pathlib import Path
import shutil
import time
import urllib.error
import urllib.request
_LOGGER = logging.getLogger(__name__)
class GitHubCache:
"""Manages caching of GitHub release downloads."""
# Cache expiration time in seconds (30 days)
CACHE_EXPIRATION_SECONDS = 30 * 24 * 60 * 60
def __init__(self, cache_dir: Path | None = None):
"""Initialize the cache manager.
Args:
cache_dir: Directory to store cached files.
Defaults to ~/.esphome_cache/github
"""
if cache_dir is None:
cache_dir = Path.home() / ".esphome_cache" / "github"
self.cache_dir = cache_dir
self.cache_dir.mkdir(parents=True, exist_ok=True)
self.metadata_file = self.cache_dir / "cache_metadata.json"
# Prune old files on initialization
try:
self._prune_old_files()
except Exception as e:
_LOGGER.debug("Failed to prune old cache files: %s", e)
def _load_metadata(self) -> dict:
"""Load cache metadata from disk."""
if self.metadata_file.exists():
try:
with open(self.metadata_file) as f:
return json.load(f)
except (OSError, ValueError, json.JSONDecodeError):
return {}
return {}
def _save_metadata(self, metadata: dict) -> None:
"""Save cache metadata to disk."""
try:
with open(self.metadata_file, "w") as f:
json.dump(metadata, f, indent=2)
except OSError as e:
_LOGGER.debug("Failed to save cache metadata: %s", e)
@staticmethod
def is_github_url(url: str) -> bool:
"""Check if URL is a GitHub release download."""
return "github.com" in url.lower() and url.endswith(".zip")
def _get_cache_key(self, url: str) -> str:
"""Get cache key (hash) for a URL."""
return hashlib.sha256(url.encode()).hexdigest()
def _get_cache_path(self, url: str) -> Path:
"""Get cache file path for a URL."""
cache_key = self._get_cache_key(url)
ext = Path(url.split("?")[0]).suffix
return self.cache_dir / f"{cache_key}{ext}"
def _check_if_modified(
self,
url: str,
last_modified: str | None = None,
etag: str | None = None,
) -> bool:
"""Check if a URL has been modified using HTTP 304.
Args:
url: URL to check
last_modified: Last-Modified header from previous response
etag: ETag header from previous response
Returns:
True if modified, False if not modified (or offline/unreachable)
"""
if not last_modified and not etag:
# No cache headers available, assume modified
return True
try:
request = urllib.request.Request(url)
request.get_method = lambda: "HEAD"
if last_modified:
request.add_header("If-Modified-Since", last_modified)
if etag:
request.add_header("If-None-Match", etag)
try:
urllib.request.urlopen(request, timeout=10)
# 200 OK = file was modified
return True
except urllib.error.HTTPError as e:
if e.code == 304:
# Not modified
_LOGGER.debug("File not modified (HTTP 304): %s", url)
return False
# Other errors, assume modified to be safe
return True
except (OSError, urllib.error.URLError):
# If check fails (offline/network error), assume not modified (use cache)
_LOGGER.info("Cannot reach server (offline?), using cached file: %s", url)
return False
def get_cached_path(self, url: str, check_updates: bool = True) -> Path | None:
"""Get path to cached file if available and valid.
Args:
url: URL to check
check_updates: Whether to check for updates using HTTP 304
Returns:
Path to cached file if valid, None if needs download
"""
if not self.is_github_url(url):
return None
cache_path = self._get_cache_path(url)
if not cache_path.exists():
return None
# Load metadata
metadata = self._load_metadata()
cache_key = self._get_cache_key(url)
# Check if file should be re-downloaded
should_redownload = False
if check_updates and cache_key in metadata:
last_modified = metadata[cache_key].get("last_modified")
etag = metadata[cache_key].get("etag")
if self._check_if_modified(url, last_modified, etag):
# File was modified, need to re-download
_LOGGER.debug("Cached file is outdated: %s", url)
should_redownload = True
if should_redownload:
return None
# File is valid, update cached_at timestamp to keep it fresh
if cache_key in metadata:
metadata[cache_key]["cached_at"] = time.time()
self._save_metadata(metadata)
# Log appropriate message
if not check_updates:
_LOGGER.debug("Using cached file (no update check): %s", url)
elif cache_key not in metadata:
_LOGGER.debug("Using cached file (no metadata): %s", url)
else:
_LOGGER.debug("Using cached file: %s", url)
return cache_path
def save_to_cache(self, url: str, source_path: Path) -> None:
"""Save a downloaded file to cache.
Args:
url: URL the file was downloaded from
source_path: Path to the downloaded file
"""
if not self.is_github_url(url):
return
try:
cache_path = self._get_cache_path(url)
# Only copy if source and destination are different
if source_path.resolve() != cache_path.resolve():
shutil.copy2(source_path, cache_path)
# Try to get HTTP headers for caching
last_modified = None
etag = None
try:
request = urllib.request.Request(url)
request.get_method = lambda: "HEAD"
response = urllib.request.urlopen(request, timeout=10)
last_modified = response.headers.get("Last-Modified")
etag = response.headers.get("ETag")
except (OSError, urllib.error.URLError):
pass
# Update metadata
metadata = self._load_metadata()
cache_key = self._get_cache_key(url)
metadata[cache_key] = {
"url": url,
"size": cache_path.stat().st_size,
"cached_at": time.time(),
"last_modified": last_modified,
"etag": etag,
}
self._save_metadata(metadata)
_LOGGER.debug("Saved to cache: %s", url)
except OSError as e:
_LOGGER.debug("Failed to save to cache: %s", e)
def copy_from_cache(self, url: str, destination: Path) -> bool:
"""Copy a cached file to destination.
Args:
url: URL of the cached file
destination: Where to copy the file
Returns:
True if successful, False otherwise
"""
cached_path = self.get_cached_path(url, check_updates=True)
if not cached_path:
return False
try:
shutil.copy2(cached_path, destination)
_LOGGER.info("Using cached download for %s", url)
return True
except OSError as e:
_LOGGER.warning("Failed to use cache: %s", e)
return False
def cache_size(self) -> int:
"""Get total size of cached files in bytes."""
total = 0
try:
for file_path in self.cache_dir.glob("*"):
if file_path.is_file() and file_path != self.metadata_file:
total += file_path.stat().st_size
except OSError:
pass
return total
def list_cached(self) -> list[dict]:
"""List all cached files with metadata."""
cached_files = []
metadata = self._load_metadata()
for cache_key, meta in metadata.items():
cache_path = (
self.cache_dir / f"{cache_key}{Path(meta['url'].split('?')[0]).suffix}"
)
if cache_path.exists():
cached_files.append(
{
"url": meta["url"],
"path": cache_path,
"size": meta["size"],
"cached_at": meta.get("cached_at"),
"last_modified": meta.get("last_modified"),
"etag": meta.get("etag"),
}
)
return cached_files
def clear_cache(self) -> None:
"""Clear all cached files."""
try:
for file_path in self.cache_dir.glob("*"):
if file_path.is_file():
file_path.unlink()
_LOGGER.info("Cache cleared: %s", self.cache_dir)
except OSError as e:
_LOGGER.warning("Failed to clear cache: %s", e)
def _prune_old_files(self) -> None:
"""Remove cache files older than CACHE_EXPIRATION_SECONDS."""
current_time = time.time()
metadata = self._load_metadata()
removed_count = 0
removed_size = 0
# Check each file in metadata
for cache_key, meta in list(metadata.items()):
cached_at = meta.get("cached_at", 0)
age_seconds = current_time - cached_at
if age_seconds > self.CACHE_EXPIRATION_SECONDS:
# File is too old, remove it
cache_path = (
self.cache_dir
/ f"{cache_key}{Path(meta['url'].split('?')[0]).suffix}"
)
if cache_path.exists():
file_size = cache_path.stat().st_size
cache_path.unlink()
removed_size += file_size
removed_count += 1
_LOGGER.debug(
"Pruned old cache file (age: %.1f days): %s",
age_seconds / (24 * 60 * 60),
meta["url"],
)
# Remove from metadata
del metadata[cache_key]
# Also check for orphaned files (files without metadata)
for file_path in self.cache_dir.glob("*.zip"):
if file_path == self.metadata_file:
continue
# Check if file is in metadata
found_in_metadata = False
for cache_key in metadata:
if file_path.name.startswith(cache_key):
found_in_metadata = True
break
if not found_in_metadata:
# Orphaned file - check age by modification time
file_age = current_time - file_path.stat().st_mtime
if file_age > self.CACHE_EXPIRATION_SECONDS:
file_size = file_path.stat().st_size
file_path.unlink()
removed_size += file_size
removed_count += 1
_LOGGER.debug(
"Pruned orphaned cache file (age: %.1f days): %s",
file_age / (24 * 60 * 60),
file_path.name,
)
# Save updated metadata if anything was removed
if removed_count > 0:
self._save_metadata(metadata)
removed_mb = removed_size / (1024 * 1024)
_LOGGER.info(
"Pruned %d old cache file(s), freed %.2f MB",
removed_count,
removed_mb,
)
# Global cache instance
_cache: GitHubCache | None = None
def get_cache() -> GitHubCache:
"""Get the global GitHub cache instance."""
global _cache # noqa: PLW0603
if _cache is None:
_cache = GitHubCache()
return _cache

View File

@@ -5,6 +5,7 @@ import os
from pathlib import Path
import re
import subprocess
from typing import Any
from esphome.const import CONF_COMPILE_PROCESS_LIMIT, CONF_ESPHOME, KEY_CORE
from esphome.core import CORE, EsphomeError
@@ -43,168 +44,32 @@ def patch_structhash():
def patch_file_downloader():
"""Patch PlatformIO's FileDownloader to add caching and retry on PackageException errors.
"""Patch PlatformIO's FileDownloader to retry on PackageException errors."""
from platformio.package.download import FileDownloader
from platformio.package.exception import PackageException
This function attempts to patch PlatformIO's internal download mechanism.
If patching fails (due to API changes), it gracefully falls back to no caching.
"""
try:
from platformio.package.download import FileDownloader
from platformio.package.exception import PackageException
except ImportError as e:
_LOGGER.debug("Could not import PlatformIO modules for patching: %s", e)
return
original_init = FileDownloader.__init__
# Import our cache module
from esphome.github_cache import GitHubCache
def patched_init(self, *args: Any, **kwargs: Any) -> None:
max_retries = 3
_LOGGER.debug("Applying GitHub download cache patch...")
# Verify the classes have the expected methods before patching
if not hasattr(FileDownloader, "__init__") or not hasattr(FileDownloader, "start"):
_LOGGER.warning(
"PlatformIO FileDownloader API has changed, skipping cache patch"
)
return
try:
original_init = FileDownloader.__init__
original_start = FileDownloader.start
# Initialize cache in .platformio directory so it benefits from GitHub Actions cache
platformio_dir = Path.home() / ".platformio"
cache_dir = platformio_dir / "esphome_download_cache"
cache_dir_existed = cache_dir.exists()
cache = GitHubCache(cache_dir=cache_dir)
if not cache_dir_existed:
_LOGGER.info("Created GitHub download cache at: %s", cache.cache_dir)
except Exception as e:
_LOGGER.warning("Failed to initialize GitHub download cache: %s", e)
return
def patched_init(self, *args, **kwargs):
"""Patched init that checks cache before making HTTP connection."""
try:
# Extract URL from args (first positional argument)
url = args[0] if args else kwargs.get("url")
dest_dir = args[1] if len(args) > 1 else kwargs.get("dest_dir")
# Debug: Log all downloads
_LOGGER.debug("[GitHub Cache] Download request for: %s", url)
# Store URL for later use (original FileDownloader doesn't store it)
self._esphome_cache_url = url if cache.is_github_url(url) else None
# Check cache for GitHub URLs BEFORE making HTTP request
if self._esphome_cache_url:
_LOGGER.debug("[GitHub Cache] This is a GitHub URL, checking cache...")
self._esphome_use_cache = cache.get_cached_path(url, check_updates=True)
if self._esphome_use_cache:
_LOGGER.info(
"Found %s in cache, will restore instead of downloading",
Path(url.split("?")[0]).name,
)
_LOGGER.debug(
"[GitHub Cache] Found in cache: %s", self._esphome_use_cache
for attempt in range(max_retries):
try:
return original_init(self, *args, **kwargs)
except PackageException as e:
if attempt < max_retries - 1:
_LOGGER.warning(
"Package download failed: %s. Retrying... (attempt %d/%d)",
str(e),
attempt + 1,
max_retries,
)
else:
_LOGGER.debug(
"[GitHub Cache] Not in cache, will download and cache"
)
else:
self._esphome_use_cache = None
if url and str(url).startswith("http"):
_LOGGER.debug("[GitHub Cache] Not a GitHub URL, skipping cache")
# Final attempt - re-raise
raise
return None
# Only make HTTP connection if we don't have cached file
if self._esphome_use_cache:
# Skip HTTP connection, we'll handle this in start()
# Set minimal attributes to satisfy FileDownloader
# Create a mock session that can be safely closed in __del__
class MockSession:
def close(self):
pass
self._http_session = MockSession()
self._http_response = None
self._fname = Path(url.split("?")[0]).name
self._destination = self._fname
if dest_dir:
from os.path import join
self._destination = join(dest_dir, self._fname)
# Note: Actual restoration logged in patched_start
return None # Don't call original_init
# Normal initialization with retry logic
max_retries = 3
for attempt in range(max_retries):
try:
return original_init(self, *args, **kwargs)
except PackageException as e:
if attempt < max_retries - 1:
_LOGGER.warning(
"Package download failed: %s. Retrying... (attempt %d/%d)",
str(e),
attempt + 1,
max_retries,
)
else:
# Final attempt - re-raise
raise
return None
except Exception as e:
# If anything goes wrong in our cache logic, fall back to normal download
_LOGGER.debug("Cache check failed, falling back to normal download: %s", e)
self._esphome_cache_url = None
self._esphome_use_cache = None
return original_init(self, *args, **kwargs)
def patched_start(self, *args, **kwargs):
"""Patched start that uses cache when available."""
try:
import shutil
# Get the cache URL and path that were set in __init__
cache_url = getattr(self, "_esphome_cache_url", None)
cached_file = getattr(self, "_esphome_use_cache", None)
# If we're using cache, copy file instead of downloading
if cached_file:
try:
shutil.copy2(cached_file, self._destination)
_LOGGER.info(
"Restored %s from cache (avoided download)",
Path(cached_file).name,
)
return True
except OSError as e:
_LOGGER.warning("Failed to copy from cache: %s", e)
# Fall through to re-download
# Perform normal download
result = original_start(self, *args, **kwargs)
# Save to cache if it was a GitHub URL
if cache_url:
try:
cache.save_to_cache(cache_url, Path(self._destination))
except OSError as e:
_LOGGER.debug("Failed to save to cache: %s", e)
return result
except Exception as e:
# If anything goes wrong, fall back to normal download
_LOGGER.debug("Cache restoration failed, using normal download: %s", e)
return original_start(self, *args, **kwargs)
# Apply the patches
try:
FileDownloader.__init__ = patched_init
FileDownloader.start = patched_start
_LOGGER.debug("GitHub download cache patch applied successfully")
except Exception as e:
_LOGGER.warning("Failed to apply GitHub download cache patch: %s", e)
FileDownloader.__init__ = patched_init
IGNORE_LIB_WARNINGS = f"(?:{'|'.join(['Hash', 'Update'])})"
@@ -222,8 +87,6 @@ FILTER_PLATFORMIO_LINES = [
r"Memory Usage -> https://bit.ly/pio-memory-usage",
r"Found: https://platformio.org/lib/show/.*",
r"Using cache: .*",
# Don't filter our cache messages - let users see when cache is being used
# r"Using cached download for .*",
r"Installing dependencies",
r"Library Manager: Already installed, built-in library",
r"Building in .* mode",

View File

@@ -1,164 +0,0 @@
#!/usr/bin/env python3
"""
Pre-cache PlatformIO GitHub Downloads
This script extracts GitHub URLs from platformio.ini and pre-caches them
to avoid redundant downloads when switching between ESP8266 and ESP32 builds.
Usage:
python3 script/cache_platformio_downloads.py [platformio.ini]
"""
import argparse
import configparser
from pathlib import Path
import re
import sys
# Import the cache manager
sys.path.insert(0, str(Path(__file__).parent.parent))
from esphome.github_cache import GitHubCache
def extract_github_urls(platformio_ini: Path) -> list[str]:
"""Extract all GitHub URLs from platformio.ini.
Args:
platformio_ini: Path to platformio.ini file
Returns:
List of GitHub URLs found
"""
config = configparser.ConfigParser(inline_comment_prefixes=(";",))
config.read(platformio_ini)
urls = []
github_pattern = re.compile(r"https://github\.com/[^\s;]+\.zip")
for section in config.sections():
conf = config[section]
# Check platform
if "platform" in conf:
platform_value = conf["platform"]
matches = github_pattern.findall(platform_value)
urls.extend(matches)
# Check platform_packages
if "platform_packages" in conf:
for line in conf["platform_packages"].splitlines():
line = line.strip()
if not line or line.startswith("#"):
continue
matches = github_pattern.findall(line)
urls.extend(matches)
# Remove duplicates while preserving order using dict
return list(dict.fromkeys(urls))
def main():
"""Main entry point."""
parser = argparse.ArgumentParser(
description="Pre-cache PlatformIO GitHub downloads",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
This script scans platformio.ini for GitHub URLs and pre-caches them.
This avoids redundant downloads when switching between platforms (e.g., ESP8266 and ESP32).
Examples:
# Cache downloads from default platformio.ini
%(prog)s
# Cache downloads from specific file
%(prog)s custom_platformio.ini
# Show what would be cached without downloading
%(prog)s --dry-run
""",
)
parser.add_argument(
"platformio_ini",
nargs="?",
default="platformio.ini",
help="Path to platformio.ini (default: platformio.ini)",
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Show what would be cached without downloading",
)
parser.add_argument(
"--cache-dir",
type=Path,
help="Cache directory (default: ~/.platformio/esphome_download_cache)",
)
parser.add_argument(
"--force",
action="store_true",
help="Force re-download even if cached",
)
args = parser.parse_args()
platformio_ini = Path(args.platformio_ini)
if not platformio_ini.exists():
print(f"Error: {platformio_ini} not found", file=sys.stderr)
return 1
# Extract URLs
print(f"Scanning {platformio_ini} for GitHub URLs...")
urls = extract_github_urls(platformio_ini)
if not urls:
print("No GitHub URLs found in platformio.ini")
return 0
print(f"Found {len(urls)} unique GitHub URL(s):")
for url in urls:
print(f" - {url}")
print()
if args.dry_run:
print("Dry run - not downloading")
return 0
# Initialize cache (use PlatformIO directory by default)
cache_dir = args.cache_dir
if cache_dir is None:
cache_dir = Path.home() / ".platformio" / "esphome_download_cache"
cache = GitHubCache(cache_dir)
# Cache each URL
success_count = 0
for i, url in enumerate(urls, 1):
print(f"[{i}/{len(urls)}] Checking {url}")
try:
# Use the download_with_progress from github_download_cache CLI
from script.github_download_cache import download_with_progress
download_with_progress(cache, url, force=args.force, check_updates=True)
success_count += 1
print()
except Exception as e:
print(f"Error caching {url}: {e}", file=sys.stderr)
print()
# Show cache stats
total_size = cache.cache_size()
size_mb = total_size / (1024 * 1024)
print("\nCache summary:")
print(f" Successfully cached: {success_count}/{len(urls)}")
print(f" Total cache size: {size_mb:.2f} MB")
print(f" Cache location: {cache.cache_dir}")
return 0 if success_count == len(urls) else 1
if __name__ == "__main__":
sys.exit(main())

View File

@@ -1,195 +0,0 @@
#!/usr/bin/env python3
"""
GitHub Download Cache CLI
This script provides a command-line interface to the GitHub download cache.
The actual caching logic is in esphome/github_cache.py.
Usage:
python3 script/github_download_cache.py download URL
python3 script/github_download_cache.py list
python3 script/github_download_cache.py stats
python3 script/github_download_cache.py clear
"""
import argparse
from pathlib import Path
import sys
import urllib.request
# Add parent directory to path to import esphome modules
sys.path.insert(0, str(Path(__file__).parent.parent))
from esphome.github_cache import GitHubCache
def download_with_progress(
cache: GitHubCache, url: str, force: bool = False, check_updates: bool = True
) -> Path:
"""Download a URL with progress indicator and caching.
Args:
cache: GitHubCache instance
url: URL to download
force: Force re-download even if cached
check_updates: Check for updates using HTTP 304
Returns:
Path to cached file
"""
# If force, skip cache check
if not force:
cached_path = cache.get_cached_path(url, check_updates=check_updates)
if cached_path:
print(f"Using cached file for {url}")
print(f" Cache: {cached_path}")
return cached_path
# Need to download
print(f"Downloading {url}")
cache_path = cache._get_cache_path(url)
print(f" Cache: {cache_path}")
# Download with progress
temp_path = cache_path.with_suffix(cache_path.suffix + ".tmp")
try:
with urllib.request.urlopen(url) as response:
total_size = int(response.headers.get("Content-Length", 0))
downloaded = 0
with open(temp_path, "wb") as f:
while True:
chunk = response.read(8192)
if not chunk:
break
f.write(chunk)
downloaded += len(chunk)
if total_size > 0:
percent = (downloaded / total_size) * 100
print(f"\r Progress: {percent:.1f}%", end="", flush=True)
print() # New line after progress
# Move to final location
temp_path.replace(cache_path)
# Let cache handle metadata
cache.save_to_cache(url, cache_path)
return cache_path
except (OSError, urllib.error.URLError) as e:
if temp_path.exists():
temp_path.unlink()
raise RuntimeError(f"Failed to download {url}: {e}") from e
def main():
"""CLI entry point."""
parser = argparse.ArgumentParser(
description="GitHub Download Cache Manager",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Download and cache a URL
%(prog)s download https://github.com/pioarduino/registry/releases/download/0.0.1/esptoolpy-v5.1.0.zip
# List cached files
%(prog)s list
# Show cache statistics
%(prog)s stats
# Clear cache
%(prog)s clear
""",
)
parser.add_argument(
"--cache-dir",
type=Path,
help="Cache directory (default: ~/.platformio/esphome_download_cache)",
)
subparsers = parser.add_subparsers(dest="command", help="Command to execute")
# Download command
download_parser = subparsers.add_parser("download", help="Download and cache a URL")
download_parser.add_argument("url", help="URL to download")
download_parser.add_argument(
"--force", action="store_true", help="Force re-download even if cached"
)
download_parser.add_argument(
"--no-check-updates",
action="store_true",
help="Skip checking for updates (don't use HTTP 304)",
)
# List command
subparsers.add_parser("list", help="List cached files")
# Stats command
subparsers.add_parser("stats", help="Show cache statistics")
# Clear command
subparsers.add_parser("clear", help="Clear all cached files")
args = parser.parse_args()
if not args.command:
parser.print_help()
return 1
# Use PlatformIO cache directory by default
if args.cache_dir is None:
args.cache_dir = Path.home() / ".platformio" / "esphome_download_cache"
cache = GitHubCache(args.cache_dir)
if args.command == "download":
try:
check_updates = not args.no_check_updates
cache_path = download_with_progress(
cache, args.url, force=args.force, check_updates=check_updates
)
print(f"\nCached at: {cache_path}")
return 0
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
return 1
elif args.command == "list":
cached = cache.list_cached()
if not cached:
print("No cached files")
return 0
print(f"Cached files ({len(cached)}):")
for item in cached:
size_mb = item["size"] / (1024 * 1024)
print(f" {item['url']}")
print(f" Size: {size_mb:.2f} MB")
print(f" Path: {item['path']}")
return 0
elif args.command == "stats":
total_size = cache.cache_size()
cached_count = len(cache.list_cached())
size_mb = total_size / (1024 * 1024)
print(f"Cache directory: {cache.cache_dir}")
print(f"Cached files: {cached_count}")
print(f"Total size: {size_mb:.2f} MB")
return 0
elif args.command == "clear":
cache.clear_cache()
return 0
return 1
if __name__ == "__main__":
sys.exit(main())

View File

@@ -1,138 +0,0 @@
#!/usr/bin/env python3
"""
PlatformIO Download Wrapper with Caching
This script can be used as a wrapper around PlatformIO downloads to add caching.
It intercepts download operations and uses the GitHub download cache.
This is designed to be called from PlatformIO's extra_scripts if needed.
"""
from pathlib import Path
import sys
# Import the cache manager
sys.path.insert(0, str(Path(__file__).parent))
from github_download_cache import GitHubDownloadCache
def is_github_url(url: str) -> bool:
"""Check if a URL is a GitHub URL."""
return "github.com" in url.lower()
def cached_download_handler(source, target, env):
"""PlatformIO download handler that uses caching for GitHub URLs.
This function can be registered as a custom download handler in PlatformIO.
Args:
source: Source URL
target: Target file path
env: SCons environment
"""
import shutil
import urllib.request
url = str(source[0])
target_path = Path(str(target[0]))
# Only cache GitHub URLs
if not is_github_url(url):
# Fall back to default download
print(f"Downloading (no cache): {url}")
with (
urllib.request.urlopen(url) as response,
open(target_path, "wb") as out_file,
):
shutil.copyfileobj(response, out_file)
return
# Use cache for GitHub URLs
cache = GitHubDownloadCache()
print(f"Downloading with cache: {url}")
try:
cached_path = cache.download_with_cache(url, check_updates=True)
# Copy from cache to target
shutil.copy2(cached_path, target_path)
print(f" Copied to: {target_path}")
except Exception as e:
print(f"Cache download failed, using direct download: {e}")
# Fall back to direct download
with (
urllib.request.urlopen(url) as response,
open(target_path, "wb") as out_file,
):
shutil.copyfileobj(response, out_file)
def setup_platformio_caching():
"""Setup PlatformIO to use cached downloads.
This should be called from an extra_scripts file in platformio.ini.
Example extra_scripts file (e.g., platformio_hooks.py):
Import("env")
from script.platformio_download_wrapper import setup_platformio_caching
setup_platformio_caching()
"""
try:
from SCons.Script import DefaultEnvironment
DefaultEnvironment()
# Register custom download handler
# Note: This may not work with all PlatformIO versions
# as the download mechanism is internal
print("Note: Direct download interception is not fully supported.")
print("Please use the cache_platformio_downloads.py script instead.")
except ImportError:
print("Warning: SCons not available, cannot setup download caching")
if __name__ == "__main__":
# CLI mode - can be used to manually download a URL with caching
import argparse
parser = argparse.ArgumentParser(description="Download a URL with caching")
parser.add_argument("url", help="URL to download")
parser.add_argument("target", help="Target file path")
parser.add_argument("--cache-dir", type=Path, help="Cache directory")
args = parser.parse_args()
cache = GitHubDownloadCache(args.cache_dir)
target_path = Path(args.target)
try:
if is_github_url(args.url):
print(f"Downloading with cache: {args.url}")
cached_path = cache.download_with_cache(args.url)
# Copy to target
import shutil
target_path.parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(cached_path, target_path)
print(f"Copied to: {target_path}")
else:
print(f"Downloading directly (not a GitHub URL): {args.url}")
import shutil
import urllib.request
target_path.parent.mkdir(parents=True, exist_ok=True)
with (
urllib.request.urlopen(args.url) as response,
open(target_path, "wb") as out_file,
):
shutil.copyfileobj(response, out_file)
sys.exit(0)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)

View File

@@ -8,6 +8,7 @@ substitutions:
area: 25
numberOne: 1
var1: 79
double_width: 14
test_list:
- The area is 56
- 56
@@ -25,3 +26,4 @@ test_list:
- ord("a") = 97
- chr(97) = a
- len([1,2,3]) = 3
- width = 7, double_width = 14

View File

@@ -8,6 +8,7 @@ substitutions:
area: 25
numberOne: 1
var1: 79
double_width: ${width * 2}
test_list:
- "The area is ${width * height}"
@@ -23,3 +24,4 @@ test_list:
- ord("a") = ${ ord("a") }
- chr(97) = ${ chr(97) }
- len([1,2,3]) = ${ len([1,2,3]) }
- width = ${width}, double_width = ${double_width}

View File

@@ -570,6 +570,13 @@ class TestEsphomeCore:
assert target.address == "4.3.2.1"
def test_address__openthread(self, target):
target.name = "test-device"
target.config = {}
target.config[const.CONF_OPENTHREAD] = {}
assert target.address == "test-device.local"
def test_is_esp32(self, target):
target.data[const.KEY_CORE] = {const.KEY_TARGET_PLATFORM: "esp32"}

View File

@@ -17,10 +17,12 @@ from esphome import platformio_api
from esphome.__main__ import (
Purpose,
choose_upload_log_host,
command_analyze_memory,
command_clean_all,
command_rename,
command_update_all,
command_wizard,
detect_external_components,
get_port_type,
has_ip_address,
has_mqtt,
@@ -226,13 +228,47 @@ def mock_run_external_process() -> Generator[Mock]:
@pytest.fixture
def mock_run_external_command() -> Generator[Mock]:
"""Mock run_external_command for testing."""
def mock_run_external_command_main() -> Generator[Mock]:
"""Mock run_external_command in __main__ module (different from platformio_api)."""
with patch("esphome.__main__.run_external_command") as mock:
mock.return_value = 0 # Default to success
yield mock
@pytest.fixture
def mock_write_cpp() -> Generator[Mock]:
"""Mock write_cpp for testing."""
with patch("esphome.__main__.write_cpp") as mock:
mock.return_value = 0 # Default to success
yield mock
@pytest.fixture
def mock_compile_program() -> Generator[Mock]:
"""Mock compile_program for testing."""
with patch("esphome.__main__.compile_program") as mock:
mock.return_value = 0 # Default to success
yield mock
@pytest.fixture
def mock_get_esphome_components() -> Generator[Mock]:
"""Mock get_esphome_components for testing."""
with patch("esphome.analyze_memory.helpers.get_esphome_components") as mock:
mock.return_value = {"logger", "api", "ota"}
yield mock
@pytest.fixture
def mock_memory_analyzer_cli() -> Generator[Mock]:
"""Mock MemoryAnalyzerCLI for testing."""
with patch("esphome.analyze_memory.cli.MemoryAnalyzerCLI") as mock_class:
mock_analyzer = MagicMock()
mock_analyzer.generate_report.return_value = "Mock Memory Report"
mock_class.return_value = mock_analyzer
yield mock_class
def test_choose_upload_log_host_with_string_default() -> None:
"""Test with a single string default device."""
setup_core()
@@ -839,7 +875,7 @@ def test_upload_program_serial_esp8266_with_file(
def test_upload_using_esptool_path_conversion(
tmp_path: Path,
mock_run_external_command: Mock,
mock_run_external_command_main: Mock,
mock_get_idedata: Mock,
) -> None:
"""Test upload_using_esptool properly converts Path objects to strings for esptool.
@@ -875,10 +911,10 @@ def test_upload_using_esptool_path_conversion(
assert result == 0
# Verify that run_external_command was called
assert mock_run_external_command.call_count == 1
assert mock_run_external_command_main.call_count == 1
# Get the actual call arguments
call_args = mock_run_external_command.call_args[0]
call_args = mock_run_external_command_main.call_args[0]
# The first argument should be esptool.main function,
# followed by the command arguments
@@ -917,7 +953,7 @@ def test_upload_using_esptool_path_conversion(
def test_upload_using_esptool_with_file_path(
tmp_path: Path,
mock_run_external_command: Mock,
mock_run_external_command_main: Mock,
) -> None:
"""Test upload_using_esptool with a custom file that's a Path object."""
setup_core(platform=PLATFORM_ESP8266, tmp_path=tmp_path, name="test")
@@ -934,10 +970,10 @@ def test_upload_using_esptool_with_file_path(
assert result == 0
# Verify that run_external_command was called
mock_run_external_command.assert_called_once()
mock_run_external_command_main.assert_called_once()
# Get the actual call arguments
call_args = mock_run_external_command.call_args[0]
call_args = mock_run_external_command_main.call_args[0]
cmd_list = list(call_args[1:]) # Skip the esptool.main function
# Find the firmware path in the command
@@ -2273,3 +2309,226 @@ def test_show_logs_api_mqtt_timeout_fallback(
# Verify run_logs was called with only the static IP (MQTT failed)
mock_run_logs.assert_called_once_with(CORE.config, ["192.168.1.100"])
def test_detect_external_components_no_external(
mock_get_esphome_components: Mock,
) -> None:
"""Test detect_external_components with no external components."""
config = {
CONF_ESPHOME: {CONF_NAME: "test_device"},
"logger": {},
"api": {},
}
result = detect_external_components(config)
assert result == set()
mock_get_esphome_components.assert_called_once()
def test_detect_external_components_with_external(
mock_get_esphome_components: Mock,
) -> None:
"""Test detect_external_components detects external components."""
config = {
CONF_ESPHOME: {CONF_NAME: "test_device"},
"logger": {}, # Built-in
"api": {}, # Built-in
"my_custom_sensor": {}, # External
"another_custom": {}, # External
"external_components": [], # Special key, not a component
"substitutions": {}, # Special key, not a component
}
result = detect_external_components(config)
assert result == {"my_custom_sensor", "another_custom"}
mock_get_esphome_components.assert_called_once()
def test_detect_external_components_filters_special_keys(
mock_get_esphome_components: Mock,
) -> None:
"""Test detect_external_components filters out special config keys."""
config = {
CONF_ESPHOME: {CONF_NAME: "test_device"},
"substitutions": {"key": "value"},
"packages": {},
"globals": [],
"external_components": [],
"<<": {}, # YAML merge key
}
result = detect_external_components(config)
assert result == set()
mock_get_esphome_components.assert_called_once()
def test_command_analyze_memory_success(
tmp_path: Path,
capfd: CaptureFixture[str],
mock_write_cpp: Mock,
mock_compile_program: Mock,
mock_get_idedata: Mock,
mock_get_esphome_components: Mock,
mock_memory_analyzer_cli: Mock,
) -> None:
"""Test command_analyze_memory with successful compilation and analysis."""
setup_core(platform=PLATFORM_ESP32, tmp_path=tmp_path, name="test_device")
# Create firmware.elf file
firmware_path = (
tmp_path / ".esphome" / "build" / "test_device" / ".pioenvs" / "test_device"
)
firmware_path.mkdir(parents=True, exist_ok=True)
firmware_elf = firmware_path / "firmware.elf"
firmware_elf.write_text("mock elf file")
# Mock idedata
mock_idedata_obj = MagicMock(spec=platformio_api.IDEData)
mock_idedata_obj.firmware_elf_path = str(firmware_elf)
mock_idedata_obj.objdump_path = "/path/to/objdump"
mock_idedata_obj.readelf_path = "/path/to/readelf"
mock_get_idedata.return_value = mock_idedata_obj
config = {
CONF_ESPHOME: {CONF_NAME: "test_device"},
"logger": {},
}
args = MockArgs()
result = command_analyze_memory(args, config)
assert result == 0
# Verify compilation was done
mock_write_cpp.assert_called_once_with(config)
mock_compile_program.assert_called_once_with(args, config)
# Verify analyzer was created with correct parameters
mock_memory_analyzer_cli.assert_called_once_with(
str(firmware_elf),
"/path/to/objdump",
"/path/to/readelf",
set(), # No external components
)
# Verify analysis was run
mock_analyzer = mock_memory_analyzer_cli.return_value
mock_analyzer.analyze.assert_called_once()
mock_analyzer.generate_report.assert_called_once()
# Verify report was printed
captured = capfd.readouterr()
assert "Mock Memory Report" in captured.out
def test_command_analyze_memory_with_external_components(
tmp_path: Path,
mock_write_cpp: Mock,
mock_compile_program: Mock,
mock_get_idedata: Mock,
mock_get_esphome_components: Mock,
mock_memory_analyzer_cli: Mock,
) -> None:
"""Test command_analyze_memory detects external components."""
setup_core(platform=PLATFORM_ESP32, tmp_path=tmp_path, name="test_device")
# Create firmware.elf file
firmware_path = (
tmp_path / ".esphome" / "build" / "test_device" / ".pioenvs" / "test_device"
)
firmware_path.mkdir(parents=True, exist_ok=True)
firmware_elf = firmware_path / "firmware.elf"
firmware_elf.write_text("mock elf file")
# Mock idedata
mock_idedata_obj = MagicMock(spec=platformio_api.IDEData)
mock_idedata_obj.firmware_elf_path = str(firmware_elf)
mock_idedata_obj.objdump_path = "/path/to/objdump"
mock_idedata_obj.readelf_path = "/path/to/readelf"
mock_get_idedata.return_value = mock_idedata_obj
config = {
CONF_ESPHOME: {CONF_NAME: "test_device"},
"logger": {},
"my_custom_component": {"param": "value"}, # External component
"external_components": [{"source": "github://user/repo"}], # Not a component
}
args = MockArgs()
result = command_analyze_memory(args, config)
assert result == 0
# Verify analyzer was created with external components detected
mock_memory_analyzer_cli.assert_called_once_with(
str(firmware_elf),
"/path/to/objdump",
"/path/to/readelf",
{"my_custom_component"}, # External component detected
)
def test_command_analyze_memory_write_cpp_fails(
tmp_path: Path,
mock_write_cpp: Mock,
) -> None:
"""Test command_analyze_memory when write_cpp fails."""
setup_core(platform=PLATFORM_ESP32, tmp_path=tmp_path, name="test_device")
config = {CONF_ESPHOME: {CONF_NAME: "test_device"}}
args = MockArgs()
mock_write_cpp.return_value = 1 # Failure
result = command_analyze_memory(args, config)
assert result == 1
mock_write_cpp.assert_called_once_with(config)
def test_command_analyze_memory_compile_fails(
tmp_path: Path,
mock_write_cpp: Mock,
mock_compile_program: Mock,
) -> None:
"""Test command_analyze_memory when compilation fails."""
setup_core(platform=PLATFORM_ESP32, tmp_path=tmp_path, name="test_device")
config = {CONF_ESPHOME: {CONF_NAME: "test_device"}}
args = MockArgs()
mock_compile_program.return_value = 1 # Compilation failed
result = command_analyze_memory(args, config)
assert result == 1
mock_write_cpp.assert_called_once_with(config)
mock_compile_program.assert_called_once_with(args, config)
def test_command_analyze_memory_no_idedata(
tmp_path: Path,
caplog: pytest.LogCaptureFixture,
mock_write_cpp: Mock,
mock_compile_program: Mock,
mock_get_idedata: Mock,
) -> None:
"""Test command_analyze_memory when idedata cannot be retrieved."""
setup_core(platform=PLATFORM_ESP32, tmp_path=tmp_path, name="test_device")
config = {CONF_ESPHOME: {CONF_NAME: "test_device"}}
args = MockArgs()
mock_get_idedata.return_value = None # Failed to get idedata
with caplog.at_level(logging.ERROR):
result = command_analyze_memory(args, config)
assert result == 1
assert "Failed to get IDE data for memory analysis" in caplog.text