diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 35586519e4..4c798c6b72 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -6,21 +6,22 @@ import asyncio from collections.abc import AsyncGenerator, Generator from contextlib import AbstractAsyncContextManager, asynccontextmanager import logging +import os from pathlib import Path import platform import signal import socket +import sys import tempfile +from typing import TextIO from aioesphomeapi import APIClient, APIConnectionError, ReconnectLogic import pytest import pytest_asyncio -# Skip all integration tests on Windows -if platform.system() == "Windows": - pytest.skip( - "Integration tests are not supported on Windows", allow_module_level=True - ) +import esphome.config +from esphome.core import CORE +from esphome.platformio_api import get_idedata from .const import ( API_CONNECTION_TIMEOUT, @@ -37,9 +38,16 @@ from .types import ( CompileFunction, ConfigWriter, RunCompiledFunction, - RunFunction, ) +# Skip all integration tests on Windows +if platform.system() == "Windows": + pytest.skip( + "Integration tests are not supported on Windows", allow_module_level=True + ) + +import pty # not available on Windows + @pytest.fixture(scope="module", autouse=True) def enable_aioesphomeapi_debug_logging(): @@ -134,75 +142,72 @@ async def write_yaml_config( yield _write_config -async def _run_esphome_command( - command: str, - config_path: Path, - cwd: Path, -) -> asyncio.subprocess.Process: - """Run an ESPHome command with the given arguments.""" - return await asyncio.create_subprocess_exec( - "esphome", - command, - str(config_path), - cwd=cwd, - stdout=None, # Inherit stdout - stderr=None, # Inherit stderr - stdin=asyncio.subprocess.DEVNULL, - # Start in a new process group to isolate signal handling - start_new_session=True, - ) - - @pytest_asyncio.fixture async def compile_esphome( integration_test_dir: Path, ) -> AsyncGenerator[CompileFunction]: - """Compile an ESPHome configuration.""" + """Compile an ESPHome configuration and return the binary path.""" - async def _compile(config_path: Path) -> None: - proc = await _run_esphome_command("compile", config_path, integration_test_dir) - await proc.wait() - if proc.returncode != 0: - raise RuntimeError( - f"Failed to compile {config_path}, return code: {proc.returncode}. " - f"Run with 'pytest -s' to see compilation output." + async def _compile(config_path: Path) -> Path: + # Retry compilation up to 3 times if we get a segfault + max_retries = 3 + for attempt in range(max_retries): + # Compile using subprocess, inheriting stdout/stderr to show progress + proc = await asyncio.create_subprocess_exec( + "esphome", + "compile", + str(config_path), + cwd=integration_test_dir, + stdout=None, # Inherit stdout + stderr=None, # Inherit stderr + stdin=asyncio.subprocess.DEVNULL, + # Start in a new process group to isolate signal handling + start_new_session=True, ) + await proc.wait() + + if proc.returncode == 0: + # Success! + break + elif proc.returncode == -11 and attempt < max_retries - 1: + # Segfault (-11 = SIGSEGV), retry + print( + f"Compilation segfaulted (attempt {attempt + 1}/{max_retries}), retrying..." + ) + await asyncio.sleep(1) # Brief pause before retry + continue + else: + # Other error or final retry + raise RuntimeError( + f"Failed to compile {config_path}, return code: {proc.returncode}. " + f"Run with 'pytest -s' to see compilation output." + ) + + # Load the config to get idedata (blocking call, must use executor) + loop = asyncio.get_running_loop() + + def _read_config_and_get_binary(): + CORE.config_path = str(config_path) + config = esphome.config.read_config( + {"command": "compile", "config": str(config_path)} + ) + if config is None: + raise RuntimeError(f"Failed to read config from {config_path}") + + # Get the compiled binary path + idedata = get_idedata(config) + return Path(idedata.firmware_elf_path) + + binary_path = await loop.run_in_executor(None, _read_config_and_get_binary) + + if not binary_path.exists(): + raise RuntimeError(f"Compiled binary not found at {binary_path}") + + return binary_path yield _compile -@pytest_asyncio.fixture -async def run_esphome_process( - integration_test_dir: Path, -) -> AsyncGenerator[RunFunction]: - """Run an ESPHome process and manage its lifecycle.""" - processes: list[asyncio.subprocess.Process] = [] - - async def _run(config_path: Path) -> asyncio.subprocess.Process: - process = await _run_esphome_command("run", config_path, integration_test_dir) - processes.append(process) - return process - - yield _run - - # Cleanup: terminate all "run" processes gracefully - for process in processes: - if process.returncode is None: - # Send SIGINT (Ctrl+C) for graceful shutdown of the running ESPHome instance - process.send_signal(signal.SIGINT) - try: - await asyncio.wait_for(process.wait(), timeout=SIGINT_TIMEOUT) - except asyncio.TimeoutError: - # If SIGINT didn't work, try SIGTERM - process.terminate() - try: - await asyncio.wait_for(process.wait(), timeout=SIGTERM_TIMEOUT) - except asyncio.TimeoutError: - # Last resort: SIGKILL - process.kill() - await process.wait() - - @asynccontextmanager async def create_api_client( address: str = LOCALHOST, @@ -341,28 +346,140 @@ async def api_client_connected( yield _connect_client -async def wait_for_port_open( - host: str, port: int, timeout: float = PORT_WAIT_TIMEOUT +async def _read_stream_lines( + stream: asyncio.StreamReader, lines: list[str], output_stream: TextIO ) -> None: - """Wait for a TCP port to be open and accepting connections.""" + """Read lines from a stream, append to list, and echo to output stream.""" + while line := await stream.readline(): + decoded_line = line.decode("utf-8", errors="replace") + lines.append(decoded_line.rstrip()) + # Echo to stdout/stderr in real-time + print(decoded_line.rstrip(), file=output_stream, flush=True) + + +@asynccontextmanager +async def run_binary_and_wait_for_port( + binary_path: Path, + host: str, + port: int, + timeout: float = PORT_WAIT_TIMEOUT, +) -> AsyncGenerator[None]: + """Run a binary, wait for it to open a port, and clean up on exit.""" + # Create a pseudo-terminal to make the binary think it's running interactively + # This is needed because the ESPHome host logger checks isatty() + controller_fd, device_fd = pty.openpty() + + # Run the compiled binary with PTY + process = await asyncio.create_subprocess_exec( + str(binary_path), + stdout=device_fd, + stderr=device_fd, + stdin=asyncio.subprocess.DEVNULL, + # Start in a new process group to isolate signal handling + start_new_session=True, + pass_fds=(device_fd,), + ) + + # Close the device end in the parent process + os.close(device_fd) + + # Convert controller_fd to async streams for reading + loop = asyncio.get_running_loop() + controller_reader = asyncio.StreamReader() + controller_protocol = asyncio.StreamReaderProtocol(controller_reader) + controller_transport, _ = await loop.connect_read_pipe( + lambda: controller_protocol, os.fdopen(controller_fd, "rb", 0) + ) + output_reader = controller_reader + + if process.returncode is not None: + raise RuntimeError( + f"Process died immediately with return code {process.returncode}. " + "Ensure the binary is valid and can run successfully." + ) + + # Wait for the API server to start listening loop = asyncio.get_running_loop() start_time = loop.time() - # Small yield to ensure the process has a chance to start - await asyncio.sleep(0) + # Start collecting output + stdout_lines: list[str] = [] + output_tasks: list[asyncio.Task] = [] - while loop.time() - start_time < timeout: - try: - # Try to connect to the port - _, writer = await asyncio.open_connection(host, port) - writer.close() - await writer.wait_closed() - return # Port is open - except (ConnectionRefusedError, OSError): - # Port not open yet, wait a bit and try again - await asyncio.sleep(PORT_POLL_INTERVAL) + try: + # Read from output stream + output_tasks = [ + asyncio.create_task( + _read_stream_lines(output_reader, stdout_lines, sys.stdout) + ) + ] - raise TimeoutError(f"Port {port} on {host} did not open within {timeout} seconds") + # Small yield to ensure the process has a chance to start + await asyncio.sleep(0) + + while loop.time() - start_time < timeout: + try: + # Try to connect to the port + _, writer = await asyncio.open_connection(host, port) + writer.close() + await writer.wait_closed() + # Port is open, yield control + yield + return + except (ConnectionRefusedError, OSError): + # Check if process died + if process.returncode is not None: + break + # Port not open yet, wait a bit and try again + await asyncio.sleep(PORT_POLL_INTERVAL) + + # Timeout or process died - build error message + error_msg = f"Port {port} on {host} did not open within {timeout} seconds" + + if process.returncode is not None: + error_msg += f"\nProcess exited with code: {process.returncode}" + + # Include any output collected so far + if stdout_lines: + error_msg += "\n\n--- Process Output ---\n" + error_msg += "\n".join(stdout_lines[-100:]) # Last 100 lines + + raise TimeoutError(error_msg) + + finally: + # Cancel output collection tasks + for task in output_tasks: + task.cancel() + # Wait for tasks to complete and check for exceptions + results = await asyncio.gather(*output_tasks, return_exceptions=True) + for i, result in enumerate(results): + if isinstance(result, Exception) and not isinstance( + result, asyncio.CancelledError + ): + print( + f"Error reading from PTY: {result}", + file=sys.stderr, + ) + + # Close the PTY transport (Unix only) + if controller_transport is not None: + controller_transport.close() + + # Cleanup: terminate the process gracefully + if process.returncode is None: + # Send SIGINT (Ctrl+C) for graceful shutdown + process.send_signal(signal.SIGINT) + try: + await asyncio.wait_for(process.wait(), timeout=SIGINT_TIMEOUT) + except asyncio.TimeoutError: + # If SIGINT didn't work, try SIGTERM + process.terminate() + try: + await asyncio.wait_for(process.wait(), timeout=SIGTERM_TIMEOUT) + except asyncio.TimeoutError: + # Last resort: SIGKILL + process.kill() + await process.wait() @asynccontextmanager @@ -371,40 +488,29 @@ async def run_compiled_context( filename: str | None, write_yaml_config: ConfigWriter, compile_esphome: CompileFunction, - run_esphome_process: RunFunction, port: int, port_socket: socket.socket | None = None, -) -> AsyncGenerator[asyncio.subprocess.Process]: +) -> AsyncGenerator[None]: """Context manager to write, compile and run an ESPHome configuration.""" # Write the YAML config config_path = await write_yaml_config(yaml_content, filename) - # Compile the configuration - await compile_esphome(config_path) + # Compile the configuration and get binary path + binary_path = await compile_esphome(config_path) # Close the port socket right before running to release the port if port_socket is not None: port_socket.close() - # Run the ESPHome device - process = await run_esphome_process(config_path) - assert process.returncode is None, "Process died immediately" - - # Wait for the API server to start listening - await wait_for_port_open(LOCALHOST, port, timeout=PORT_WAIT_TIMEOUT) - - try: - yield process - finally: - # Process cleanup is handled by run_esphome_process fixture - pass + # Run the binary and wait for the API server to start + async with run_binary_and_wait_for_port(binary_path, LOCALHOST, port): + yield @pytest_asyncio.fixture async def run_compiled( write_yaml_config: ConfigWriter, compile_esphome: CompileFunction, - run_esphome_process: RunFunction, reserved_tcp_port: tuple[int, socket.socket], ) -> AsyncGenerator[RunCompiledFunction]: """Write, compile and run an ESPHome configuration.""" @@ -418,7 +524,6 @@ async def run_compiled( filename, write_yaml_config, compile_esphome, - run_esphome_process, port, port_socket, ) diff --git a/tests/integration/types.py b/tests/integration/types.py index ef1af2add8..6fc3e9435e 100644 --- a/tests/integration/types.py +++ b/tests/integration/types.py @@ -11,11 +11,9 @@ from typing import Protocol from aioesphomeapi import APIClient ConfigWriter = Callable[[str, str | None], Awaitable[Path]] -CompileFunction = Callable[[Path], Awaitable[None]] +CompileFunction = Callable[[Path], Awaitable[Path]] RunFunction = Callable[[Path], Awaitable[asyncio.subprocess.Process]] -RunCompiledFunction = Callable[ - [str, str | None], AbstractAsyncContextManager[asyncio.subprocess.Process] -] +RunCompiledFunction = Callable[[str, str | None], AbstractAsyncContextManager[None]] WaitFunction = Callable[[APIClient, float], Awaitable[bool]]