Improve logging in integration tests when port does not open (#8932)

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
J. Nick Koston 2025-05-28 18:42:19 -05:00 committed by GitHub
parent 43e88af28a
commit 70c5e1bbf1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 206 additions and 103 deletions

View File

@ -6,21 +6,22 @@ import asyncio
from collections.abc import AsyncGenerator, Generator
from contextlib import AbstractAsyncContextManager, asynccontextmanager
import logging
import os
from pathlib import Path
import platform
import signal
import socket
import sys
import tempfile
from typing import TextIO
from aioesphomeapi import APIClient, APIConnectionError, ReconnectLogic
import pytest
import pytest_asyncio
# Skip all integration tests on Windows
if platform.system() == "Windows":
pytest.skip(
"Integration tests are not supported on Windows", allow_module_level=True
)
import esphome.config
from esphome.core import CORE
from esphome.platformio_api import get_idedata
from .const import (
API_CONNECTION_TIMEOUT,
@ -37,9 +38,16 @@ from .types import (
CompileFunction,
ConfigWriter,
RunCompiledFunction,
RunFunction,
)
# Skip all integration tests on Windows
if platform.system() == "Windows":
pytest.skip(
"Integration tests are not supported on Windows", allow_module_level=True
)
import pty # not available on Windows
@pytest.fixture(scope="module", autouse=True)
def enable_aioesphomeapi_debug_logging():
@ -134,75 +142,72 @@ async def write_yaml_config(
yield _write_config
async def _run_esphome_command(
command: str,
config_path: Path,
cwd: Path,
) -> asyncio.subprocess.Process:
"""Run an ESPHome command with the given arguments."""
return await asyncio.create_subprocess_exec(
"esphome",
command,
str(config_path),
cwd=cwd,
stdout=None, # Inherit stdout
stderr=None, # Inherit stderr
stdin=asyncio.subprocess.DEVNULL,
# Start in a new process group to isolate signal handling
start_new_session=True,
)
@pytest_asyncio.fixture
async def compile_esphome(
integration_test_dir: Path,
) -> AsyncGenerator[CompileFunction]:
"""Compile an ESPHome configuration."""
"""Compile an ESPHome configuration and return the binary path."""
async def _compile(config_path: Path) -> None:
proc = await _run_esphome_command("compile", config_path, integration_test_dir)
await proc.wait()
if proc.returncode != 0:
raise RuntimeError(
f"Failed to compile {config_path}, return code: {proc.returncode}. "
f"Run with 'pytest -s' to see compilation output."
async def _compile(config_path: Path) -> Path:
# Retry compilation up to 3 times if we get a segfault
max_retries = 3
for attempt in range(max_retries):
# Compile using subprocess, inheriting stdout/stderr to show progress
proc = await asyncio.create_subprocess_exec(
"esphome",
"compile",
str(config_path),
cwd=integration_test_dir,
stdout=None, # Inherit stdout
stderr=None, # Inherit stderr
stdin=asyncio.subprocess.DEVNULL,
# Start in a new process group to isolate signal handling
start_new_session=True,
)
await proc.wait()
if proc.returncode == 0:
# Success!
break
elif proc.returncode == -11 and attempt < max_retries - 1:
# Segfault (-11 = SIGSEGV), retry
print(
f"Compilation segfaulted (attempt {attempt + 1}/{max_retries}), retrying..."
)
await asyncio.sleep(1) # Brief pause before retry
continue
else:
# Other error or final retry
raise RuntimeError(
f"Failed to compile {config_path}, return code: {proc.returncode}. "
f"Run with 'pytest -s' to see compilation output."
)
# Load the config to get idedata (blocking call, must use executor)
loop = asyncio.get_running_loop()
def _read_config_and_get_binary():
CORE.config_path = str(config_path)
config = esphome.config.read_config(
{"command": "compile", "config": str(config_path)}
)
if config is None:
raise RuntimeError(f"Failed to read config from {config_path}")
# Get the compiled binary path
idedata = get_idedata(config)
return Path(idedata.firmware_elf_path)
binary_path = await loop.run_in_executor(None, _read_config_and_get_binary)
if not binary_path.exists():
raise RuntimeError(f"Compiled binary not found at {binary_path}")
return binary_path
yield _compile
@pytest_asyncio.fixture
async def run_esphome_process(
integration_test_dir: Path,
) -> AsyncGenerator[RunFunction]:
"""Run an ESPHome process and manage its lifecycle."""
processes: list[asyncio.subprocess.Process] = []
async def _run(config_path: Path) -> asyncio.subprocess.Process:
process = await _run_esphome_command("run", config_path, integration_test_dir)
processes.append(process)
return process
yield _run
# Cleanup: terminate all "run" processes gracefully
for process in processes:
if process.returncode is None:
# Send SIGINT (Ctrl+C) for graceful shutdown of the running ESPHome instance
process.send_signal(signal.SIGINT)
try:
await asyncio.wait_for(process.wait(), timeout=SIGINT_TIMEOUT)
except asyncio.TimeoutError:
# If SIGINT didn't work, try SIGTERM
process.terminate()
try:
await asyncio.wait_for(process.wait(), timeout=SIGTERM_TIMEOUT)
except asyncio.TimeoutError:
# Last resort: SIGKILL
process.kill()
await process.wait()
@asynccontextmanager
async def create_api_client(
address: str = LOCALHOST,
@ -341,28 +346,140 @@ async def api_client_connected(
yield _connect_client
async def wait_for_port_open(
host: str, port: int, timeout: float = PORT_WAIT_TIMEOUT
async def _read_stream_lines(
stream: asyncio.StreamReader, lines: list[str], output_stream: TextIO
) -> None:
"""Wait for a TCP port to be open and accepting connections."""
"""Read lines from a stream, append to list, and echo to output stream."""
while line := await stream.readline():
decoded_line = line.decode("utf-8", errors="replace")
lines.append(decoded_line.rstrip())
# Echo to stdout/stderr in real-time
print(decoded_line.rstrip(), file=output_stream, flush=True)
@asynccontextmanager
async def run_binary_and_wait_for_port(
binary_path: Path,
host: str,
port: int,
timeout: float = PORT_WAIT_TIMEOUT,
) -> AsyncGenerator[None]:
"""Run a binary, wait for it to open a port, and clean up on exit."""
# Create a pseudo-terminal to make the binary think it's running interactively
# This is needed because the ESPHome host logger checks isatty()
controller_fd, device_fd = pty.openpty()
# Run the compiled binary with PTY
process = await asyncio.create_subprocess_exec(
str(binary_path),
stdout=device_fd,
stderr=device_fd,
stdin=asyncio.subprocess.DEVNULL,
# Start in a new process group to isolate signal handling
start_new_session=True,
pass_fds=(device_fd,),
)
# Close the device end in the parent process
os.close(device_fd)
# Convert controller_fd to async streams for reading
loop = asyncio.get_running_loop()
controller_reader = asyncio.StreamReader()
controller_protocol = asyncio.StreamReaderProtocol(controller_reader)
controller_transport, _ = await loop.connect_read_pipe(
lambda: controller_protocol, os.fdopen(controller_fd, "rb", 0)
)
output_reader = controller_reader
if process.returncode is not None:
raise RuntimeError(
f"Process died immediately with return code {process.returncode}. "
"Ensure the binary is valid and can run successfully."
)
# Wait for the API server to start listening
loop = asyncio.get_running_loop()
start_time = loop.time()
# Small yield to ensure the process has a chance to start
await asyncio.sleep(0)
# Start collecting output
stdout_lines: list[str] = []
output_tasks: list[asyncio.Task] = []
while loop.time() - start_time < timeout:
try:
# Try to connect to the port
_, writer = await asyncio.open_connection(host, port)
writer.close()
await writer.wait_closed()
return # Port is open
except (ConnectionRefusedError, OSError):
# Port not open yet, wait a bit and try again
await asyncio.sleep(PORT_POLL_INTERVAL)
try:
# Read from output stream
output_tasks = [
asyncio.create_task(
_read_stream_lines(output_reader, stdout_lines, sys.stdout)
)
]
raise TimeoutError(f"Port {port} on {host} did not open within {timeout} seconds")
# Small yield to ensure the process has a chance to start
await asyncio.sleep(0)
while loop.time() - start_time < timeout:
try:
# Try to connect to the port
_, writer = await asyncio.open_connection(host, port)
writer.close()
await writer.wait_closed()
# Port is open, yield control
yield
return
except (ConnectionRefusedError, OSError):
# Check if process died
if process.returncode is not None:
break
# Port not open yet, wait a bit and try again
await asyncio.sleep(PORT_POLL_INTERVAL)
# Timeout or process died - build error message
error_msg = f"Port {port} on {host} did not open within {timeout} seconds"
if process.returncode is not None:
error_msg += f"\nProcess exited with code: {process.returncode}"
# Include any output collected so far
if stdout_lines:
error_msg += "\n\n--- Process Output ---\n"
error_msg += "\n".join(stdout_lines[-100:]) # Last 100 lines
raise TimeoutError(error_msg)
finally:
# Cancel output collection tasks
for task in output_tasks:
task.cancel()
# Wait for tasks to complete and check for exceptions
results = await asyncio.gather(*output_tasks, return_exceptions=True)
for i, result in enumerate(results):
if isinstance(result, Exception) and not isinstance(
result, asyncio.CancelledError
):
print(
f"Error reading from PTY: {result}",
file=sys.stderr,
)
# Close the PTY transport (Unix only)
if controller_transport is not None:
controller_transport.close()
# Cleanup: terminate the process gracefully
if process.returncode is None:
# Send SIGINT (Ctrl+C) for graceful shutdown
process.send_signal(signal.SIGINT)
try:
await asyncio.wait_for(process.wait(), timeout=SIGINT_TIMEOUT)
except asyncio.TimeoutError:
# If SIGINT didn't work, try SIGTERM
process.terminate()
try:
await asyncio.wait_for(process.wait(), timeout=SIGTERM_TIMEOUT)
except asyncio.TimeoutError:
# Last resort: SIGKILL
process.kill()
await process.wait()
@asynccontextmanager
@ -371,40 +488,29 @@ async def run_compiled_context(
filename: str | None,
write_yaml_config: ConfigWriter,
compile_esphome: CompileFunction,
run_esphome_process: RunFunction,
port: int,
port_socket: socket.socket | None = None,
) -> AsyncGenerator[asyncio.subprocess.Process]:
) -> AsyncGenerator[None]:
"""Context manager to write, compile and run an ESPHome configuration."""
# Write the YAML config
config_path = await write_yaml_config(yaml_content, filename)
# Compile the configuration
await compile_esphome(config_path)
# Compile the configuration and get binary path
binary_path = await compile_esphome(config_path)
# Close the port socket right before running to release the port
if port_socket is not None:
port_socket.close()
# Run the ESPHome device
process = await run_esphome_process(config_path)
assert process.returncode is None, "Process died immediately"
# Wait for the API server to start listening
await wait_for_port_open(LOCALHOST, port, timeout=PORT_WAIT_TIMEOUT)
try:
yield process
finally:
# Process cleanup is handled by run_esphome_process fixture
pass
# Run the binary and wait for the API server to start
async with run_binary_and_wait_for_port(binary_path, LOCALHOST, port):
yield
@pytest_asyncio.fixture
async def run_compiled(
write_yaml_config: ConfigWriter,
compile_esphome: CompileFunction,
run_esphome_process: RunFunction,
reserved_tcp_port: tuple[int, socket.socket],
) -> AsyncGenerator[RunCompiledFunction]:
"""Write, compile and run an ESPHome configuration."""
@ -418,7 +524,6 @@ async def run_compiled(
filename,
write_yaml_config,
compile_esphome,
run_esphome_process,
port,
port_socket,
)

View File

@ -11,11 +11,9 @@ from typing import Protocol
from aioesphomeapi import APIClient
ConfigWriter = Callable[[str, str | None], Awaitable[Path]]
CompileFunction = Callable[[Path], Awaitable[None]]
CompileFunction = Callable[[Path], Awaitable[Path]]
RunFunction = Callable[[Path], Awaitable[asyncio.subprocess.Process]]
RunCompiledFunction = Callable[
[str, str | None], AbstractAsyncContextManager[asyncio.subprocess.Process]
]
RunCompiledFunction = Callable[[str, str | None], AbstractAsyncContextManager[None]]
WaitFunction = Callable[[APIClient, float], Awaitable[bool]]