fix --device OTA

This commit is contained in:
J. Nick Koston
2025-08-04 15:57:29 -10:00
parent 1d6c491dea
commit 5f9080dac9

View File

@@ -107,30 +107,50 @@ def choose_prompt(options, purpose: str = None):
def choose_upload_log_host(
default, check_default, show_ota, show_mqtt, show_api, purpose: str = None
):
default: list[str] | str | None,
check_default: str | None,
show_ota: bool,
show_mqtt: bool,
show_api: bool,
purpose: str | None = None,
) -> list[str]:
# Convert to list for uniform handling
defaults = [default] if isinstance(default, str) else default or []
# If devices specified, resolve them
if defaults:
resolved: list[str] = []
for device in defaults:
if device == "SERIAL":
options = [
(f"{port.path} ({port.description})", port.path)
for port in get_serial_ports()
]
resolved.append(choose_prompt(options, purpose=purpose))
elif device == "OTA":
if (show_ota and "ota" in CORE.config) or (
show_api and "api" in CORE.config
):
resolved.append(CORE.address)
elif show_mqtt and has_mqtt_logging():
resolved.append("MQTT")
else:
resolved.append(device)
return resolved
# No devices specified, show interactive chooser
options = [
(f"{port.path} ({port.description})", port.path) for port in get_serial_ports()
]
if default == "SERIAL":
return choose_prompt(options, purpose=purpose)
if (show_ota and "ota" in CORE.config) or (show_api and "api" in CORE.config):
options.append((f"Over The Air ({CORE.address})", CORE.address))
if default == "OTA":
return CORE.address
if (
show_mqtt
and (mqtt_config := CORE.config.get(CONF_MQTT))
and mqtt_logging_enabled(mqtt_config)
):
if show_mqtt and has_mqtt_logging():
mqtt_config = CORE.config[CONF_MQTT]
options.append((f"MQTT ({mqtt_config[CONF_BROKER]})", "MQTT"))
if default == "OTA":
return "MQTT"
if default is not None:
return default
if check_default is not None and check_default in [opt[1] for opt in options]:
return check_default
return choose_prompt(options, purpose=purpose)
return [check_default]
return [choose_prompt(options, purpose=purpose)]
def mqtt_logging_enabled(mqtt_config):
@@ -142,6 +162,13 @@ def mqtt_logging_enabled(mqtt_config):
return log_topic.get(CONF_LEVEL, None) != "NONE"
def has_mqtt_logging() -> bool:
"""Check if MQTT logging is available."""
return (mqtt_config := CORE.config.get(CONF_MQTT)) and mqtt_logging_enabled(
mqtt_config
)
def get_port_type(port: str) -> str:
if port.startswith("/") or port.startswith("COM"):
return "SERIAL"
@@ -507,19 +534,18 @@ def command_compile(args: ArgsProtocol, config: ConfigType) -> int | None:
def command_upload(args: ArgsProtocol, config: ConfigType) -> int | None:
# No devices specified, use the interactive chooser
devices: list[str] = args.device or [
choose_upload_log_host(
default=None,
check_default=None,
show_ota=True,
show_mqtt=False,
show_api=False,
purpose="uploading",
)
]
# Get devices, resolving special identifiers like OTA
devices = choose_upload_log_host(
default=args.device,
check_default=None,
show_ota=True,
show_mqtt=False,
show_api=False,
purpose="uploading",
)
# Try each device until one succeeds
exit_code = 1
for device in devices:
_LOGGER.info("Uploading to %s", device)
exit_code = upload_program(config, args, device)
@@ -542,17 +568,15 @@ def command_discover(args: ArgsProtocol, config: ConfigType) -> int | None:
def command_logs(args: ArgsProtocol, config: ConfigType) -> int | None:
# No devices specified, use the interactive chooser
devices = args.device or [
choose_upload_log_host(
default=None,
check_default=None,
show_ota=False,
show_mqtt=True,
show_api=True,
purpose="logging",
)
]
# Get devices, resolving special identifiers like OTA
devices = choose_upload_log_host(
default=args.device,
check_default=None,
show_ota=False,
show_mqtt=True,
show_api=True,
purpose="logging",
)
return show_logs(config, args, devices)
@@ -573,17 +597,15 @@ def command_run(args: ArgsProtocol, config: ConfigType) -> int | None:
program_path = idedata.raw["prog_path"]
return run_external_process(program_path)
# No devices specified, use the interactive chooser
devices = args.device or [
choose_upload_log_host(
default=None,
check_default=None,
show_ota=True,
show_mqtt=False,
show_api=True,
purpose="uploading",
)
]
# Get devices, resolving special identifiers like OTA
devices = choose_upload_log_host(
default=args.device,
check_default=None,
show_ota=True,
show_mqtt=False,
show_api=True,
purpose="uploading",
)
# Try each device for upload until one succeeds
successful_device: str | None = None
@@ -604,7 +626,7 @@ def command_run(args: ArgsProtocol, config: ConfigType) -> int | None:
return 0
# For logs, prefer the device we successfully uploaded to
port = choose_upload_log_host(
devices = choose_upload_log_host(
default=successful_device,
check_default=successful_device,
show_ota=False,
@@ -612,7 +634,7 @@ def command_run(args: ArgsProtocol, config: ConfigType) -> int | None:
show_api=True,
purpose="logging",
)
return show_logs(config, args, [port])
return show_logs(config, args, devices)
def command_clean_mqtt(args: ArgsProtocol, config: ConfigType) -> int | None: