mirror of
https://github.com/esphome/esphome.git
synced 2025-08-06 02:17:45 +00:00
Support multiple --device arguments for address fallback (#10003)
Co-authored-by: pre-commit-ci-lite[bot] <117423508+pre-commit-ci-lite[bot]@users.noreply.github.com>
This commit is contained in:
parent
969034b61a
commit
bc03538e25
@ -9,6 +9,7 @@ import os
|
|||||||
import re
|
import re
|
||||||
import sys
|
import sys
|
||||||
import time
|
import time
|
||||||
|
from typing import Protocol
|
||||||
|
|
||||||
import argcomplete
|
import argcomplete
|
||||||
|
|
||||||
@ -44,6 +45,7 @@ from esphome.const import (
|
|||||||
from esphome.core import CORE, EsphomeError, coroutine
|
from esphome.core import CORE, EsphomeError, coroutine
|
||||||
from esphome.helpers import get_bool_env, indent, is_ip_address
|
from esphome.helpers import get_bool_env, indent, is_ip_address
|
||||||
from esphome.log import AnsiFore, color, setup_log
|
from esphome.log import AnsiFore, color, setup_log
|
||||||
|
from esphome.types import ConfigType
|
||||||
from esphome.util import (
|
from esphome.util import (
|
||||||
get_serial_ports,
|
get_serial_ports,
|
||||||
list_yaml_files,
|
list_yaml_files,
|
||||||
@ -55,6 +57,23 @@ from esphome.util import (
|
|||||||
_LOGGER = logging.getLogger(__name__)
|
_LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class ArgsProtocol(Protocol):
|
||||||
|
device: list[str] | None
|
||||||
|
reset: bool
|
||||||
|
username: str | None
|
||||||
|
password: str | None
|
||||||
|
client_id: str | None
|
||||||
|
topic: str | None
|
||||||
|
file: str | None
|
||||||
|
no_logs: bool
|
||||||
|
only_generate: bool
|
||||||
|
show_secrets: bool
|
||||||
|
dashboard: bool
|
||||||
|
configuration: str
|
||||||
|
name: str
|
||||||
|
upload_speed: str | None
|
||||||
|
|
||||||
|
|
||||||
def choose_prompt(options, purpose: str = None):
|
def choose_prompt(options, purpose: str = None):
|
||||||
if not options:
|
if not options:
|
||||||
raise EsphomeError(
|
raise EsphomeError(
|
||||||
@ -88,30 +107,54 @@ def choose_prompt(options, purpose: str = None):
|
|||||||
|
|
||||||
|
|
||||||
def choose_upload_log_host(
|
def choose_upload_log_host(
|
||||||
default, check_default, show_ota, show_mqtt, show_api, purpose: str = None
|
default: list[str] | str | None,
|
||||||
):
|
check_default: str | None,
|
||||||
|
show_ota: bool,
|
||||||
|
show_mqtt: bool,
|
||||||
|
show_api: bool,
|
||||||
|
purpose: str | None = None,
|
||||||
|
) -> list[str]:
|
||||||
|
# Convert to list for uniform handling
|
||||||
|
defaults = [default] if isinstance(default, str) else default or []
|
||||||
|
|
||||||
|
# If devices specified, resolve them
|
||||||
|
if defaults:
|
||||||
|
resolved: list[str] = []
|
||||||
|
for device in defaults:
|
||||||
|
if device == "SERIAL":
|
||||||
|
serial_ports = get_serial_ports()
|
||||||
|
if not serial_ports:
|
||||||
|
_LOGGER.warning("No serial ports found, skipping SERIAL device")
|
||||||
|
continue
|
||||||
|
options = [
|
||||||
|
(f"{port.path} ({port.description})", port.path)
|
||||||
|
for port in serial_ports
|
||||||
|
]
|
||||||
|
resolved.append(choose_prompt(options, purpose=purpose))
|
||||||
|
elif device == "OTA":
|
||||||
|
if (show_ota and "ota" in CORE.config) or (
|
||||||
|
show_api and "api" in CORE.config
|
||||||
|
):
|
||||||
|
resolved.append(CORE.address)
|
||||||
|
elif show_mqtt and has_mqtt_logging():
|
||||||
|
resolved.append("MQTT")
|
||||||
|
else:
|
||||||
|
resolved.append(device)
|
||||||
|
return resolved
|
||||||
|
|
||||||
|
# No devices specified, show interactive chooser
|
||||||
options = [
|
options = [
|
||||||
(f"{port.path} ({port.description})", port.path) for port in get_serial_ports()
|
(f"{port.path} ({port.description})", port.path) for port in get_serial_ports()
|
||||||
]
|
]
|
||||||
if default == "SERIAL":
|
|
||||||
return choose_prompt(options, purpose=purpose)
|
|
||||||
if (show_ota and "ota" in CORE.config) or (show_api and "api" in CORE.config):
|
if (show_ota and "ota" in CORE.config) or (show_api and "api" in CORE.config):
|
||||||
options.append((f"Over The Air ({CORE.address})", CORE.address))
|
options.append((f"Over The Air ({CORE.address})", CORE.address))
|
||||||
if default == "OTA":
|
if show_mqtt and has_mqtt_logging():
|
||||||
return CORE.address
|
mqtt_config = CORE.config[CONF_MQTT]
|
||||||
if (
|
|
||||||
show_mqtt
|
|
||||||
and (mqtt_config := CORE.config.get(CONF_MQTT))
|
|
||||||
and mqtt_logging_enabled(mqtt_config)
|
|
||||||
):
|
|
||||||
options.append((f"MQTT ({mqtt_config[CONF_BROKER]})", "MQTT"))
|
options.append((f"MQTT ({mqtt_config[CONF_BROKER]})", "MQTT"))
|
||||||
if default == "OTA":
|
|
||||||
return "MQTT"
|
|
||||||
if default is not None:
|
|
||||||
return default
|
|
||||||
if check_default is not None and check_default in [opt[1] for opt in options]:
|
if check_default is not None and check_default in [opt[1] for opt in options]:
|
||||||
return check_default
|
return [check_default]
|
||||||
return choose_prompt(options, purpose=purpose)
|
return [choose_prompt(options, purpose=purpose)]
|
||||||
|
|
||||||
|
|
||||||
def mqtt_logging_enabled(mqtt_config):
|
def mqtt_logging_enabled(mqtt_config):
|
||||||
@ -123,7 +166,14 @@ def mqtt_logging_enabled(mqtt_config):
|
|||||||
return log_topic.get(CONF_LEVEL, None) != "NONE"
|
return log_topic.get(CONF_LEVEL, None) != "NONE"
|
||||||
|
|
||||||
|
|
||||||
def get_port_type(port):
|
def has_mqtt_logging() -> bool:
|
||||||
|
"""Check if MQTT logging is available."""
|
||||||
|
return (mqtt_config := CORE.config.get(CONF_MQTT)) and mqtt_logging_enabled(
|
||||||
|
mqtt_config
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def get_port_type(port: str) -> str:
|
||||||
if port.startswith("/") or port.startswith("COM"):
|
if port.startswith("/") or port.startswith("COM"):
|
||||||
return "SERIAL"
|
return "SERIAL"
|
||||||
if port == "MQTT":
|
if port == "MQTT":
|
||||||
@ -131,7 +181,7 @@ def get_port_type(port):
|
|||||||
return "NETWORK"
|
return "NETWORK"
|
||||||
|
|
||||||
|
|
||||||
def run_miniterm(config, port, args):
|
def run_miniterm(config: ConfigType, port: str, args) -> int:
|
||||||
from aioesphomeapi import LogParser
|
from aioesphomeapi import LogParser
|
||||||
import serial
|
import serial
|
||||||
|
|
||||||
@ -208,7 +258,7 @@ def wrap_to_code(name, comp):
|
|||||||
return wrapped
|
return wrapped
|
||||||
|
|
||||||
|
|
||||||
def write_cpp(config):
|
def write_cpp(config: ConfigType) -> int:
|
||||||
if not get_bool_env(ENV_NOGITIGNORE):
|
if not get_bool_env(ENV_NOGITIGNORE):
|
||||||
writer.write_gitignore()
|
writer.write_gitignore()
|
||||||
|
|
||||||
@ -216,7 +266,7 @@ def write_cpp(config):
|
|||||||
return write_cpp_file()
|
return write_cpp_file()
|
||||||
|
|
||||||
|
|
||||||
def generate_cpp_contents(config):
|
def generate_cpp_contents(config: ConfigType) -> None:
|
||||||
_LOGGER.info("Generating C++ source...")
|
_LOGGER.info("Generating C++ source...")
|
||||||
|
|
||||||
for name, component, conf in iter_component_configs(CORE.config):
|
for name, component, conf in iter_component_configs(CORE.config):
|
||||||
@ -227,7 +277,7 @@ def generate_cpp_contents(config):
|
|||||||
CORE.flush_tasks()
|
CORE.flush_tasks()
|
||||||
|
|
||||||
|
|
||||||
def write_cpp_file():
|
def write_cpp_file() -> int:
|
||||||
code_s = indent(CORE.cpp_main_section)
|
code_s = indent(CORE.cpp_main_section)
|
||||||
writer.write_cpp(code_s)
|
writer.write_cpp(code_s)
|
||||||
|
|
||||||
@ -238,7 +288,7 @@ def write_cpp_file():
|
|||||||
return 0
|
return 0
|
||||||
|
|
||||||
|
|
||||||
def compile_program(args, config):
|
def compile_program(args: ArgsProtocol, config: ConfigType) -> int:
|
||||||
from esphome import platformio_api
|
from esphome import platformio_api
|
||||||
|
|
||||||
_LOGGER.info("Compiling app...")
|
_LOGGER.info("Compiling app...")
|
||||||
@ -249,7 +299,9 @@ def compile_program(args, config):
|
|||||||
return 0 if idedata is not None else 1
|
return 0 if idedata is not None else 1
|
||||||
|
|
||||||
|
|
||||||
def upload_using_esptool(config, port, file, speed):
|
def upload_using_esptool(
|
||||||
|
config: ConfigType, port: str, file: str, speed: int
|
||||||
|
) -> str | int:
|
||||||
from esphome import platformio_api
|
from esphome import platformio_api
|
||||||
|
|
||||||
first_baudrate = speed or config[CONF_ESPHOME][CONF_PLATFORMIO_OPTIONS].get(
|
first_baudrate = speed or config[CONF_ESPHOME][CONF_PLATFORMIO_OPTIONS].get(
|
||||||
@ -314,7 +366,7 @@ def upload_using_esptool(config, port, file, speed):
|
|||||||
return run_esptool(115200)
|
return run_esptool(115200)
|
||||||
|
|
||||||
|
|
||||||
def upload_using_platformio(config, port):
|
def upload_using_platformio(config: ConfigType, port: str):
|
||||||
from esphome import platformio_api
|
from esphome import platformio_api
|
||||||
|
|
||||||
upload_args = ["-t", "upload", "-t", "nobuild"]
|
upload_args = ["-t", "upload", "-t", "nobuild"]
|
||||||
@ -323,7 +375,7 @@ def upload_using_platformio(config, port):
|
|||||||
return platformio_api.run_platformio_cli_run(config, CORE.verbose, *upload_args)
|
return platformio_api.run_platformio_cli_run(config, CORE.verbose, *upload_args)
|
||||||
|
|
||||||
|
|
||||||
def check_permissions(port):
|
def check_permissions(port: str):
|
||||||
if os.name == "posix" and get_port_type(port) == "SERIAL":
|
if os.name == "posix" and get_port_type(port) == "SERIAL":
|
||||||
# Check if we can open selected serial port
|
# Check if we can open selected serial port
|
||||||
if not os.access(port, os.F_OK):
|
if not os.access(port, os.F_OK):
|
||||||
@ -341,7 +393,7 @@ def check_permissions(port):
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def upload_program(config, args, host):
|
def upload_program(config: ConfigType, args: ArgsProtocol, host: str) -> int | str:
|
||||||
try:
|
try:
|
||||||
module = importlib.import_module("esphome.components." + CORE.target_platform)
|
module = importlib.import_module("esphome.components." + CORE.target_platform)
|
||||||
if getattr(module, "upload_program")(config, args, host):
|
if getattr(module, "upload_program")(config, args, host):
|
||||||
@ -356,7 +408,7 @@ def upload_program(config, args, host):
|
|||||||
return upload_using_esptool(config, host, file, args.upload_speed)
|
return upload_using_esptool(config, host, file, args.upload_speed)
|
||||||
|
|
||||||
if CORE.target_platform in (PLATFORM_RP2040):
|
if CORE.target_platform in (PLATFORM_RP2040):
|
||||||
return upload_using_platformio(config, args.device)
|
return upload_using_platformio(config, host)
|
||||||
|
|
||||||
if CORE.is_libretiny:
|
if CORE.is_libretiny:
|
||||||
return upload_using_platformio(config, host)
|
return upload_using_platformio(config, host)
|
||||||
@ -379,9 +431,12 @@ def upload_program(config, args, host):
|
|||||||
remote_port = int(ota_conf[CONF_PORT])
|
remote_port = int(ota_conf[CONF_PORT])
|
||||||
password = ota_conf.get(CONF_PASSWORD, "")
|
password = ota_conf.get(CONF_PASSWORD, "")
|
||||||
|
|
||||||
|
# Check if we should use MQTT for address resolution
|
||||||
|
# This happens when no device was specified, or the current host is "MQTT"/"OTA"
|
||||||
|
devices: list[str] = args.device or []
|
||||||
if (
|
if (
|
||||||
CONF_MQTT in config # pylint: disable=too-many-boolean-expressions
|
CONF_MQTT in config # pylint: disable=too-many-boolean-expressions
|
||||||
and (not args.device or args.device in ("MQTT", "OTA"))
|
and (not devices or host in ("MQTT", "OTA"))
|
||||||
and (
|
and (
|
||||||
((config[CONF_MDNS][CONF_DISABLED]) and not is_ip_address(CORE.address))
|
((config[CONF_MDNS][CONF_DISABLED]) and not is_ip_address(CORE.address))
|
||||||
or get_port_type(host) == "MQTT"
|
or get_port_type(host) == "MQTT"
|
||||||
@ -399,23 +454,28 @@ def upload_program(config, args, host):
|
|||||||
return espota2.run_ota(host, remote_port, password, CORE.firmware_bin)
|
return espota2.run_ota(host, remote_port, password, CORE.firmware_bin)
|
||||||
|
|
||||||
|
|
||||||
def show_logs(config, args, port):
|
def show_logs(config: ConfigType, args: ArgsProtocol, devices: list[str]) -> int | None:
|
||||||
if "logger" not in config:
|
if "logger" not in config:
|
||||||
raise EsphomeError("Logger is not configured!")
|
raise EsphomeError("Logger is not configured!")
|
||||||
|
|
||||||
|
port = devices[0]
|
||||||
|
|
||||||
if get_port_type(port) == "SERIAL":
|
if get_port_type(port) == "SERIAL":
|
||||||
check_permissions(port)
|
check_permissions(port)
|
||||||
return run_miniterm(config, port, args)
|
return run_miniterm(config, port, args)
|
||||||
if get_port_type(port) == "NETWORK" and "api" in config:
|
if get_port_type(port) == "NETWORK" and "api" in config:
|
||||||
|
addresses_to_use = devices
|
||||||
if config[CONF_MDNS][CONF_DISABLED] and CONF_MQTT in config:
|
if config[CONF_MDNS][CONF_DISABLED] and CONF_MQTT in config:
|
||||||
from esphome import mqtt
|
from esphome import mqtt
|
||||||
|
|
||||||
port = mqtt.get_esphome_device_ip(
|
mqtt_address = mqtt.get_esphome_device_ip(
|
||||||
config, args.username, args.password, args.client_id
|
config, args.username, args.password, args.client_id
|
||||||
)[0]
|
)[0]
|
||||||
|
addresses_to_use = [mqtt_address]
|
||||||
|
|
||||||
from esphome.components.api.client import run_logs
|
from esphome.components.api.client import run_logs
|
||||||
|
|
||||||
return run_logs(config, port)
|
return run_logs(config, addresses_to_use)
|
||||||
if get_port_type(port) == "MQTT" and "mqtt" in config:
|
if get_port_type(port) == "MQTT" and "mqtt" in config:
|
||||||
from esphome import mqtt
|
from esphome import mqtt
|
||||||
|
|
||||||
@ -426,7 +486,7 @@ def show_logs(config, args, port):
|
|||||||
raise EsphomeError("No remote or local logging method configured (api/mqtt/logger)")
|
raise EsphomeError("No remote or local logging method configured (api/mqtt/logger)")
|
||||||
|
|
||||||
|
|
||||||
def clean_mqtt(config, args):
|
def clean_mqtt(config: ConfigType, args: ArgsProtocol) -> int | None:
|
||||||
from esphome import mqtt
|
from esphome import mqtt
|
||||||
|
|
||||||
return mqtt.clear_topic(
|
return mqtt.clear_topic(
|
||||||
@ -434,13 +494,13 @@ def clean_mqtt(config, args):
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def command_wizard(args):
|
def command_wizard(args: ArgsProtocol) -> int | None:
|
||||||
from esphome import wizard
|
from esphome import wizard
|
||||||
|
|
||||||
return wizard.wizard(args.configuration)
|
return wizard.wizard(args.configuration)
|
||||||
|
|
||||||
|
|
||||||
def command_config(args, config):
|
def command_config(args: ArgsProtocol, config: ConfigType) -> int | None:
|
||||||
if not CORE.verbose:
|
if not CORE.verbose:
|
||||||
config = strip_default_ids(config)
|
config = strip_default_ids(config)
|
||||||
output = yaml_util.dump(config, args.show_secrets)
|
output = yaml_util.dump(config, args.show_secrets)
|
||||||
@ -455,7 +515,7 @@ def command_config(args, config):
|
|||||||
return 0
|
return 0
|
||||||
|
|
||||||
|
|
||||||
def command_vscode(args):
|
def command_vscode(args: ArgsProtocol) -> int | None:
|
||||||
from esphome import vscode
|
from esphome import vscode
|
||||||
|
|
||||||
logging.disable(logging.INFO)
|
logging.disable(logging.INFO)
|
||||||
@ -463,7 +523,7 @@ def command_vscode(args):
|
|||||||
vscode.read_config(args)
|
vscode.read_config(args)
|
||||||
|
|
||||||
|
|
||||||
def command_compile(args, config):
|
def command_compile(args: ArgsProtocol, config: ConfigType) -> int | None:
|
||||||
exit_code = write_cpp(config)
|
exit_code = write_cpp(config)
|
||||||
if exit_code != 0:
|
if exit_code != 0:
|
||||||
return exit_code
|
return exit_code
|
||||||
@ -477,8 +537,9 @@ def command_compile(args, config):
|
|||||||
return 0
|
return 0
|
||||||
|
|
||||||
|
|
||||||
def command_upload(args, config):
|
def command_upload(args: ArgsProtocol, config: ConfigType) -> int | None:
|
||||||
port = choose_upload_log_host(
|
# Get devices, resolving special identifiers like OTA
|
||||||
|
devices = choose_upload_log_host(
|
||||||
default=args.device,
|
default=args.device,
|
||||||
check_default=None,
|
check_default=None,
|
||||||
show_ota=True,
|
show_ota=True,
|
||||||
@ -486,14 +547,22 @@ def command_upload(args, config):
|
|||||||
show_api=False,
|
show_api=False,
|
||||||
purpose="uploading",
|
purpose="uploading",
|
||||||
)
|
)
|
||||||
exit_code = upload_program(config, args, port)
|
|
||||||
if exit_code != 0:
|
# Try each device until one succeeds
|
||||||
return exit_code
|
exit_code = 1
|
||||||
|
for device in devices:
|
||||||
|
_LOGGER.info("Uploading to %s", device)
|
||||||
|
exit_code = upload_program(config, args, device)
|
||||||
|
if exit_code == 0:
|
||||||
_LOGGER.info("Successfully uploaded program.")
|
_LOGGER.info("Successfully uploaded program.")
|
||||||
return 0
|
return 0
|
||||||
|
if len(devices) > 1:
|
||||||
|
_LOGGER.warning("Failed to upload to %s", device)
|
||||||
|
|
||||||
|
return exit_code
|
||||||
|
|
||||||
|
|
||||||
def command_discover(args, config):
|
def command_discover(args: ArgsProtocol, config: ConfigType) -> int | None:
|
||||||
if "mqtt" in config:
|
if "mqtt" in config:
|
||||||
from esphome import mqtt
|
from esphome import mqtt
|
||||||
|
|
||||||
@ -502,8 +571,9 @@ def command_discover(args, config):
|
|||||||
raise EsphomeError("No discover method configured (mqtt)")
|
raise EsphomeError("No discover method configured (mqtt)")
|
||||||
|
|
||||||
|
|
||||||
def command_logs(args, config):
|
def command_logs(args: ArgsProtocol, config: ConfigType) -> int | None:
|
||||||
port = choose_upload_log_host(
|
# Get devices, resolving special identifiers like OTA
|
||||||
|
devices = choose_upload_log_host(
|
||||||
default=args.device,
|
default=args.device,
|
||||||
check_default=None,
|
check_default=None,
|
||||||
show_ota=False,
|
show_ota=False,
|
||||||
@ -511,10 +581,10 @@ def command_logs(args, config):
|
|||||||
show_api=True,
|
show_api=True,
|
||||||
purpose="logging",
|
purpose="logging",
|
||||||
)
|
)
|
||||||
return show_logs(config, args, port)
|
return show_logs(config, args, devices)
|
||||||
|
|
||||||
|
|
||||||
def command_run(args, config):
|
def command_run(args: ArgsProtocol, config: ConfigType) -> int | None:
|
||||||
exit_code = write_cpp(config)
|
exit_code = write_cpp(config)
|
||||||
if exit_code != 0:
|
if exit_code != 0:
|
||||||
return exit_code
|
return exit_code
|
||||||
@ -531,7 +601,8 @@ def command_run(args, config):
|
|||||||
program_path = idedata.raw["prog_path"]
|
program_path = idedata.raw["prog_path"]
|
||||||
return run_external_process(program_path)
|
return run_external_process(program_path)
|
||||||
|
|
||||||
port = choose_upload_log_host(
|
# Get devices, resolving special identifiers like OTA
|
||||||
|
devices = choose_upload_log_host(
|
||||||
default=args.device,
|
default=args.device,
|
||||||
check_default=None,
|
check_default=None,
|
||||||
show_ota=True,
|
show_ota=True,
|
||||||
@ -539,39 +610,53 @@ def command_run(args, config):
|
|||||||
show_api=True,
|
show_api=True,
|
||||||
purpose="uploading",
|
purpose="uploading",
|
||||||
)
|
)
|
||||||
exit_code = upload_program(config, args, port)
|
|
||||||
if exit_code != 0:
|
# Try each device for upload until one succeeds
|
||||||
return exit_code
|
successful_device: str | None = None
|
||||||
|
for device in devices:
|
||||||
|
_LOGGER.info("Uploading to %s", device)
|
||||||
|
exit_code = upload_program(config, args, device)
|
||||||
|
if exit_code == 0:
|
||||||
_LOGGER.info("Successfully uploaded program.")
|
_LOGGER.info("Successfully uploaded program.")
|
||||||
|
successful_device = device
|
||||||
|
break
|
||||||
|
if len(devices) > 1:
|
||||||
|
_LOGGER.warning("Failed to upload to %s", device)
|
||||||
|
|
||||||
|
if successful_device is None:
|
||||||
|
return exit_code
|
||||||
|
|
||||||
if args.no_logs:
|
if args.no_logs:
|
||||||
return 0
|
return 0
|
||||||
port = choose_upload_log_host(
|
|
||||||
default=args.device,
|
# For logs, prefer the device we successfully uploaded to
|
||||||
check_default=port,
|
devices = choose_upload_log_host(
|
||||||
|
default=successful_device,
|
||||||
|
check_default=successful_device,
|
||||||
show_ota=False,
|
show_ota=False,
|
||||||
show_mqtt=True,
|
show_mqtt=True,
|
||||||
show_api=True,
|
show_api=True,
|
||||||
purpose="logging",
|
purpose="logging",
|
||||||
)
|
)
|
||||||
return show_logs(config, args, port)
|
return show_logs(config, args, devices)
|
||||||
|
|
||||||
|
|
||||||
def command_clean_mqtt(args, config):
|
def command_clean_mqtt(args: ArgsProtocol, config: ConfigType) -> int | None:
|
||||||
return clean_mqtt(config, args)
|
return clean_mqtt(config, args)
|
||||||
|
|
||||||
|
|
||||||
def command_mqtt_fingerprint(args, config):
|
def command_mqtt_fingerprint(args: ArgsProtocol, config: ConfigType) -> int | None:
|
||||||
from esphome import mqtt
|
from esphome import mqtt
|
||||||
|
|
||||||
return mqtt.get_fingerprint(config)
|
return mqtt.get_fingerprint(config)
|
||||||
|
|
||||||
|
|
||||||
def command_version(args):
|
def command_version(args: ArgsProtocol) -> int | None:
|
||||||
safe_print(f"Version: {const.__version__}")
|
safe_print(f"Version: {const.__version__}")
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
|
|
||||||
def command_clean(args, config):
|
def command_clean(args: ArgsProtocol, config: ConfigType) -> int | None:
|
||||||
try:
|
try:
|
||||||
writer.clean_build()
|
writer.clean_build()
|
||||||
except OSError as err:
|
except OSError as err:
|
||||||
@ -581,13 +666,13 @@ def command_clean(args, config):
|
|||||||
return 0
|
return 0
|
||||||
|
|
||||||
|
|
||||||
def command_dashboard(args):
|
def command_dashboard(args: ArgsProtocol) -> int | None:
|
||||||
from esphome.dashboard import dashboard
|
from esphome.dashboard import dashboard
|
||||||
|
|
||||||
return dashboard.start_dashboard(args)
|
return dashboard.start_dashboard(args)
|
||||||
|
|
||||||
|
|
||||||
def command_update_all(args):
|
def command_update_all(args: ArgsProtocol) -> int | None:
|
||||||
import click
|
import click
|
||||||
|
|
||||||
success = {}
|
success = {}
|
||||||
@ -634,7 +719,7 @@ def command_update_all(args):
|
|||||||
return failed
|
return failed
|
||||||
|
|
||||||
|
|
||||||
def command_idedata(args, config):
|
def command_idedata(args: ArgsProtocol, config: ConfigType) -> int:
|
||||||
import json
|
import json
|
||||||
|
|
||||||
from esphome import platformio_api
|
from esphome import platformio_api
|
||||||
@ -650,7 +735,7 @@ def command_idedata(args, config):
|
|||||||
return 0
|
return 0
|
||||||
|
|
||||||
|
|
||||||
def command_rename(args, config):
|
def command_rename(args: ArgsProtocol, config: ConfigType) -> int | None:
|
||||||
for c in args.name:
|
for c in args.name:
|
||||||
if c not in ALLOWED_NAME_CHARS:
|
if c not in ALLOWED_NAME_CHARS:
|
||||||
print(
|
print(
|
||||||
@ -860,7 +945,8 @@ def parse_args(argv):
|
|||||||
)
|
)
|
||||||
parser_upload.add_argument(
|
parser_upload.add_argument(
|
||||||
"--device",
|
"--device",
|
||||||
help="Manually specify the serial port/address to use, for example /dev/ttyUSB0.",
|
action="append",
|
||||||
|
help="Manually specify the serial port/address to use, for example /dev/ttyUSB0. Can be specified multiple times for fallback addresses.",
|
||||||
)
|
)
|
||||||
parser_upload.add_argument(
|
parser_upload.add_argument(
|
||||||
"--upload_speed",
|
"--upload_speed",
|
||||||
@ -882,7 +968,8 @@ def parse_args(argv):
|
|||||||
)
|
)
|
||||||
parser_logs.add_argument(
|
parser_logs.add_argument(
|
||||||
"--device",
|
"--device",
|
||||||
help="Manually specify the serial port/address to use, for example /dev/ttyUSB0.",
|
action="append",
|
||||||
|
help="Manually specify the serial port/address to use, for example /dev/ttyUSB0. Can be specified multiple times for fallback addresses.",
|
||||||
)
|
)
|
||||||
parser_logs.add_argument(
|
parser_logs.add_argument(
|
||||||
"--reset",
|
"--reset",
|
||||||
@ -911,7 +998,8 @@ def parse_args(argv):
|
|||||||
)
|
)
|
||||||
parser_run.add_argument(
|
parser_run.add_argument(
|
||||||
"--device",
|
"--device",
|
||||||
help="Manually specify the serial port/address to use, for example /dev/ttyUSB0.",
|
action="append",
|
||||||
|
help="Manually specify the serial port/address to use, for example /dev/ttyUSB0. Can be specified multiple times for fallback addresses.",
|
||||||
)
|
)
|
||||||
parser_run.add_argument(
|
parser_run.add_argument(
|
||||||
"--upload_speed",
|
"--upload_speed",
|
||||||
|
@ -30,7 +30,7 @@ if TYPE_CHECKING:
|
|||||||
_LOGGER = logging.getLogger(__name__)
|
_LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
async def async_run_logs(config: dict[str, Any], address: str) -> None:
|
async def async_run_logs(config: dict[str, Any], addresses: list[str]) -> None:
|
||||||
"""Run the logs command in the event loop."""
|
"""Run the logs command in the event loop."""
|
||||||
conf = config["api"]
|
conf = config["api"]
|
||||||
name = config["esphome"]["name"]
|
name = config["esphome"]["name"]
|
||||||
@ -39,13 +39,21 @@ async def async_run_logs(config: dict[str, Any], address: str) -> None:
|
|||||||
noise_psk: str | None = None
|
noise_psk: str | None = None
|
||||||
if (encryption := conf.get(CONF_ENCRYPTION)) and (key := encryption.get(CONF_KEY)):
|
if (encryption := conf.get(CONF_ENCRYPTION)) and (key := encryption.get(CONF_KEY)):
|
||||||
noise_psk = key
|
noise_psk = key
|
||||||
_LOGGER.info("Starting log output from %s using esphome API", address)
|
|
||||||
|
if len(addresses) == 1:
|
||||||
|
_LOGGER.info("Starting log output from %s using esphome API", addresses[0])
|
||||||
|
else:
|
||||||
|
_LOGGER.info(
|
||||||
|
"Starting log output from %s using esphome API", " or ".join(addresses)
|
||||||
|
)
|
||||||
|
|
||||||
cli = APIClient(
|
cli = APIClient(
|
||||||
address,
|
addresses[0], # Primary address for compatibility
|
||||||
port,
|
port,
|
||||||
password,
|
password,
|
||||||
client_info=f"ESPHome Logs {__version__}",
|
client_info=f"ESPHome Logs {__version__}",
|
||||||
noise_psk=noise_psk,
|
noise_psk=noise_psk,
|
||||||
|
addresses=addresses, # Pass all addresses for automatic retry
|
||||||
)
|
)
|
||||||
dashboard = CORE.dashboard
|
dashboard = CORE.dashboard
|
||||||
|
|
||||||
@ -66,7 +74,7 @@ async def async_run_logs(config: dict[str, Any], address: str) -> None:
|
|||||||
await stop()
|
await stop()
|
||||||
|
|
||||||
|
|
||||||
def run_logs(config: dict[str, Any], address: str) -> None:
|
def run_logs(config: dict[str, Any], addresses: list[str]) -> None:
|
||||||
"""Run the logs command."""
|
"""Run the logs command."""
|
||||||
with contextlib.suppress(KeyboardInterrupt):
|
with contextlib.suppress(KeyboardInterrupt):
|
||||||
asyncio.run(async_run_logs(config, address))
|
asyncio.run(async_run_logs(config, addresses))
|
||||||
|
@ -324,38 +324,46 @@ class EsphomePortCommandWebSocket(EsphomeCommandWebSocket):
|
|||||||
configuration = json_message["configuration"]
|
configuration = json_message["configuration"]
|
||||||
config_file = settings.rel_path(configuration)
|
config_file = settings.rel_path(configuration)
|
||||||
port = json_message["port"]
|
port = json_message["port"]
|
||||||
|
addresses: list[str] = [port]
|
||||||
if (
|
if (
|
||||||
port == "OTA" # pylint: disable=too-many-boolean-expressions
|
port == "OTA" # pylint: disable=too-many-boolean-expressions
|
||||||
and (entry := entries.get(config_file))
|
and (entry := entries.get(config_file))
|
||||||
and entry.loaded_integrations
|
and entry.loaded_integrations
|
||||||
and "api" in entry.loaded_integrations
|
and "api" in entry.loaded_integrations
|
||||||
):
|
):
|
||||||
if (mdns := dashboard.mdns_status) and (
|
addresses = []
|
||||||
address_list := await mdns.async_resolve_host(entry.name)
|
# First priority: entry.address AKA use_address
|
||||||
):
|
if (
|
||||||
# Use the IP address if available but only
|
(use_address := entry.address)
|
||||||
# if the API is loaded and the device is online
|
|
||||||
# since MQTT logging will not work otherwise
|
|
||||||
port = sort_ip_addresses(address_list)[0]
|
|
||||||
elif (
|
|
||||||
entry.address
|
|
||||||
and (
|
and (
|
||||||
address_list := await dashboard.dns_cache.async_resolve(
|
address_list := await dashboard.dns_cache.async_resolve(
|
||||||
entry.address, time.monotonic()
|
use_address, time.monotonic()
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
and not isinstance(address_list, Exception)
|
and not isinstance(address_list, Exception)
|
||||||
):
|
):
|
||||||
# If mdns is not available, try to use the DNS cache
|
addresses.extend(sort_ip_addresses(address_list))
|
||||||
port = sort_ip_addresses(address_list)[0]
|
|
||||||
|
|
||||||
return [
|
# Second priority: mDNS
|
||||||
*DASHBOARD_COMMAND,
|
if (
|
||||||
*args,
|
(mdns := dashboard.mdns_status)
|
||||||
config_file,
|
and (address_list := await mdns.async_resolve_host(entry.name))
|
||||||
"--device",
|
and (
|
||||||
port,
|
new_addresses := [
|
||||||
|
addr for addr in address_list if addr not in addresses
|
||||||
]
|
]
|
||||||
|
)
|
||||||
|
):
|
||||||
|
# Use the IP address if available but only
|
||||||
|
# if the API is loaded and the device is online
|
||||||
|
# since MQTT logging will not work otherwise
|
||||||
|
addresses.extend(sort_ip_addresses(new_addresses))
|
||||||
|
|
||||||
|
device_args: list[str] = [
|
||||||
|
arg for address in addresses for arg in ("--device", address)
|
||||||
|
]
|
||||||
|
|
||||||
|
return [*DASHBOARD_COMMAND, *args, config_file, *device_args]
|
||||||
|
|
||||||
|
|
||||||
class EsphomeLogsHandler(EsphomePortCommandWebSocket):
|
class EsphomeLogsHandler(EsphomePortCommandWebSocket):
|
||||||
|
@ -6,6 +6,7 @@ from pathlib import Path
|
|||||||
import re
|
import re
|
||||||
import subprocess
|
import subprocess
|
||||||
import sys
|
import sys
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
from esphome import const
|
from esphome import const
|
||||||
|
|
||||||
@ -110,7 +111,7 @@ class RedirectText:
|
|||||||
def __getattr__(self, item):
|
def __getattr__(self, item):
|
||||||
return getattr(self._out, item)
|
return getattr(self._out, item)
|
||||||
|
|
||||||
def _write_color_replace(self, s):
|
def _write_color_replace(self, s: str | bytes) -> None:
|
||||||
from esphome.core import CORE
|
from esphome.core import CORE
|
||||||
|
|
||||||
if CORE.dashboard:
|
if CORE.dashboard:
|
||||||
@ -121,7 +122,7 @@ class RedirectText:
|
|||||||
s = s.replace("\033", "\\033")
|
s = s.replace("\033", "\\033")
|
||||||
self._out.write(s)
|
self._out.write(s)
|
||||||
|
|
||||||
def write(self, s):
|
def write(self, s: str | bytes) -> int:
|
||||||
# s is usually a str already (self._out is of type TextIOWrapper)
|
# s is usually a str already (self._out is of type TextIOWrapper)
|
||||||
# However, s is sometimes also a bytes object in python3. Let's make sure it's a
|
# However, s is sometimes also a bytes object in python3. Let's make sure it's a
|
||||||
# str
|
# str
|
||||||
@ -223,7 +224,7 @@ def run_external_command(
|
|||||||
return retval
|
return retval
|
||||||
|
|
||||||
|
|
||||||
def run_external_process(*cmd, **kwargs):
|
def run_external_process(*cmd: str, **kwargs: Any) -> int | str:
|
||||||
full_cmd = " ".join(shlex_quote(x) for x in cmd)
|
full_cmd = " ".join(shlex_quote(x) for x in cmd)
|
||||||
_LOGGER.debug("Running: %s", full_cmd)
|
_LOGGER.debug("Running: %s", full_cmd)
|
||||||
filter_lines = kwargs.get("filter_lines")
|
filter_lines = kwargs.get("filter_lines")
|
||||||
@ -266,7 +267,7 @@ class OrderedDict(collections.OrderedDict):
|
|||||||
return dict(self).__repr__()
|
return dict(self).__repr__()
|
||||||
|
|
||||||
|
|
||||||
def list_yaml_files(folders):
|
def list_yaml_files(folders: list[str]) -> list[str]:
|
||||||
files = filter_yaml_files(
|
files = filter_yaml_files(
|
||||||
[os.path.join(folder, p) for folder in folders for p in os.listdir(folder)]
|
[os.path.join(folder, p) for folder in folders for p in os.listdir(folder)]
|
||||||
)
|
)
|
||||||
@ -274,7 +275,7 @@ def list_yaml_files(folders):
|
|||||||
return files
|
return files
|
||||||
|
|
||||||
|
|
||||||
def filter_yaml_files(files):
|
def filter_yaml_files(files: list[str]) -> list[str]:
|
||||||
return [
|
return [
|
||||||
f
|
f
|
||||||
for f in files
|
for f in files
|
||||||
|
Loading…
x
Reference in New Issue
Block a user