Compare commits

..

8 Commits

Author SHA1 Message Date
J. Nick Koston
4c9e4d30e9 tweak 2025-10-19 14:56:40 -10:00
J. Nick Koston
db42983f0c wip 2025-10-19 14:54:31 -10:00
J. Nick Koston
20c65f70ed wip 2025-10-19 14:47:19 -10:00
J. Nick Koston
38e31e328c wip 2025-10-19 14:46:25 -10:00
J. Nick Koston
58cecff778 wip 2025-10-19 14:44:04 -10:00
J. Nick Koston
1946656ea8 wip 2025-10-19 14:40:47 -10:00
J. Nick Koston
c9700a0450 wip 2025-10-19 14:35:09 -10:00
J. Nick Koston
0eab64ffe5 cache github downloads 2025-10-19 14:33:26 -10:00
18 changed files with 1073 additions and 511 deletions

View File

@@ -28,23 +28,20 @@ jobs:
run: |
# Get PR details by searching for PR with matching head SHA
# The workflow_run.pull_requests field is often empty for forks
# Use paginate to handle repos with many open PRs
head_sha="${{ github.event.workflow_run.head_sha }}"
pr_data=$(gh api --paginate "/repos/${{ github.repository }}/pulls" \
--jq ".[] | select(.head.sha == \"$head_sha\") | {number: .number, base_ref: .base.ref}" \
| head -n 1)
if [ -z "$pr_data" ]; then
pr_data=$(gh api "/repos/${{ github.repository }}/commits/$head_sha/pulls" \
--jq '.[0] | {number: .number, base_ref: .base.ref}')
if [ -z "$pr_data" ] || [ "$pr_data" == "null" ]; then
echo "No PR found for SHA $head_sha, skipping"
echo "skip=true" >> "$GITHUB_OUTPUT"
echo "skip=true" >> $GITHUB_OUTPUT
exit 0
fi
pr_number=$(echo "$pr_data" | jq -r '.number')
base_ref=$(echo "$pr_data" | jq -r '.base_ref')
echo "pr_number=$pr_number" >> "$GITHUB_OUTPUT"
echo "base_ref=$base_ref" >> "$GITHUB_OUTPUT"
echo "pr_number=$pr_number" >> $GITHUB_OUTPUT
echo "base_ref=$base_ref" >> $GITHUB_OUTPUT
echo "Found PR #$pr_number targeting base branch: $base_ref"
- name: Check out code from base repository
@@ -90,9 +87,9 @@ jobs:
if: steps.pr.outputs.skip != 'true'
run: |
if [ -f ./memory-analysis/memory-analysis-target.json ] && [ -f ./memory-analysis/memory-analysis-pr.json ]; then
echo "found=true" >> "$GITHUB_OUTPUT"
echo "found=true" >> $GITHUB_OUTPUT
else
echo "found=false" >> "$GITHUB_OUTPUT"
echo "found=false" >> $GITHUB_OUTPUT
echo "Memory analysis artifacts not found, skipping comment"
fi

View File

@@ -433,15 +433,19 @@ jobs:
python-version: ${{ env.DEFAULT_PYTHON }}
cache-key: ${{ needs.common.outputs.cache-key }}
# Cache PlatformIO packages to speed up test builds
# Note: Caches are repository-scoped, PRs from forks cannot restore from the main repo cache
- name: Cache PlatformIO
- name: Cache platformio
if: github.ref == 'refs/heads/dev'
uses: actions/cache@0057852bfaa89a56745cba8c7296529d2fc39830 # v4.3.0
with:
path: ~/.platformio
key: platformio-test-${{ runner.os }}-${{ env.DEFAULT_PYTHON }}-${{ hashFiles('platformio.ini') }}
restore-keys: |
platformio-test-${{ runner.os }}-${{ env.DEFAULT_PYTHON }}-
key: platformio-test-${{ hashFiles('platformio.ini') }}
- name: Cache platformio
if: github.ref != 'refs/heads/dev'
uses: actions/cache/restore@0057852bfaa89a56745cba8c7296529d2fc39830 # v4.3.0
with:
path: ~/.platformio
key: platformio-test-${{ hashFiles('platformio.ini') }}
- name: Validate and compile components with intelligent grouping
run: |

View File

@@ -62,40 +62,6 @@ from esphome.util import (
_LOGGER = logging.getLogger(__name__)
# Special non-component keys that appear in configs
_NON_COMPONENT_KEYS = frozenset(
{
CONF_ESPHOME,
"substitutions",
"packages",
"globals",
"external_components",
"<<",
}
)
def detect_external_components(config: ConfigType) -> set[str]:
"""Detect external/custom components in the configuration.
External components are those that appear in the config but are not
part of ESPHome's built-in components and are not special config keys.
Args:
config: The ESPHome configuration dictionary
Returns:
A set of external component names
"""
from esphome.analyze_memory.helpers import get_esphome_components
builtin_components = get_esphome_components()
return {
key
for key in config
if key not in builtin_components and key not in _NON_COMPONENT_KEYS
}
class ArgsProtocol(Protocol):
device: list[str] | None
@@ -926,54 +892,6 @@ def command_idedata(args: ArgsProtocol, config: ConfigType) -> int:
return 0
def command_analyze_memory(args: ArgsProtocol, config: ConfigType) -> int:
"""Analyze memory usage by component.
This command compiles the configuration and performs memory analysis.
Compilation is fast if sources haven't changed (just relinking).
"""
from esphome import platformio_api
from esphome.analyze_memory.cli import MemoryAnalyzerCLI
# Always compile to ensure fresh data (fast if no changes - just relinks)
exit_code = write_cpp(config)
if exit_code != 0:
return exit_code
exit_code = compile_program(args, config)
if exit_code != 0:
return exit_code
_LOGGER.info("Successfully compiled program.")
# Get idedata for analysis
idedata = platformio_api.get_idedata(config)
if idedata is None:
_LOGGER.error("Failed to get IDE data for memory analysis")
return 1
firmware_elf = Path(idedata.firmware_elf_path)
# Extract external components from config
external_components = detect_external_components(config)
_LOGGER.debug("Detected external components: %s", external_components)
# Perform memory analysis
_LOGGER.info("Analyzing memory usage...")
analyzer = MemoryAnalyzerCLI(
str(firmware_elf),
idedata.objdump_path,
idedata.readelf_path,
external_components,
)
analyzer.analyze()
# Generate and display report
report = analyzer.generate_report()
print()
print(report)
return 0
def command_rename(args: ArgsProtocol, config: ConfigType) -> int | None:
new_name = args.name
for c in new_name:
@@ -1089,7 +1007,6 @@ POST_CONFIG_ACTIONS = {
"idedata": command_idedata,
"rename": command_rename,
"discover": command_discover,
"analyze-memory": command_analyze_memory,
}
SIMPLE_CONFIG_ACTIONS = [
@@ -1375,14 +1292,6 @@ def parse_args(argv):
)
parser_rename.add_argument("name", help="The new name for the device.", type=str)
parser_analyze_memory = subparsers.add_parser(
"analyze-memory",
help="Analyze memory usage by component.",
)
parser_analyze_memory.add_argument(
"configuration", help="Your YAML configuration file(s).", nargs="+"
)
# Keep backward compatibility with the old command line format of
# esphome <config> <command>.
#

View File

@@ -1,4 +1,3 @@
// TEST
#include "api_connection.h"
#ifdef USE_API
#ifdef USE_API_NOISE

View File

@@ -805,7 +805,6 @@ async def to_code(config):
add_idf_sdkconfig_option("CONFIG_AUTOSTART_ARDUINO", True)
add_idf_sdkconfig_option("CONFIG_MBEDTLS_PSK_MODES", True)
add_idf_sdkconfig_option("CONFIG_MBEDTLS_CERTIFICATE_BUNDLE", True)
add_idf_sdkconfig_option("CONFIG_ESP_PHY_REDUCE_TX_POWER", True)
cg.add_build_flag("-Wno-nonnull-compare")

View File

@@ -6,7 +6,7 @@ import esphome.config_validation as cv
from esphome.const import CONF_SUBSTITUTIONS, VALID_SUBSTITUTIONS_CHARACTERS
from esphome.yaml_util import ESPHomeDataBase, ESPLiteralValue, make_data_base
from .jinja import Jinja, JinjaError, JinjaStr, has_jinja
from .jinja import Jinja, JinjaStr, TemplateError, TemplateRuntimeError, has_jinja
CODEOWNERS = ["@esphome/core"]
_LOGGER = logging.getLogger(__name__)
@@ -57,12 +57,17 @@ def _expand_jinja(value, orig_value, path, jinja, ignore_missing):
"->".join(str(x) for x in path),
err.message,
)
except JinjaError as err:
except (
TemplateError,
TemplateRuntimeError,
RuntimeError,
ArithmeticError,
AttributeError,
TypeError,
) as err:
raise cv.Invalid(
f"{err.error_name()} Error evaluating jinja expression '{value}': {str(err.parent())}."
f"\nEvaluation stack: (most recent evaluation last)\n{err.stack_trace_str()}"
f"\nRelevant context:\n{err.context_trace_str()}"
f"\nSee {'->'.join(str(x) for x in path)}",
f"{type(err).__name__} Error evaluating jinja expression '{value}': {str(err)}."
f" See {'->'.join(str(x) for x in path)}",
path,
)
return value

View File

@@ -6,8 +6,6 @@ import re
import jinja2 as jinja
from jinja2.sandbox import SandboxedEnvironment
from esphome.yaml_util import ESPLiteralValue
TemplateError = jinja.TemplateError
TemplateSyntaxError = jinja.TemplateSyntaxError
TemplateRuntimeError = jinja.TemplateRuntimeError
@@ -28,20 +26,18 @@ def has_jinja(st):
return detect_jinja_re.search(st) is not None
# SAFE_GLOBALS defines a allowlist of built-in functions or modules that are considered safe to expose
# SAFE_GLOBAL_FUNCTIONS defines a allowlist of built-in functions that are considered safe to expose
# in Jinja templates or other sandboxed evaluation contexts. Only functions that do not allow
# arbitrary code execution, file access, or other security risks are included.
#
# The following functions are considered safe:
# - math: The entire math module is injected, allowing access to mathematical functions like sin, cos, sqrt, etc.
# - ord: Converts a character to its Unicode code point integer.
# - chr: Converts an integer to its corresponding Unicode character.
# - len: Returns the length of a sequence or collection.
#
# These functions were chosen because they are pure, have no side effects, and do not provide access
# to the file system, environment, or other potentially sensitive resources.
SAFE_GLOBALS = {
"math": math, # Inject entire math module
SAFE_GLOBAL_FUNCTIONS = {
"ord": ord,
"chr": chr,
"len": len,
@@ -60,62 +56,22 @@ class JinjaStr(str):
later in the main substitutions pass.
"""
Undefined = object()
def __new__(cls, value: str, upvalues=None):
if isinstance(value, JinjaStr):
base = str(value)
merged = {**value.upvalues, **(upvalues or {})}
else:
base = value
merged = dict(upvalues or {})
obj = super().__new__(cls, base)
obj.upvalues = merged
obj.result = JinjaStr.Undefined
obj = super().__new__(cls, value)
obj.upvalues = upvalues or {}
return obj
class JinjaError(Exception):
def __init__(self, context_trace: dict, expr: str):
self.context_trace = context_trace
self.eval_stack = [expr]
def parent(self):
return self.__context__
def error_name(self):
return type(self.parent()).__name__
def context_trace_str(self):
return "\n".join(
f" {k} = {repr(v)} ({type(v).__name__})"
for k, v in self.context_trace.items()
)
def stack_trace_str(self):
return "\n".join(
f" {len(self.eval_stack) - i}: {expr}{i == 0 and ' <-- ' + self.error_name() or ''}"
for i, expr in enumerate(self.eval_stack)
)
def __init__(self, value: str, upvalues=None):
self.upvalues = upvalues or {}
class TrackerContext(jinja.runtime.Context):
def resolve_or_missing(self, key):
val = super().resolve_or_missing(key)
if isinstance(val, JinjaStr):
self.environment.context_trace[key] = val
val, _ = self.environment.expand(val)
self.environment.context_trace[key] = val
return val
class Jinja(SandboxedEnvironment):
class Jinja:
"""
Wraps a Jinja environment
"""
def __init__(self, context_vars):
super().__init__(
self.env = SandboxedEnvironment(
trim_blocks=True,
lstrip_blocks=True,
block_start_string="<%",
@@ -126,20 +82,13 @@ class Jinja(SandboxedEnvironment):
variable_end_string="}",
undefined=jinja.StrictUndefined,
)
self.context_class = TrackerContext
self.add_extension("jinja2.ext.do")
self.context_trace = {}
self.env.add_extension("jinja2.ext.do")
self.env.globals["math"] = math # Inject entire math module
self.context_vars = {**context_vars}
for k, v in self.context_vars.items():
if isinstance(v, ESPLiteralValue):
continue
if isinstance(v, str) and not isinstance(v, JinjaStr) and has_jinja(v):
self.context_vars[k] = JinjaStr(v, self.context_vars)
self.globals = {
**self.globals,
self.env.globals = {
**self.env.globals,
**self.context_vars,
**SAFE_GLOBALS,
**SAFE_GLOBAL_FUNCTIONS,
}
def safe_eval(self, expr):
@@ -161,43 +110,23 @@ class Jinja(SandboxedEnvironment):
result = None
override_vars = {}
if isinstance(content_str, JinjaStr):
if content_str.result is not JinjaStr.Undefined:
return content_str.result, None
# If `value` is already a JinjaStr, it means we are trying to evaluate it again
# in a parent pass.
# Hopefully, all required variables are visible now.
override_vars = content_str.upvalues
old_trace = self.context_trace
self.context_trace = {}
try:
template = self.from_string(content_str)
template = self.env.from_string(content_str)
result = self.safe_eval(template.render(override_vars))
if isinstance(result, Undefined):
print("" + result) # force a UndefinedError exception
# This happens when the expression is simply an undefined variable. Jinja does not
# raise an exception, instead we get "Undefined".
# Trigger an UndefinedError exception so we skip to below.
print("" + result)
except (TemplateSyntaxError, UndefinedError) as err:
# `content_str` contains a Jinja expression that refers to a variable that is undefined
# in this scope. Perhaps it refers to a root substitution that is not visible yet.
# Therefore, return `content_str` as a JinjaStr, which contains the variables
# Therefore, return the original `content_str` as a JinjaStr, which contains the variables
# that are actually visible to it at this point to postpone evaluation.
return JinjaStr(content_str, {**self.context_vars, **override_vars}), err
except JinjaError as err:
err.context_trace = {**self.context_trace, **err.context_trace}
err.eval_stack.append(content_str)
raise err
except (
TemplateError,
TemplateRuntimeError,
RuntimeError,
ArithmeticError,
AttributeError,
TypeError,
) as err:
raise JinjaError(self.context_trace, content_str) from err
finally:
self.context_trace = old_trace
if isinstance(content_str, JinjaStr):
content_str.result = result
return result, None

View File

@@ -11,7 +11,6 @@ from esphome.const import (
CONF_COMMENT,
CONF_ESPHOME,
CONF_ETHERNET,
CONF_OPENTHREAD,
CONF_PORT,
CONF_USE_ADDRESS,
CONF_WEB_SERVER,
@@ -642,9 +641,6 @@ class EsphomeCore:
if CONF_ETHERNET in self.config:
return self.config[CONF_ETHERNET][CONF_USE_ADDRESS]
if CONF_OPENTHREAD in self.config:
return f"{self.name}.local"
return None
@property

View File

@@ -273,8 +273,6 @@
#ifdef USE_NRF52
#define USE_NRF52_DFU
#define USE_SOFTDEVICE_ID 7
#define USE_SOFTDEVICE_VERSION 1
#endif
// Disabled feature flags

362
esphome/github_cache.py Normal file
View File

@@ -0,0 +1,362 @@
"""GitHub download cache for ESPHome.
This module provides caching functionality for GitHub release downloads
to avoid redundant network I/O when switching between platforms.
"""
from __future__ import annotations
import hashlib
import json
import logging
from pathlib import Path
import shutil
import time
import urllib.error
import urllib.request
_LOGGER = logging.getLogger(__name__)
class GitHubCache:
"""Manages caching of GitHub release downloads."""
# Cache expiration time in seconds (30 days)
CACHE_EXPIRATION_SECONDS = 30 * 24 * 60 * 60
def __init__(self, cache_dir: Path | None = None):
"""Initialize the cache manager.
Args:
cache_dir: Directory to store cached files.
Defaults to ~/.esphome_cache/github
"""
if cache_dir is None:
cache_dir = Path.home() / ".esphome_cache" / "github"
self.cache_dir = cache_dir
self.cache_dir.mkdir(parents=True, exist_ok=True)
self.metadata_file = self.cache_dir / "cache_metadata.json"
# Prune old files on initialization
try:
self._prune_old_files()
except Exception as e:
_LOGGER.debug("Failed to prune old cache files: %s", e)
def _load_metadata(self) -> dict:
"""Load cache metadata from disk."""
if self.metadata_file.exists():
try:
with open(self.metadata_file) as f:
return json.load(f)
except (OSError, ValueError, json.JSONDecodeError):
return {}
return {}
def _save_metadata(self, metadata: dict) -> None:
"""Save cache metadata to disk."""
try:
with open(self.metadata_file, "w") as f:
json.dump(metadata, f, indent=2)
except OSError as e:
_LOGGER.debug("Failed to save cache metadata: %s", e)
@staticmethod
def is_github_url(url: str) -> bool:
"""Check if URL is a GitHub release download."""
return "github.com" in url.lower() and url.endswith(".zip")
def _get_cache_key(self, url: str) -> str:
"""Get cache key (hash) for a URL."""
return hashlib.sha256(url.encode()).hexdigest()
def _get_cache_path(self, url: str) -> Path:
"""Get cache file path for a URL."""
cache_key = self._get_cache_key(url)
ext = Path(url.split("?")[0]).suffix
return self.cache_dir / f"{cache_key}{ext}"
def _check_if_modified(
self,
url: str,
last_modified: str | None = None,
etag: str | None = None,
) -> bool:
"""Check if a URL has been modified using HTTP 304.
Args:
url: URL to check
last_modified: Last-Modified header from previous response
etag: ETag header from previous response
Returns:
True if modified, False if not modified (or offline/unreachable)
"""
if not last_modified and not etag:
# No cache headers available, assume modified
return True
try:
request = urllib.request.Request(url)
request.get_method = lambda: "HEAD"
if last_modified:
request.add_header("If-Modified-Since", last_modified)
if etag:
request.add_header("If-None-Match", etag)
try:
urllib.request.urlopen(request, timeout=10)
# 200 OK = file was modified
return True
except urllib.error.HTTPError as e:
if e.code == 304:
# Not modified
_LOGGER.debug("File not modified (HTTP 304): %s", url)
return False
# Other errors, assume modified to be safe
return True
except (OSError, urllib.error.URLError):
# If check fails (offline/network error), assume not modified (use cache)
_LOGGER.info("Cannot reach server (offline?), using cached file: %s", url)
return False
def get_cached_path(self, url: str, check_updates: bool = True) -> Path | None:
"""Get path to cached file if available and valid.
Args:
url: URL to check
check_updates: Whether to check for updates using HTTP 304
Returns:
Path to cached file if valid, None if needs download
"""
if not self.is_github_url(url):
return None
cache_path = self._get_cache_path(url)
if not cache_path.exists():
return None
# Load metadata
metadata = self._load_metadata()
cache_key = self._get_cache_key(url)
# Check if file should be re-downloaded
should_redownload = False
if check_updates and cache_key in metadata:
last_modified = metadata[cache_key].get("last_modified")
etag = metadata[cache_key].get("etag")
if self._check_if_modified(url, last_modified, etag):
# File was modified, need to re-download
_LOGGER.debug("Cached file is outdated: %s", url)
should_redownload = True
if should_redownload:
return None
# File is valid, update cached_at timestamp to keep it fresh
if cache_key in metadata:
metadata[cache_key]["cached_at"] = time.time()
self._save_metadata(metadata)
# Log appropriate message
if not check_updates:
_LOGGER.debug("Using cached file (no update check): %s", url)
elif cache_key not in metadata:
_LOGGER.debug("Using cached file (no metadata): %s", url)
else:
_LOGGER.debug("Using cached file: %s", url)
return cache_path
def save_to_cache(self, url: str, source_path: Path) -> None:
"""Save a downloaded file to cache.
Args:
url: URL the file was downloaded from
source_path: Path to the downloaded file
"""
if not self.is_github_url(url):
return
try:
cache_path = self._get_cache_path(url)
# Only copy if source and destination are different
if source_path.resolve() != cache_path.resolve():
shutil.copy2(source_path, cache_path)
# Try to get HTTP headers for caching
last_modified = None
etag = None
try:
request = urllib.request.Request(url)
request.get_method = lambda: "HEAD"
response = urllib.request.urlopen(request, timeout=10)
last_modified = response.headers.get("Last-Modified")
etag = response.headers.get("ETag")
except (OSError, urllib.error.URLError):
pass
# Update metadata
metadata = self._load_metadata()
cache_key = self._get_cache_key(url)
metadata[cache_key] = {
"url": url,
"size": cache_path.stat().st_size,
"cached_at": time.time(),
"last_modified": last_modified,
"etag": etag,
}
self._save_metadata(metadata)
_LOGGER.debug("Saved to cache: %s", url)
except OSError as e:
_LOGGER.debug("Failed to save to cache: %s", e)
def copy_from_cache(self, url: str, destination: Path) -> bool:
"""Copy a cached file to destination.
Args:
url: URL of the cached file
destination: Where to copy the file
Returns:
True if successful, False otherwise
"""
cached_path = self.get_cached_path(url, check_updates=True)
if not cached_path:
return False
try:
shutil.copy2(cached_path, destination)
_LOGGER.info("Using cached download for %s", url)
return True
except OSError as e:
_LOGGER.warning("Failed to use cache: %s", e)
return False
def cache_size(self) -> int:
"""Get total size of cached files in bytes."""
total = 0
try:
for file_path in self.cache_dir.glob("*"):
if file_path.is_file() and file_path != self.metadata_file:
total += file_path.stat().st_size
except OSError:
pass
return total
def list_cached(self) -> list[dict]:
"""List all cached files with metadata."""
cached_files = []
metadata = self._load_metadata()
for cache_key, meta in metadata.items():
cache_path = (
self.cache_dir / f"{cache_key}{Path(meta['url'].split('?')[0]).suffix}"
)
if cache_path.exists():
cached_files.append(
{
"url": meta["url"],
"path": cache_path,
"size": meta["size"],
"cached_at": meta.get("cached_at"),
"last_modified": meta.get("last_modified"),
"etag": meta.get("etag"),
}
)
return cached_files
def clear_cache(self) -> None:
"""Clear all cached files."""
try:
for file_path in self.cache_dir.glob("*"):
if file_path.is_file():
file_path.unlink()
_LOGGER.info("Cache cleared: %s", self.cache_dir)
except OSError as e:
_LOGGER.warning("Failed to clear cache: %s", e)
def _prune_old_files(self) -> None:
"""Remove cache files older than CACHE_EXPIRATION_SECONDS."""
current_time = time.time()
metadata = self._load_metadata()
removed_count = 0
removed_size = 0
# Check each file in metadata
for cache_key, meta in list(metadata.items()):
cached_at = meta.get("cached_at", 0)
age_seconds = current_time - cached_at
if age_seconds > self.CACHE_EXPIRATION_SECONDS:
# File is too old, remove it
cache_path = (
self.cache_dir
/ f"{cache_key}{Path(meta['url'].split('?')[0]).suffix}"
)
if cache_path.exists():
file_size = cache_path.stat().st_size
cache_path.unlink()
removed_size += file_size
removed_count += 1
_LOGGER.debug(
"Pruned old cache file (age: %.1f days): %s",
age_seconds / (24 * 60 * 60),
meta["url"],
)
# Remove from metadata
del metadata[cache_key]
# Also check for orphaned files (files without metadata)
for file_path in self.cache_dir.glob("*.zip"):
if file_path == self.metadata_file:
continue
# Check if file is in metadata
found_in_metadata = False
for cache_key in metadata:
if file_path.name.startswith(cache_key):
found_in_metadata = True
break
if not found_in_metadata:
# Orphaned file - check age by modification time
file_age = current_time - file_path.stat().st_mtime
if file_age > self.CACHE_EXPIRATION_SECONDS:
file_size = file_path.stat().st_size
file_path.unlink()
removed_size += file_size
removed_count += 1
_LOGGER.debug(
"Pruned orphaned cache file (age: %.1f days): %s",
file_age / (24 * 60 * 60),
file_path.name,
)
# Save updated metadata if anything was removed
if removed_count > 0:
self._save_metadata(metadata)
removed_mb = removed_size / (1024 * 1024)
_LOGGER.info(
"Pruned %d old cache file(s), freed %.2f MB",
removed_count,
removed_mb,
)
# Global cache instance
_cache: GitHubCache | None = None
def get_cache() -> GitHubCache:
"""Get the global GitHub cache instance."""
global _cache # noqa: PLW0603
if _cache is None:
_cache = GitHubCache()
return _cache

View File

@@ -5,7 +5,6 @@ import os
from pathlib import Path
import re
import subprocess
from typing import Any
from esphome.const import CONF_COMPILE_PROCESS_LIMIT, CONF_ESPHOME, KEY_CORE
from esphome.core import CORE, EsphomeError
@@ -44,32 +43,168 @@ def patch_structhash():
def patch_file_downloader():
"""Patch PlatformIO's FileDownloader to retry on PackageException errors."""
from platformio.package.download import FileDownloader
from platformio.package.exception import PackageException
"""Patch PlatformIO's FileDownloader to add caching and retry on PackageException errors.
original_init = FileDownloader.__init__
This function attempts to patch PlatformIO's internal download mechanism.
If patching fails (due to API changes), it gracefully falls back to no caching.
"""
try:
from platformio.package.download import FileDownloader
from platformio.package.exception import PackageException
except ImportError as e:
_LOGGER.debug("Could not import PlatformIO modules for patching: %s", e)
return
def patched_init(self, *args: Any, **kwargs: Any) -> None:
max_retries = 3
# Import our cache module
from esphome.github_cache import GitHubCache
for attempt in range(max_retries):
try:
return original_init(self, *args, **kwargs)
except PackageException as e:
if attempt < max_retries - 1:
_LOGGER.warning(
"Package download failed: %s. Retrying... (attempt %d/%d)",
str(e),
attempt + 1,
max_retries,
_LOGGER.debug("Applying GitHub download cache patch...")
# Verify the classes have the expected methods before patching
if not hasattr(FileDownloader, "__init__") or not hasattr(FileDownloader, "start"):
_LOGGER.warning(
"PlatformIO FileDownloader API has changed, skipping cache patch"
)
return
try:
original_init = FileDownloader.__init__
original_start = FileDownloader.start
# Initialize cache in .platformio directory so it benefits from GitHub Actions cache
platformio_dir = Path.home() / ".platformio"
cache_dir = platformio_dir / "esphome_download_cache"
cache_dir_existed = cache_dir.exists()
cache = GitHubCache(cache_dir=cache_dir)
if not cache_dir_existed:
_LOGGER.info("Created GitHub download cache at: %s", cache.cache_dir)
except Exception as e:
_LOGGER.warning("Failed to initialize GitHub download cache: %s", e)
return
def patched_init(self, *args, **kwargs):
"""Patched init that checks cache before making HTTP connection."""
try:
# Extract URL from args (first positional argument)
url = args[0] if args else kwargs.get("url")
dest_dir = args[1] if len(args) > 1 else kwargs.get("dest_dir")
# Debug: Log all downloads
_LOGGER.debug("[GitHub Cache] Download request for: %s", url)
# Store URL for later use (original FileDownloader doesn't store it)
self._esphome_cache_url = url if cache.is_github_url(url) else None
# Check cache for GitHub URLs BEFORE making HTTP request
if self._esphome_cache_url:
_LOGGER.debug("[GitHub Cache] This is a GitHub URL, checking cache...")
self._esphome_use_cache = cache.get_cached_path(url, check_updates=True)
if self._esphome_use_cache:
_LOGGER.info(
"Found %s in cache, will restore instead of downloading",
Path(url.split("?")[0]).name,
)
_LOGGER.debug(
"[GitHub Cache] Found in cache: %s", self._esphome_use_cache
)
else:
# Final attempt - re-raise
raise
return None
_LOGGER.debug(
"[GitHub Cache] Not in cache, will download and cache"
)
else:
self._esphome_use_cache = None
if url and str(url).startswith("http"):
_LOGGER.debug("[GitHub Cache] Not a GitHub URL, skipping cache")
FileDownloader.__init__ = patched_init
# Only make HTTP connection if we don't have cached file
if self._esphome_use_cache:
# Skip HTTP connection, we'll handle this in start()
# Set minimal attributes to satisfy FileDownloader
# Create a mock session that can be safely closed in __del__
class MockSession:
def close(self):
pass
self._http_session = MockSession()
self._http_response = None
self._fname = Path(url.split("?")[0]).name
self._destination = self._fname
if dest_dir:
from os.path import join
self._destination = join(dest_dir, self._fname)
# Note: Actual restoration logged in patched_start
return None # Don't call original_init
# Normal initialization with retry logic
max_retries = 3
for attempt in range(max_retries):
try:
return original_init(self, *args, **kwargs)
except PackageException as e:
if attempt < max_retries - 1:
_LOGGER.warning(
"Package download failed: %s. Retrying... (attempt %d/%d)",
str(e),
attempt + 1,
max_retries,
)
else:
# Final attempt - re-raise
raise
return None
except Exception as e:
# If anything goes wrong in our cache logic, fall back to normal download
_LOGGER.debug("Cache check failed, falling back to normal download: %s", e)
self._esphome_cache_url = None
self._esphome_use_cache = None
return original_init(self, *args, **kwargs)
def patched_start(self, *args, **kwargs):
"""Patched start that uses cache when available."""
try:
import shutil
# Get the cache URL and path that were set in __init__
cache_url = getattr(self, "_esphome_cache_url", None)
cached_file = getattr(self, "_esphome_use_cache", None)
# If we're using cache, copy file instead of downloading
if cached_file:
try:
shutil.copy2(cached_file, self._destination)
_LOGGER.info(
"Restored %s from cache (avoided download)",
Path(cached_file).name,
)
return True
except OSError as e:
_LOGGER.warning("Failed to copy from cache: %s", e)
# Fall through to re-download
# Perform normal download
result = original_start(self, *args, **kwargs)
# Save to cache if it was a GitHub URL
if cache_url:
try:
cache.save_to_cache(cache_url, Path(self._destination))
except OSError as e:
_LOGGER.debug("Failed to save to cache: %s", e)
return result
except Exception as e:
# If anything goes wrong, fall back to normal download
_LOGGER.debug("Cache restoration failed, using normal download: %s", e)
return original_start(self, *args, **kwargs)
# Apply the patches
try:
FileDownloader.__init__ = patched_init
FileDownloader.start = patched_start
_LOGGER.debug("GitHub download cache patch applied successfully")
except Exception as e:
_LOGGER.warning("Failed to apply GitHub download cache patch: %s", e)
IGNORE_LIB_WARNINGS = f"(?:{'|'.join(['Hash', 'Update'])})"
@@ -87,6 +222,8 @@ FILTER_PLATFORMIO_LINES = [
r"Memory Usage -> https://bit.ly/pio-memory-usage",
r"Found: https://platformio.org/lib/show/.*",
r"Using cache: .*",
# Don't filter our cache messages - let users see when cache is being used
# r"Using cached download for .*",
r"Installing dependencies",
r"Library Manager: Already installed, built-in library",
r"Building in .* mode",

View File

@@ -0,0 +1,164 @@
#!/usr/bin/env python3
"""
Pre-cache PlatformIO GitHub Downloads
This script extracts GitHub URLs from platformio.ini and pre-caches them
to avoid redundant downloads when switching between ESP8266 and ESP32 builds.
Usage:
python3 script/cache_platformio_downloads.py [platformio.ini]
"""
import argparse
import configparser
from pathlib import Path
import re
import sys
# Import the cache manager
sys.path.insert(0, str(Path(__file__).parent.parent))
from esphome.github_cache import GitHubCache
def extract_github_urls(platformio_ini: Path) -> list[str]:
"""Extract all GitHub URLs from platformio.ini.
Args:
platformio_ini: Path to platformio.ini file
Returns:
List of GitHub URLs found
"""
config = configparser.ConfigParser(inline_comment_prefixes=(";",))
config.read(platformio_ini)
urls = []
github_pattern = re.compile(r"https://github\.com/[^\s;]+\.zip")
for section in config.sections():
conf = config[section]
# Check platform
if "platform" in conf:
platform_value = conf["platform"]
matches = github_pattern.findall(platform_value)
urls.extend(matches)
# Check platform_packages
if "platform_packages" in conf:
for line in conf["platform_packages"].splitlines():
line = line.strip()
if not line or line.startswith("#"):
continue
matches = github_pattern.findall(line)
urls.extend(matches)
# Remove duplicates while preserving order using dict
return list(dict.fromkeys(urls))
def main():
"""Main entry point."""
parser = argparse.ArgumentParser(
description="Pre-cache PlatformIO GitHub downloads",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
This script scans platformio.ini for GitHub URLs and pre-caches them.
This avoids redundant downloads when switching between platforms (e.g., ESP8266 and ESP32).
Examples:
# Cache downloads from default platformio.ini
%(prog)s
# Cache downloads from specific file
%(prog)s custom_platformio.ini
# Show what would be cached without downloading
%(prog)s --dry-run
""",
)
parser.add_argument(
"platformio_ini",
nargs="?",
default="platformio.ini",
help="Path to platformio.ini (default: platformio.ini)",
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Show what would be cached without downloading",
)
parser.add_argument(
"--cache-dir",
type=Path,
help="Cache directory (default: ~/.platformio/esphome_download_cache)",
)
parser.add_argument(
"--force",
action="store_true",
help="Force re-download even if cached",
)
args = parser.parse_args()
platformio_ini = Path(args.platformio_ini)
if not platformio_ini.exists():
print(f"Error: {platformio_ini} not found", file=sys.stderr)
return 1
# Extract URLs
print(f"Scanning {platformio_ini} for GitHub URLs...")
urls = extract_github_urls(platformio_ini)
if not urls:
print("No GitHub URLs found in platformio.ini")
return 0
print(f"Found {len(urls)} unique GitHub URL(s):")
for url in urls:
print(f" - {url}")
print()
if args.dry_run:
print("Dry run - not downloading")
return 0
# Initialize cache (use PlatformIO directory by default)
cache_dir = args.cache_dir
if cache_dir is None:
cache_dir = Path.home() / ".platformio" / "esphome_download_cache"
cache = GitHubCache(cache_dir)
# Cache each URL
success_count = 0
for i, url in enumerate(urls, 1):
print(f"[{i}/{len(urls)}] Checking {url}")
try:
# Use the download_with_progress from github_download_cache CLI
from script.github_download_cache import download_with_progress
download_with_progress(cache, url, force=args.force, check_updates=True)
success_count += 1
print()
except Exception as e:
print(f"Error caching {url}: {e}", file=sys.stderr)
print()
# Show cache stats
total_size = cache.cache_size()
size_mb = total_size / (1024 * 1024)
print("\nCache summary:")
print(f" Successfully cached: {success_count}/{len(urls)}")
print(f" Total cache size: {size_mb:.2f} MB")
print(f" Cache location: {cache.cache_dir}")
return 0 if success_count == len(urls) else 1
if __name__ == "__main__":
sys.exit(main())

195
script/github_download_cache.py Executable file
View File

@@ -0,0 +1,195 @@
#!/usr/bin/env python3
"""
GitHub Download Cache CLI
This script provides a command-line interface to the GitHub download cache.
The actual caching logic is in esphome/github_cache.py.
Usage:
python3 script/github_download_cache.py download URL
python3 script/github_download_cache.py list
python3 script/github_download_cache.py stats
python3 script/github_download_cache.py clear
"""
import argparse
from pathlib import Path
import sys
import urllib.request
# Add parent directory to path to import esphome modules
sys.path.insert(0, str(Path(__file__).parent.parent))
from esphome.github_cache import GitHubCache
def download_with_progress(
cache: GitHubCache, url: str, force: bool = False, check_updates: bool = True
) -> Path:
"""Download a URL with progress indicator and caching.
Args:
cache: GitHubCache instance
url: URL to download
force: Force re-download even if cached
check_updates: Check for updates using HTTP 304
Returns:
Path to cached file
"""
# If force, skip cache check
if not force:
cached_path = cache.get_cached_path(url, check_updates=check_updates)
if cached_path:
print(f"Using cached file for {url}")
print(f" Cache: {cached_path}")
return cached_path
# Need to download
print(f"Downloading {url}")
cache_path = cache._get_cache_path(url)
print(f" Cache: {cache_path}")
# Download with progress
temp_path = cache_path.with_suffix(cache_path.suffix + ".tmp")
try:
with urllib.request.urlopen(url) as response:
total_size = int(response.headers.get("Content-Length", 0))
downloaded = 0
with open(temp_path, "wb") as f:
while True:
chunk = response.read(8192)
if not chunk:
break
f.write(chunk)
downloaded += len(chunk)
if total_size > 0:
percent = (downloaded / total_size) * 100
print(f"\r Progress: {percent:.1f}%", end="", flush=True)
print() # New line after progress
# Move to final location
temp_path.replace(cache_path)
# Let cache handle metadata
cache.save_to_cache(url, cache_path)
return cache_path
except (OSError, urllib.error.URLError) as e:
if temp_path.exists():
temp_path.unlink()
raise RuntimeError(f"Failed to download {url}: {e}") from e
def main():
"""CLI entry point."""
parser = argparse.ArgumentParser(
description="GitHub Download Cache Manager",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Download and cache a URL
%(prog)s download https://github.com/pioarduino/registry/releases/download/0.0.1/esptoolpy-v5.1.0.zip
# List cached files
%(prog)s list
# Show cache statistics
%(prog)s stats
# Clear cache
%(prog)s clear
""",
)
parser.add_argument(
"--cache-dir",
type=Path,
help="Cache directory (default: ~/.platformio/esphome_download_cache)",
)
subparsers = parser.add_subparsers(dest="command", help="Command to execute")
# Download command
download_parser = subparsers.add_parser("download", help="Download and cache a URL")
download_parser.add_argument("url", help="URL to download")
download_parser.add_argument(
"--force", action="store_true", help="Force re-download even if cached"
)
download_parser.add_argument(
"--no-check-updates",
action="store_true",
help="Skip checking for updates (don't use HTTP 304)",
)
# List command
subparsers.add_parser("list", help="List cached files")
# Stats command
subparsers.add_parser("stats", help="Show cache statistics")
# Clear command
subparsers.add_parser("clear", help="Clear all cached files")
args = parser.parse_args()
if not args.command:
parser.print_help()
return 1
# Use PlatformIO cache directory by default
if args.cache_dir is None:
args.cache_dir = Path.home() / ".platformio" / "esphome_download_cache"
cache = GitHubCache(args.cache_dir)
if args.command == "download":
try:
check_updates = not args.no_check_updates
cache_path = download_with_progress(
cache, args.url, force=args.force, check_updates=check_updates
)
print(f"\nCached at: {cache_path}")
return 0
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
return 1
elif args.command == "list":
cached = cache.list_cached()
if not cached:
print("No cached files")
return 0
print(f"Cached files ({len(cached)}):")
for item in cached:
size_mb = item["size"] / (1024 * 1024)
print(f" {item['url']}")
print(f" Size: {size_mb:.2f} MB")
print(f" Path: {item['path']}")
return 0
elif args.command == "stats":
total_size = cache.cache_size()
cached_count = len(cache.list_cached())
size_mb = total_size / (1024 * 1024)
print(f"Cache directory: {cache.cache_dir}")
print(f"Cached files: {cached_count}")
print(f"Total size: {size_mb:.2f} MB")
return 0
elif args.command == "clear":
cache.clear_cache()
return 0
return 1
if __name__ == "__main__":
sys.exit(main())

View File

@@ -0,0 +1,138 @@
#!/usr/bin/env python3
"""
PlatformIO Download Wrapper with Caching
This script can be used as a wrapper around PlatformIO downloads to add caching.
It intercepts download operations and uses the GitHub download cache.
This is designed to be called from PlatformIO's extra_scripts if needed.
"""
from pathlib import Path
import sys
# Import the cache manager
sys.path.insert(0, str(Path(__file__).parent))
from github_download_cache import GitHubDownloadCache
def is_github_url(url: str) -> bool:
"""Check if a URL is a GitHub URL."""
return "github.com" in url.lower()
def cached_download_handler(source, target, env):
"""PlatformIO download handler that uses caching for GitHub URLs.
This function can be registered as a custom download handler in PlatformIO.
Args:
source: Source URL
target: Target file path
env: SCons environment
"""
import shutil
import urllib.request
url = str(source[0])
target_path = Path(str(target[0]))
# Only cache GitHub URLs
if not is_github_url(url):
# Fall back to default download
print(f"Downloading (no cache): {url}")
with (
urllib.request.urlopen(url) as response,
open(target_path, "wb") as out_file,
):
shutil.copyfileobj(response, out_file)
return
# Use cache for GitHub URLs
cache = GitHubDownloadCache()
print(f"Downloading with cache: {url}")
try:
cached_path = cache.download_with_cache(url, check_updates=True)
# Copy from cache to target
shutil.copy2(cached_path, target_path)
print(f" Copied to: {target_path}")
except Exception as e:
print(f"Cache download failed, using direct download: {e}")
# Fall back to direct download
with (
urllib.request.urlopen(url) as response,
open(target_path, "wb") as out_file,
):
shutil.copyfileobj(response, out_file)
def setup_platformio_caching():
"""Setup PlatformIO to use cached downloads.
This should be called from an extra_scripts file in platformio.ini.
Example extra_scripts file (e.g., platformio_hooks.py):
Import("env")
from script.platformio_download_wrapper import setup_platformio_caching
setup_platformio_caching()
"""
try:
from SCons.Script import DefaultEnvironment
DefaultEnvironment()
# Register custom download handler
# Note: This may not work with all PlatformIO versions
# as the download mechanism is internal
print("Note: Direct download interception is not fully supported.")
print("Please use the cache_platformio_downloads.py script instead.")
except ImportError:
print("Warning: SCons not available, cannot setup download caching")
if __name__ == "__main__":
# CLI mode - can be used to manually download a URL with caching
import argparse
parser = argparse.ArgumentParser(description="Download a URL with caching")
parser.add_argument("url", help="URL to download")
parser.add_argument("target", help="Target file path")
parser.add_argument("--cache-dir", type=Path, help="Cache directory")
args = parser.parse_args()
cache = GitHubDownloadCache(args.cache_dir)
target_path = Path(args.target)
try:
if is_github_url(args.url):
print(f"Downloading with cache: {args.url}")
cached_path = cache.download_with_cache(args.url)
# Copy to target
import shutil
target_path.parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(cached_path, target_path)
print(f"Copied to: {target_path}")
else:
print(f"Downloading directly (not a GitHub URL): {args.url}")
import shutil
import urllib.request
target_path.parent.mkdir(parents=True, exist_ok=True)
with (
urllib.request.urlopen(args.url) as response,
open(target_path, "wb") as out_file,
):
shutil.copyfileobj(response, out_file)
sys.exit(0)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)

View File

@@ -8,7 +8,6 @@ substitutions:
area: 25
numberOne: 1
var1: 79
double_width: 14
test_list:
- The area is 56
- 56
@@ -26,4 +25,3 @@ test_list:
- ord("a") = 97
- chr(97) = a
- len([1,2,3]) = 3
- width = 7, double_width = 14

View File

@@ -8,7 +8,6 @@ substitutions:
area: 25
numberOne: 1
var1: 79
double_width: ${width * 2}
test_list:
- "The area is ${width * height}"
@@ -24,4 +23,3 @@ test_list:
- ord("a") = ${ ord("a") }
- chr(97) = ${ chr(97) }
- len([1,2,3]) = ${ len([1,2,3]) }
- width = ${width}, double_width = ${double_width}

View File

@@ -570,13 +570,6 @@ class TestEsphomeCore:
assert target.address == "4.3.2.1"
def test_address__openthread(self, target):
target.name = "test-device"
target.config = {}
target.config[const.CONF_OPENTHREAD] = {}
assert target.address == "test-device.local"
def test_is_esp32(self, target):
target.data[const.KEY_CORE] = {const.KEY_TARGET_PLATFORM: "esp32"}

View File

@@ -17,12 +17,10 @@ from esphome import platformio_api
from esphome.__main__ import (
Purpose,
choose_upload_log_host,
command_analyze_memory,
command_clean_all,
command_rename,
command_update_all,
command_wizard,
detect_external_components,
get_port_type,
has_ip_address,
has_mqtt,
@@ -228,47 +226,13 @@ def mock_run_external_process() -> Generator[Mock]:
@pytest.fixture
def mock_run_external_command_main() -> Generator[Mock]:
"""Mock run_external_command in __main__ module (different from platformio_api)."""
def mock_run_external_command() -> Generator[Mock]:
"""Mock run_external_command for testing."""
with patch("esphome.__main__.run_external_command") as mock:
mock.return_value = 0 # Default to success
yield mock
@pytest.fixture
def mock_write_cpp() -> Generator[Mock]:
"""Mock write_cpp for testing."""
with patch("esphome.__main__.write_cpp") as mock:
mock.return_value = 0 # Default to success
yield mock
@pytest.fixture
def mock_compile_program() -> Generator[Mock]:
"""Mock compile_program for testing."""
with patch("esphome.__main__.compile_program") as mock:
mock.return_value = 0 # Default to success
yield mock
@pytest.fixture
def mock_get_esphome_components() -> Generator[Mock]:
"""Mock get_esphome_components for testing."""
with patch("esphome.analyze_memory.helpers.get_esphome_components") as mock:
mock.return_value = {"logger", "api", "ota"}
yield mock
@pytest.fixture
def mock_memory_analyzer_cli() -> Generator[Mock]:
"""Mock MemoryAnalyzerCLI for testing."""
with patch("esphome.analyze_memory.cli.MemoryAnalyzerCLI") as mock_class:
mock_analyzer = MagicMock()
mock_analyzer.generate_report.return_value = "Mock Memory Report"
mock_class.return_value = mock_analyzer
yield mock_class
def test_choose_upload_log_host_with_string_default() -> None:
"""Test with a single string default device."""
setup_core()
@@ -875,7 +839,7 @@ def test_upload_program_serial_esp8266_with_file(
def test_upload_using_esptool_path_conversion(
tmp_path: Path,
mock_run_external_command_main: Mock,
mock_run_external_command: Mock,
mock_get_idedata: Mock,
) -> None:
"""Test upload_using_esptool properly converts Path objects to strings for esptool.
@@ -911,10 +875,10 @@ def test_upload_using_esptool_path_conversion(
assert result == 0
# Verify that run_external_command was called
assert mock_run_external_command_main.call_count == 1
assert mock_run_external_command.call_count == 1
# Get the actual call arguments
call_args = mock_run_external_command_main.call_args[0]
call_args = mock_run_external_command.call_args[0]
# The first argument should be esptool.main function,
# followed by the command arguments
@@ -953,7 +917,7 @@ def test_upload_using_esptool_path_conversion(
def test_upload_using_esptool_with_file_path(
tmp_path: Path,
mock_run_external_command_main: Mock,
mock_run_external_command: Mock,
) -> None:
"""Test upload_using_esptool with a custom file that's a Path object."""
setup_core(platform=PLATFORM_ESP8266, tmp_path=tmp_path, name="test")
@@ -970,10 +934,10 @@ def test_upload_using_esptool_with_file_path(
assert result == 0
# Verify that run_external_command was called
mock_run_external_command_main.assert_called_once()
mock_run_external_command.assert_called_once()
# Get the actual call arguments
call_args = mock_run_external_command_main.call_args[0]
call_args = mock_run_external_command.call_args[0]
cmd_list = list(call_args[1:]) # Skip the esptool.main function
# Find the firmware path in the command
@@ -2309,226 +2273,3 @@ def test_show_logs_api_mqtt_timeout_fallback(
# Verify run_logs was called with only the static IP (MQTT failed)
mock_run_logs.assert_called_once_with(CORE.config, ["192.168.1.100"])
def test_detect_external_components_no_external(
mock_get_esphome_components: Mock,
) -> None:
"""Test detect_external_components with no external components."""
config = {
CONF_ESPHOME: {CONF_NAME: "test_device"},
"logger": {},
"api": {},
}
result = detect_external_components(config)
assert result == set()
mock_get_esphome_components.assert_called_once()
def test_detect_external_components_with_external(
mock_get_esphome_components: Mock,
) -> None:
"""Test detect_external_components detects external components."""
config = {
CONF_ESPHOME: {CONF_NAME: "test_device"},
"logger": {}, # Built-in
"api": {}, # Built-in
"my_custom_sensor": {}, # External
"another_custom": {}, # External
"external_components": [], # Special key, not a component
"substitutions": {}, # Special key, not a component
}
result = detect_external_components(config)
assert result == {"my_custom_sensor", "another_custom"}
mock_get_esphome_components.assert_called_once()
def test_detect_external_components_filters_special_keys(
mock_get_esphome_components: Mock,
) -> None:
"""Test detect_external_components filters out special config keys."""
config = {
CONF_ESPHOME: {CONF_NAME: "test_device"},
"substitutions": {"key": "value"},
"packages": {},
"globals": [],
"external_components": [],
"<<": {}, # YAML merge key
}
result = detect_external_components(config)
assert result == set()
mock_get_esphome_components.assert_called_once()
def test_command_analyze_memory_success(
tmp_path: Path,
capfd: CaptureFixture[str],
mock_write_cpp: Mock,
mock_compile_program: Mock,
mock_get_idedata: Mock,
mock_get_esphome_components: Mock,
mock_memory_analyzer_cli: Mock,
) -> None:
"""Test command_analyze_memory with successful compilation and analysis."""
setup_core(platform=PLATFORM_ESP32, tmp_path=tmp_path, name="test_device")
# Create firmware.elf file
firmware_path = (
tmp_path / ".esphome" / "build" / "test_device" / ".pioenvs" / "test_device"
)
firmware_path.mkdir(parents=True, exist_ok=True)
firmware_elf = firmware_path / "firmware.elf"
firmware_elf.write_text("mock elf file")
# Mock idedata
mock_idedata_obj = MagicMock(spec=platformio_api.IDEData)
mock_idedata_obj.firmware_elf_path = str(firmware_elf)
mock_idedata_obj.objdump_path = "/path/to/objdump"
mock_idedata_obj.readelf_path = "/path/to/readelf"
mock_get_idedata.return_value = mock_idedata_obj
config = {
CONF_ESPHOME: {CONF_NAME: "test_device"},
"logger": {},
}
args = MockArgs()
result = command_analyze_memory(args, config)
assert result == 0
# Verify compilation was done
mock_write_cpp.assert_called_once_with(config)
mock_compile_program.assert_called_once_with(args, config)
# Verify analyzer was created with correct parameters
mock_memory_analyzer_cli.assert_called_once_with(
str(firmware_elf),
"/path/to/objdump",
"/path/to/readelf",
set(), # No external components
)
# Verify analysis was run
mock_analyzer = mock_memory_analyzer_cli.return_value
mock_analyzer.analyze.assert_called_once()
mock_analyzer.generate_report.assert_called_once()
# Verify report was printed
captured = capfd.readouterr()
assert "Mock Memory Report" in captured.out
def test_command_analyze_memory_with_external_components(
tmp_path: Path,
mock_write_cpp: Mock,
mock_compile_program: Mock,
mock_get_idedata: Mock,
mock_get_esphome_components: Mock,
mock_memory_analyzer_cli: Mock,
) -> None:
"""Test command_analyze_memory detects external components."""
setup_core(platform=PLATFORM_ESP32, tmp_path=tmp_path, name="test_device")
# Create firmware.elf file
firmware_path = (
tmp_path / ".esphome" / "build" / "test_device" / ".pioenvs" / "test_device"
)
firmware_path.mkdir(parents=True, exist_ok=True)
firmware_elf = firmware_path / "firmware.elf"
firmware_elf.write_text("mock elf file")
# Mock idedata
mock_idedata_obj = MagicMock(spec=platformio_api.IDEData)
mock_idedata_obj.firmware_elf_path = str(firmware_elf)
mock_idedata_obj.objdump_path = "/path/to/objdump"
mock_idedata_obj.readelf_path = "/path/to/readelf"
mock_get_idedata.return_value = mock_idedata_obj
config = {
CONF_ESPHOME: {CONF_NAME: "test_device"},
"logger": {},
"my_custom_component": {"param": "value"}, # External component
"external_components": [{"source": "github://user/repo"}], # Not a component
}
args = MockArgs()
result = command_analyze_memory(args, config)
assert result == 0
# Verify analyzer was created with external components detected
mock_memory_analyzer_cli.assert_called_once_with(
str(firmware_elf),
"/path/to/objdump",
"/path/to/readelf",
{"my_custom_component"}, # External component detected
)
def test_command_analyze_memory_write_cpp_fails(
tmp_path: Path,
mock_write_cpp: Mock,
) -> None:
"""Test command_analyze_memory when write_cpp fails."""
setup_core(platform=PLATFORM_ESP32, tmp_path=tmp_path, name="test_device")
config = {CONF_ESPHOME: {CONF_NAME: "test_device"}}
args = MockArgs()
mock_write_cpp.return_value = 1 # Failure
result = command_analyze_memory(args, config)
assert result == 1
mock_write_cpp.assert_called_once_with(config)
def test_command_analyze_memory_compile_fails(
tmp_path: Path,
mock_write_cpp: Mock,
mock_compile_program: Mock,
) -> None:
"""Test command_analyze_memory when compilation fails."""
setup_core(platform=PLATFORM_ESP32, tmp_path=tmp_path, name="test_device")
config = {CONF_ESPHOME: {CONF_NAME: "test_device"}}
args = MockArgs()
mock_compile_program.return_value = 1 # Compilation failed
result = command_analyze_memory(args, config)
assert result == 1
mock_write_cpp.assert_called_once_with(config)
mock_compile_program.assert_called_once_with(args, config)
def test_command_analyze_memory_no_idedata(
tmp_path: Path,
caplog: pytest.LogCaptureFixture,
mock_write_cpp: Mock,
mock_compile_program: Mock,
mock_get_idedata: Mock,
) -> None:
"""Test command_analyze_memory when idedata cannot be retrieved."""
setup_core(platform=PLATFORM_ESP32, tmp_path=tmp_path, name="test_device")
config = {CONF_ESPHOME: {CONF_NAME: "test_device"}}
args = MockArgs()
mock_get_idedata.return_value = None # Failed to get idedata
with caplog.at_level(logging.ERROR):
result = command_analyze_memory(args, config)
assert result == 1
assert "Failed to get IDE data for memory analysis" in caplog.text