[ruff] Enable SIM rules and fix code simplification violations (#9872)

This commit is contained in:
J. Nick Koston 2025-07-24 20:26:08 -10:00 committed by GitHub
parent cb87f156d0
commit ffebd30033
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
72 changed files with 400 additions and 432 deletions
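
Most of the changes below are mechanical applications of ruff's flake8-simplify (SIM) rules: nested if statements collapsed into a single condition (SIM102), try/except/pass replaced with contextlib.suppress (SIM105), boolean-returning if/else chains reduced to a single return (SIM103), hand-rolled search loops replaced with any() (SIM110), and redundant .keys() calls dropped from membership tests and iteration (SIM118). A minimal before/after sketch of the collapsible-if pattern (illustrative names only, not code from this commit):

def validate(config: dict) -> dict:
    # Before (flagged by SIM102):
    #     if "model" in config:
    #         if config["model"] not in ("A", "B"):
    #             raise ValueError("unsupported model")
    # After: one combined condition, same behavior.
    if "model" in config and config["model"] not in ("A", "B"):
        raise ValueError("unsupported model")
    return config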

View File

@@ -119,9 +119,7 @@ def mqtt_logging_enabled(mqtt_config):
         return False
     if CONF_TOPIC not in log_topic:
         return False
-    if log_topic.get(CONF_LEVEL, None) == "NONE":
-        return False
-    return True
+    return log_topic.get(CONF_LEVEL, None) != "NONE"


 def get_port_type(port):

View File

@@ -14,6 +14,8 @@ with warnings.catch_warnings():
     from aioesphomeapi import APIClient, parse_log_message
     from aioesphomeapi.log_runner import async_run

+import contextlib
+
 from esphome.const import CONF_KEY, CONF_PASSWORD, CONF_PORT, __version__
 from esphome.core import CORE
@@ -66,7 +68,5 @@ async def async_run_logs(config: dict[str, Any], address: str) -> None:
 def run_logs(config: dict[str, Any], address: str) -> None:
     """Run the logs command."""
-    try:
+    with contextlib.suppress(KeyboardInterrupt):
         asyncio.run(async_run_logs(config, address))
-    except KeyboardInterrupt:
-        pass
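
The contextlib.suppress rewrites in this commit (SIM105) are behavior-preserving: the suppressed exception simply ends the block instead of falling through an empty except clause. A self-contained sketch of the pattern, with an illustrative coroutine standing in for the real entry point:

import asyncio
import contextlib

async def main() -> None:
    # Placeholder for the real long-running task.
    await asyncio.sleep(3600)

# Equivalent to: try: asyncio.run(main()) / except KeyboardInterrupt: pass
with contextlib.suppress(KeyboardInterrupt):
    asyncio.run(main())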

View File

@@ -22,9 +22,8 @@ def validate_id(config):
     if CONF_CAN_ID in config:
         can_id = config[CONF_CAN_ID]
         id_ext = config[CONF_USE_EXTENDED_ID]
-        if not id_ext:
-            if can_id > 0x7FF:
-                raise cv.Invalid("Standard IDs must be 11 Bit (0x000-0x7ff / 0-2047)")
+        if not id_ext and can_id > 0x7FF:
+            raise cv.Invalid("Standard IDs must be 11 Bit (0x000-0x7ff / 0-2047)")
     return config

View File

@@ -953,14 +953,16 @@ def _write_idf_component_yml():

 # Called by writer.py
 def copy_files():
-    if CORE.using_arduino:
-        if "partitions.csv" not in CORE.data[KEY_ESP32][KEY_EXTRA_BUILD_FILES]:
-            write_file_if_changed(
-                CORE.relative_build_path("partitions.csv"),
-                get_arduino_partition_csv(
-                    CORE.platformio_options.get("board_upload.flash_size")
-                ),
-            )
+    if (
+        CORE.using_arduino
+        and "partitions.csv" not in CORE.data[KEY_ESP32][KEY_EXTRA_BUILD_FILES]
+    ):
+        write_file_if_changed(
+            CORE.relative_build_path("partitions.csv"),
+            get_arduino_partition_csv(
+                CORE.platformio_options.get("board_upload.flash_size")
+            ),
+        )
     if CORE.using_esp_idf:
         _write_sdkconfig()
         _write_idf_component_yml()

View File

@@ -140,20 +140,22 @@ VALUE_TYPES = {

 def validate_char_on_write(char_config):
-    if CONF_ON_WRITE in char_config:
-        if not char_config[CONF_WRITE] and not char_config[CONF_WRITE_NO_RESPONSE]:
-            raise cv.Invalid(
-                f"{CONF_ON_WRITE} requires the {CONF_WRITE} or {CONF_WRITE_NO_RESPONSE} property to be set"
-            )
+    if (
+        CONF_ON_WRITE in char_config
+        and not char_config[CONF_WRITE]
+        and not char_config[CONF_WRITE_NO_RESPONSE]
+    ):
+        raise cv.Invalid(
+            f"{CONF_ON_WRITE} requires the {CONF_WRITE} or {CONF_WRITE_NO_RESPONSE} property to be set"
+        )
     return char_config


 def validate_descriptor(desc_config):
-    if CONF_ON_WRITE in desc_config:
-        if not desc_config[CONF_WRITE]:
-            raise cv.Invalid(
-                f"{CONF_ON_WRITE} requires the {CONF_WRITE} property to be set"
-            )
+    if CONF_ON_WRITE in desc_config and not desc_config[CONF_WRITE]:
+        raise cv.Invalid(
+            f"{CONF_ON_WRITE} requires the {CONF_WRITE} property to be set"
+        )
     if CONF_MAX_LENGTH not in desc_config:
         value = desc_config[CONF_VALUE][CONF_DATA]
         if cg.is_template(value):

View File

@@ -294,9 +294,8 @@ async def to_code(config):
             )
         )
-    if get_esp32_variant() == VARIANT_ESP32:
-        if CONF_IIR_FILTER in config:
-            cg.add(touch.set_iir_filter(config[CONF_IIR_FILTER]))
+    if get_esp32_variant() == VARIANT_ESP32 and CONF_IIR_FILTER in config:
+        cg.add(touch.set_iir_filter(config[CONF_IIR_FILTER]))

     if get_esp32_variant() == VARIANT_ESP32S2 or get_esp32_variant() == VARIANT_ESP32S3:
         if CONF_FILTER_MODE in config:

View File

@@ -245,7 +245,7 @@ async def to_code(config):
         if ver <= cv.Version(2, 3, 0):
             # No ld script support
             ld_script = None
-        if ver <= cv.Version(2, 4, 2):
+        elif ver <= cv.Version(2, 4, 2):
             # Old ld script path
             ld_script = ld_scripts[0]
         else:

View File

@@ -112,7 +112,7 @@ def _is_framework_spi_polling_mode_supported():
             return True
         if cv.Version(5, 3, 0) > framework_version >= cv.Version(5, 2, 1):
             return True
-        if cv.Version(5, 2, 0) > framework_version >= cv.Version(5, 1, 4):
+        if cv.Version(5, 2, 0) > framework_version >= cv.Version(5, 1, 4):  # noqa: SIM103
             return True
         return False
     if CORE.using_arduino:
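
The # noqa: SIM103 marker above opts out of the needless-bool rule so the parallel version-range checks stay explicit. What SIM103 would otherwise suggest is returning the condition itself; a minimal sketch with illustrative values, not the ESPHome code:

def in_supported_range(version: tuple[int, int, int]) -> bool:
    # SIM103: "if cond: return True / return False" becomes "return cond".
    return (5, 1, 4) <= version < (5, 2, 0)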

View File

@@ -55,9 +55,7 @@ CONFIG_SCHEMA = cv.All(
 async def to_code(config):
     var = await fastled_base.new_fastled_light(config)

-    rgb_order = cg.RawExpression(
-        config[CONF_RGB_ORDER] if CONF_RGB_ORDER in config else "RGB"
-    )
+    rgb_order = cg.RawExpression(config.get(CONF_RGB_ORDER, "RGB"))
     data_rate = None

     if CONF_DATA_RATE in config:

View File

@@ -116,7 +116,7 @@ GRAPH_SCHEMA = cv.Schema(

 def _relocate_fields_to_subfolder(config, subfolder, subschema):
-    fields = [k.schema for k in subschema.schema.keys()]
+    fields = [k.schema for k in subschema.schema]
     fields.remove(CONF_ID)
     if subfolder in config:
         # Ensure no ambiguous fields in base of config
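
The .keys() removals throughout this commit (SIM118) rely on dicts and dict-like schema objects supporting membership tests and iteration directly; a minimal sketch with illustrative data:

schema = {"id": 1, "duration": 2}
assert "id" in schema              # instead of: "id" in schema.keys()
fields = [key for key in schema]   # iteration also walks the keys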

View File

@@ -70,9 +70,8 @@ def validate_url(value):
 def validate_ssl_verification(config):
     error_message = ""

-    if CORE.is_esp32:
-        if not CORE.using_esp_idf and config[CONF_VERIFY_SSL]:
-            error_message = "ESPHome supports certificate verification only via ESP-IDF"
+    if CORE.is_esp32 and not CORE.using_esp_idf and config[CONF_VERIFY_SSL]:
+        error_message = "ESPHome supports certificate verification only via ESP-IDF"

     if CORE.is_rp2040 and config[CONF_VERIFY_SSL]:
         error_message = "ESPHome does not support certificate verification on RP2040"

View File

@@ -66,11 +66,10 @@ PROTOCOL_NAMES = {

 def _validate(config):
     for conf, models in SUPPORTED_OPTIONS.items():
-        if conf in config:
-            if config[CONF_MODEL] not in models:
-                raise cv.Invalid(
-                    f"{conf} is only available on {' and '.join(models)}, not {config[CONF_MODEL]}"
-                )
+        if conf in config and config[CONF_MODEL] not in models:
+            raise cv.Invalid(
+                f"{conf} is only available on {' and '.join(models)}, not {config[CONF_MODEL]}"
+            )
     return config

View File

@@ -243,10 +243,7 @@ def _final_validate(_):

 def use_legacy():
-    if CORE.using_esp_idf:
-        if not _use_legacy_driver:
-            return False
-    return True
+    return not (CORE.using_esp_idf and not _use_legacy_driver)


 FINAL_VALIDATE_SCHEMA = _final_validate

View File

@@ -44,9 +44,8 @@ PDM_VARIANTS = [esp32.const.VARIANT_ESP32, esp32.const.VARIANT_ESP32S3]

 def _validate_esp32_variant(config):
     variant = esp32.get_esp32_variant()
     if config[CONF_ADC_TYPE] == "external":
-        if config[CONF_PDM]:
-            if variant not in PDM_VARIANTS:
-                raise cv.Invalid(f"{variant} does not support PDM")
+        if config[CONF_PDM] and variant not in PDM_VARIANTS:
+            raise cv.Invalid(f"{variant} does not support PDM")
         return config
     if config[CONF_ADC_TYPE] == "internal":
         if variant not in INTERNAL_ADC_VARIANTS:
@@ -122,9 +121,8 @@ CONFIG_SCHEMA = cv.All(

 def _final_validate(config):
-    if not use_legacy():
-        if config[CONF_ADC_TYPE] == "internal":
-            raise cv.Invalid("Internal ADC is only compatible with legacy i2s driver.")
+    if not use_legacy() and config[CONF_ADC_TYPE] == "internal":
+        raise cv.Invalid("Internal ADC is only compatible with legacy i2s driver.")


 FINAL_VALIDATE_SCHEMA = _final_validate

View File

@@ -138,9 +138,10 @@ def _validate(config):
     ]:
         raise cv.Invalid("Selected model can't run on ESP8266.")

-    if model == "CUSTOM":
-        if CONF_INIT_SEQUENCE not in config or CONF_DIMENSIONS not in config:
-            raise cv.Invalid("CUSTOM model requires init_sequence and dimensions")
+    if model == "CUSTOM" and (
+        CONF_INIT_SEQUENCE not in config or CONF_DIMENSIONS not in config
+    ):
+        raise cv.Invalid("CUSTOM model requires init_sequence and dimensions")

     return config

View File

@@ -1,5 +1,6 @@
 from __future__ import annotations

+import contextlib
 import hashlib
 import io
 import logging
@@ -174,9 +175,8 @@ class ImageGrayscale(ImageEncoder):
                 b = 1
             if self.invert_alpha:
                 b ^= 0xFF
-            if self.transparency == CONF_ALPHA_CHANNEL:
-                if a != 0xFF:
-                    b = a
+            if self.transparency == CONF_ALPHA_CHANNEL and a != 0xFF:
+                b = a
         self.data[self.index] = b
         self.index += 1
@@ -672,10 +672,8 @@ async def write_image(config, all_frames=False):
     invert_alpha = config[CONF_INVERT_ALPHA]
     frame_count = 1
     if all_frames:
-        try:
-            frame_count = image.n_frames
-        except AttributeError:
-            pass
+        with contextlib.suppress(AttributeError):
+            frame_count = image.n_frames
         if frame_count <= 1:
             _LOGGER.warning("Image file %s has no animation frames", path)

View File

@@ -27,14 +27,13 @@ def validate_logger(config):
     logger_conf = fv.full_config.get()[CONF_LOGGER]
     if logger_conf[CONF_BAUD_RATE] == 0:
         raise cv.Invalid("improv_serial requires the logger baud_rate to be not 0")
-    if CORE.using_esp_idf:
-        if (
-            logger_conf[CONF_HARDWARE_UART] == USB_CDC
-            and get_esp32_variant() == VARIANT_ESP32S3
-        ):
-            raise cv.Invalid(
-                "improv_serial does not support the selected logger hardware_uart"
-            )
+    if CORE.using_esp_idf and (
+        logger_conf[CONF_HARDWARE_UART] == USB_CDC
+        and get_esp32_variant() == VARIANT_ESP32S3
+    ):
+        raise cv.Invalid(
+            "improv_serial does not support the selected logger hardware_uart"
+        )
     return config

View File

@@ -78,11 +78,8 @@ def validate_model_config(config):
     model = config[CONF_MODEL]

     for key in config:
-        if key in SENSOR_MODEL_OPTIONS:
-            if model not in SENSOR_MODEL_OPTIONS[key]:
-                raise cv.Invalid(
-                    f"Device model '{model}' does not support '{key}' sensor"
-                )
+        if key in SENSOR_MODEL_OPTIONS and model not in SENSOR_MODEL_OPTIONS[key]:
+            raise cv.Invalid(f"Device model '{model}' does not support '{key}' sensor")

     tempco = config[CONF_TEMPERATURE_COEFFICIENT]
     if tempco > 0 and model not in ["INA228", "INA229"]:

View File

@@ -56,7 +56,8 @@ async def to_code(config):
         sens = await text_sensor.new_text_sensor(mac_address_config)
         cg.add(ld2450_component.set_mac_text_sensor(sens))
     for n in range(MAX_TARGETS):
-        if direction_conf := config.get(f"target_{n + 1}"):
-            if direction_config := direction_conf.get(CONF_DIRECTION):
-                sens = await text_sensor.new_text_sensor(direction_config)
-                cg.add(ld2450_component.set_direction_text_sensor(n, sens))
+        if (direction_conf := config.get(f"target_{n + 1}")) and (
+            direction_config := direction_conf.get(CONF_DIRECTION)
+        ):
+            sens = await text_sensor.new_text_sensor(direction_config)
+            cg.add(ld2450_component.set_direction_text_sensor(n, sens))

View File

@@ -526,7 +526,7 @@ def validate_effects(allowed_effects):
         errors = []
         names = set()
         for i, x in enumerate(value):
-            key = next(it for it in x.keys())
+            key = next(it for it in x)
             if key not in allowed_effects:
                 errors.append(
                     cv.Invalid(

View File

@@ -346,14 +346,13 @@ async def to_code(config):
     if config.get(CONF_ESP8266_STORE_LOG_STRINGS_IN_FLASH):
         cg.add_build_flag("-DUSE_STORE_LOG_STR_IN_FLASH")

-    if CORE.using_arduino:
-        if config[CONF_HARDWARE_UART] == USB_CDC:
-            cg.add_build_flag("-DARDUINO_USB_CDC_ON_BOOT=1")
-            if CORE.is_esp32 and get_esp32_variant() in (
-                VARIANT_ESP32C3,
-                VARIANT_ESP32C6,
-            ):
-                cg.add_build_flag("-DARDUINO_USB_MODE=1")
+    if CORE.using_arduino and config[CONF_HARDWARE_UART] == USB_CDC:
+        cg.add_build_flag("-DARDUINO_USB_CDC_ON_BOOT=1")
+        if CORE.is_esp32 and get_esp32_variant() in (
+            VARIANT_ESP32C3,
+            VARIANT_ESP32C6,
+        ):
+            cg.add_build_flag("-DARDUINO_USB_MODE=1")

     if CORE.using_esp_idf:
         if config[CONF_HARDWARE_UART] == USB_CDC:

View File

@@ -201,9 +201,8 @@ def final_validation(configs):
     multi_conf_validate(configs)
     global_config = full_config.get()
     for config in configs:
-        if pages := config.get(CONF_PAGES):
-            if all(p[df.CONF_SKIP] for p in pages):
-                raise cv.Invalid("At least one page must not be skipped")
+        if (pages := config.get(CONF_PAGES)) and all(p[df.CONF_SKIP] for p in pages):
+            raise cv.Invalid("At least one page must not be skipped")
         for display_id in config[df.CONF_DISPLAYS]:
             path = global_config.get_path_for_id(display_id)[:-1]
             display = global_config.get_config_for_path(path)

View File

@@ -28,9 +28,10 @@ CONF_HAS_PULLDOWNS = "has_pulldowns"

 def check_keys(obj):
-    if CONF_KEYS in obj:
-        if len(obj[CONF_KEYS]) != len(obj[CONF_ROWS]) * len(obj[CONF_COLUMNS]):
-            raise cv.Invalid("The number of key codes must equal the number of buttons")
+    if CONF_KEYS in obj and len(obj[CONF_KEYS]) != len(obj[CONF_ROWS]) * len(
+        obj[CONF_COLUMNS]
+    ):
+        raise cv.Invalid("The number of key codes must equal the number of buttons")
     return obj

View File

@@ -124,11 +124,10 @@ async def to_code(config):

     if task_stack_in_psram := config.get(CONF_TASK_STACK_IN_PSRAM):
         cg.add(var.set_task_stack_in_psram(task_stack_in_psram))
-        if task_stack_in_psram:
-            if config[CONF_TASK_STACK_IN_PSRAM]:
-                esp32.add_idf_sdkconfig_option(
-                    "CONFIG_SPIRAM_ALLOW_STACK_EXTERNAL_MEMORY", True
-                )
+        if task_stack_in_psram and config[CONF_TASK_STACK_IN_PSRAM]:
+            esp32.add_idf_sdkconfig_option(
+                "CONFIG_SPIRAM_ALLOW_STACK_EXTERNAL_MEMORY", True
+            )

     for speaker_config in config[CONF_SOURCE_SPEAKERS]:
         source_speaker = cg.new_Pvariable(speaker_config[CONF_ID])

View File

@@ -63,11 +63,13 @@ def _validate(config):
             raise cv.Invalid(
                 f"{axis}: {CONF_RESOLUTION} cannot be {res} with {CONF_TEMPERATURE_COMPENSATION} enabled"
             )
-    if config[CONF_HALLCONF] == 0xC:
-        if (config[CONF_OVERSAMPLING], config[CONF_FILTER]) in [(0, 0), (1, 0), (0, 1)]:
-            raise cv.Invalid(
-                f"{CONF_OVERSAMPLING}=={config[CONF_OVERSAMPLING]} and {CONF_FILTER}=={config[CONF_FILTER]} not allowed with {CONF_HALLCONF}=={config[CONF_HALLCONF]:#02x}"
-            )
+    if config[CONF_HALLCONF] == 0xC and (
+        config[CONF_OVERSAMPLING],
+        config[CONF_FILTER],
+    ) in [(0, 0), (1, 0), (0, 1)]:
+        raise cv.Invalid(
+            f"{CONF_OVERSAMPLING}=={config[CONF_OVERSAMPLING]} and {CONF_FILTER}=={config[CONF_FILTER]} not allowed with {CONF_HALLCONF}=={config[CONF_HALLCONF]:#02x}"
+        )
     return config

View File

@@ -56,12 +56,13 @@ def _final_validate(config):
     for binary_sensor in binary_sensors:
         if binary_sensor.get(CONF_MPR121_ID) == config[CONF_ID]:
             max_touch_channel = max(max_touch_channel, binary_sensor[CONF_CHANNEL])
-    if max_touch_channel_in_config := config.get(CONF_MAX_TOUCH_CHANNEL):
-        if max_touch_channel != max_touch_channel_in_config:
-            raise cv.Invalid(
-                "Max touch channel must equal the highest binary sensor channel or be removed for auto calculation",
-                path=[CONF_MAX_TOUCH_CHANNEL],
-            )
+    if (
+        max_touch_channel_in_config := config.get(CONF_MAX_TOUCH_CHANNEL)
+    ) and max_touch_channel != max_touch_channel_in_config:
+        raise cv.Invalid(
+            "Max touch channel must equal the highest binary sensor channel or be removed for auto calculation",
+            path=[CONF_MAX_TOUCH_CHANNEL],
+        )
     path = fconf.get_path_for_id(config[CONF_ID])[:-1]
     this_config = fconf.get_config_for_path(path)
     this_config[CONF_MAX_TOUCH_CHANNEL] = max_touch_channel

View File

@@ -25,9 +25,9 @@ async def new_openthermnumber(config: dict[str, Any]) -> cg.Pvariable:
     await cg.register_component(var, config)
     input.generate_setters(var, config)

-    if (initial_value := config.get(CONF_INITIAL_VALUE, None)) is not None:
+    if (initial_value := config.get(CONF_INITIAL_VALUE)) is not None:
         cg.add(var.set_initial_value(initial_value))
-    if (restore_value := config.get(CONF_RESTORE_VALUE, None)) is not None:
+    if (restore_value := config.get(CONF_RESTORE_VALUE)) is not None:
         cg.add(var.set_restore_value(restore_value))

     return var

View File

@@ -79,9 +79,8 @@ def set_sdkconfig_options(config):
             "CONFIG_OPENTHREAD_NETWORK_PSKC", f"{pskc:X}".lower()
         )

-    if force_dataset := config.get(CONF_FORCE_DATASET):
-        if force_dataset:
-            cg.add_define("USE_OPENTHREAD_FORCE_DATASET")
+    if config.get(CONF_FORCE_DATASET):
+        cg.add_define("USE_OPENTHREAD_FORCE_DATASET")

     add_idf_sdkconfig_option("CONFIG_OPENTHREAD_DNS64_CLIENT", True)
     add_idf_sdkconfig_option("CONFIG_OPENTHREAD_SRP_CLIENT", True)

View File

@@ -89,9 +89,10 @@ def validate_(config):
             raise cv.Invalid("No sensors or binary sensors to encrypt")
     elif config[CONF_ROLLING_CODE_ENABLE]:
         raise cv.Invalid("Rolling code requires an encryption key")
-    if config[CONF_PING_PONG_ENABLE]:
-        if not any(CONF_ENCRYPTION in p for p in config.get(CONF_PROVIDERS) or ()):
-            raise cv.Invalid("Ping-pong requires at least one encrypted provider")
+    if config[CONF_PING_PONG_ENABLE] and not any(
+        CONF_ENCRYPTION in p for p in config.get(CONF_PROVIDERS) or ()
+    ):
+        raise cv.Invalid("Ping-pong requires at least one encrypted provider")
     return config

View File

@@ -49,12 +49,15 @@ def validate_internal_filter(value):
                 [CONF_USE_PCNT],
             )

-    if CORE.is_esp32 and use_pcnt:
-        if value.get(CONF_INTERNAL_FILTER).total_microseconds > 13:
-            raise cv.Invalid(
-                "Maximum internal filter value when using ESP32 hardware PCNT is 13us",
-                [CONF_INTERNAL_FILTER],
-            )
+    if (
+        CORE.is_esp32
+        and use_pcnt
+        and value.get(CONF_INTERNAL_FILTER).total_microseconds > 13
+    ):
+        raise cv.Invalid(
+            "Maximum internal filter value when using ESP32 hardware PCNT is 13us",
+            [CONF_INTERNAL_FILTER],
+        )

     return value

View File

@@ -73,9 +73,8 @@ def map_sequence(value):

 def _validate(config):
     chip = DriverChip.chips[config[CONF_MODEL]]
-    if not chip.initsequence:
-        if CONF_INIT_SEQUENCE not in config:
-            raise cv.Invalid(f"{chip.name} model requires init_sequence")
+    if not chip.initsequence and CONF_INIT_SEQUENCE not in config:
+        raise cv.Invalid(f"{chip.name} model requires init_sequence")
     return config

View File

@@ -24,9 +24,8 @@ QwiicPIRComponent = qwiic_pir_ns.class_(

 def validate_no_debounce_unless_native(config):
-    if CONF_DEBOUNCE in config:
-        if config[CONF_DEBOUNCE_MODE] != "NATIVE":
-            raise cv.Invalid("debounce can only be set if debounce_mode is NATIVE")
+    if CONF_DEBOUNCE in config and config[CONF_DEBOUNCE_MODE] != "NATIVE":
+        raise cv.Invalid("debounce can only be set if debounce_mode is NATIVE")
     return config

View File

@@ -1062,12 +1062,11 @@ def validate_raw_alternating(value):
     last_negative = None
     for i, val in enumerate(value):
         this_negative = val < 0
-        if i != 0:
-            if this_negative == last_negative:
-                raise cv.Invalid(
-                    f"Values must alternate between being positive and negative, please see index {i} and {i + 1}",
-                    [i],
-                )
+        if i != 0 and this_negative == last_negative:
+            raise cv.Invalid(
+                f"Values must alternate between being positive and negative, please see index {i} and {i + 1}",
+                [i],
+            )
         last_negative = this_negative
     return value

View File

@@ -90,11 +90,10 @@ async def to_code(config):

     if task_stack_in_psram := config.get(CONF_TASK_STACK_IN_PSRAM):
         cg.add(var.set_task_stack_in_psram(task_stack_in_psram))
-        if task_stack_in_psram:
-            if config[CONF_TASK_STACK_IN_PSRAM]:
-                esp32.add_idf_sdkconfig_option(
-                    "CONFIG_SPIRAM_ALLOW_STACK_EXTERNAL_MEMORY", True
-                )
+        if task_stack_in_psram and config[CONF_TASK_STACK_IN_PSRAM]:
+            esp32.add_idf_sdkconfig_option(
+                "CONFIG_SPIRAM_ALLOW_STACK_EXTERNAL_MEMORY", True
+            )

     cg.add(var.set_target_bits_per_sample(config[CONF_BITS_PER_SAMPLE]))
     cg.add(var.set_target_sample_rate(config[CONF_SAMPLE_RATE]))

View File

@@ -140,7 +140,6 @@ async def to_code(config):
     cg.add(var.set_vsync_front_porch(config[CONF_VSYNC_FRONT_PORCH]))
     cg.add(var.set_pclk_inverted(config[CONF_PCLK_INVERTED]))
     cg.add(var.set_pclk_frequency(config[CONF_PCLK_FREQUENCY]))
-    index = 0
     dpins = []
     if CONF_RED in config[CONF_DATA_PINS]:
         red_pins = config[CONF_DATA_PINS][CONF_RED]
@@ -158,10 +157,9 @@ async def to_code(config):
             dpins = dpins[8:16] + dpins[0:8]
     else:
         dpins = config[CONF_DATA_PINS]
-    for pin in dpins:
+    for index, pin in enumerate(dpins):
         data_pin = await cg.gpio_pin_expression(pin)
         cg.add(var.add_data_pin(data_pin, index))
-        index += 1
     if enable_pin := config.get(CONF_ENABLE_PIN):
         enable = await cg.gpio_pin_expression(enable_pin)

View File

@@ -204,13 +204,14 @@ def _validate_pipeline(config):

 def _validate_repeated_speaker(config):
-    if (announcement_config := config.get(CONF_ANNOUNCEMENT_PIPELINE)) and (
-        media_config := config.get(CONF_MEDIA_PIPELINE)
+    if (
+        (announcement_config := config.get(CONF_ANNOUNCEMENT_PIPELINE))
+        and (media_config := config.get(CONF_MEDIA_PIPELINE))
+        and announcement_config[CONF_SPEAKER] == media_config[CONF_SPEAKER]
     ):
-        if announcement_config[CONF_SPEAKER] == media_config[CONF_SPEAKER]:
-            raise cv.Invalid(
-                "The announcement and media pipelines cannot use the same speaker. Use the `mixer` speaker component to create two source speakers."
-            )
+        raise cv.Invalid(
+            "The announcement and media pipelines cannot use the same speaker. Use the `mixer` speaker component to create two source speakers."
+        )

     return config

View File

@@ -115,9 +115,7 @@ def get_target_platform():

 def get_target_variant():
-    return (
-        CORE.data[KEY_ESP32][KEY_VARIANT] if KEY_VARIANT in CORE.data[KEY_ESP32] else ""
-    )
+    return CORE.data[KEY_ESP32].get(KEY_VARIANT, "")


 # Get a list of available hardware interfaces based on target and variant.
@@ -213,9 +211,7 @@ def validate_hw_pins(spi, index=-1):
             return False
         if sdo_pin_no not in pin_set[CONF_MOSI_PIN]:
             return False
-        if sdi_pin_no not in pin_set[CONF_MISO_PIN]:
-            return False
-        return True
+        return sdi_pin_no in pin_set[CONF_MISO_PIN]

     return False

View File

@@ -130,11 +130,11 @@ def validate_sprinkler(config):
         if (
             CONF_PUMP_SWITCH_OFF_DURING_VALVE_OPEN_DELAY in sprinkler_controller
             and CONF_VALVE_OPEN_DELAY not in sprinkler_controller
+            and sprinkler_controller[CONF_PUMP_SWITCH_OFF_DURING_VALVE_OPEN_DELAY]
         ):
-            if sprinkler_controller[CONF_PUMP_SWITCH_OFF_DURING_VALVE_OPEN_DELAY]:
-                raise cv.Invalid(
-                    f"{CONF_VALVE_OPEN_DELAY} must be defined when {CONF_PUMP_SWITCH_OFF_DURING_VALVE_OPEN_DELAY} is enabled"
-                )
+            raise cv.Invalid(
+                f"{CONF_VALVE_OPEN_DELAY} must be defined when {CONF_PUMP_SWITCH_OFF_DURING_VALVE_OPEN_DELAY} is enabled"
+            )

         if (
             CONF_REPEAT in sprinkler_controller

View File

@@ -42,14 +42,15 @@ SSD1306_MODEL = cv.enum(MODELS, upper=True, space="_")

 def _validate(value):
     model = value[CONF_MODEL]
-    if model not in ("SSD1305_128X32", "SSD1305_128X64"):
-        # Contrast is default value (1.0) while brightness is not
-        # Indicates user is using old `brightness` option
-        if value[CONF_BRIGHTNESS] != 1.0 and value[CONF_CONTRAST] == 1.0:
-            raise cv.Invalid(
-                "SSD1306/SH1106 no longer accepts brightness option, "
-                'please use "contrast" instead.'
-            )
+    if (
+        model not in ("SSD1305_128X32", "SSD1305_128X64")
+        and value[CONF_BRIGHTNESS] != 1.0
+        and value[CONF_CONTRAST] == 1.0
+    ):
+        raise cv.Invalid(
+            "SSD1306/SH1106 no longer accepts brightness option, "
+            'please use "contrast" instead.'
+        )
     return value

View File

@@ -189,7 +189,6 @@ async def to_code(config):
     cg.add(var.set_vsync_front_porch(config[CONF_VSYNC_FRONT_PORCH]))
     cg.add(var.set_pclk_inverted(config[CONF_PCLK_INVERTED]))
     cg.add(var.set_pclk_frequency(config[CONF_PCLK_FREQUENCY]))
-    index = 0
     dpins = []
     if CONF_RED in config[CONF_DATA_PINS]:
         red_pins = config[CONF_DATA_PINS][CONF_RED]
@@ -207,10 +206,9 @@ async def to_code(config):
             dpins = dpins[8:16] + dpins[0:8]
     else:
         dpins = config[CONF_DATA_PINS]
-    for pin in dpins:
+    for index, pin in enumerate(dpins):
         data_pin = await cg.gpio_pin_expression(pin)
         cg.add(var.add_data_pin(data_pin, index))
-        index += 1
    if dc_pin := config.get(CONF_DC_PIN):
        dc = await cg.gpio_pin_expression(dc_pin)

View File

@@ -49,15 +49,14 @@ def _expand_jinja(value, orig_value, path, jinja, ignore_missing):
         try:
             # Invoke the jinja engine to evaluate the expression.
             value, err = jinja.expand(value)
-            if err is not None:
-                if not ignore_missing and "password" not in path:
-                    _LOGGER.warning(
-                        "Found '%s' (see %s) which looks like an expression,"
-                        " but could not resolve all the variables: %s",
-                        value,
-                        "->".join(str(x) for x in path),
-                        err.message,
-                    )
+            if err is not None and not ignore_missing and "password" not in path:
+                _LOGGER.warning(
+                    "Found '%s' (see %s) which looks like an expression,"
+                    " but could not resolve all the variables: %s",
+                    value,
+                    "->".join(str(x) for x in path),
+                    err.message,
+                )
         except (
             TemplateError,
             TemplateRuntimeError,

View File

@@ -1,3 +1,4 @@
+import contextlib
 import re

 from esphome import automation
@@ -41,12 +42,10 @@ ELEVATION_MAP = {

 def elevation(value):
     if isinstance(value, str):
-        try:
+        with contextlib.suppress(cv.Invalid):
             value = ELEVATION_MAP[
                 cv.one_of(*ELEVATION_MAP, lower=True, space="_")(value)
             ]
-        except cv.Invalid:
-            pass
     value = cv.angle(value)
     return cv.float_range(min=-180, max=180)(value)

View File

@@ -41,11 +41,13 @@ SX1509KeyTrigger = sx1509_ns.class_(

 def check_keys(config):
-    if CONF_KEYS in config:
-        if len(config[CONF_KEYS]) != config[CONF_KEY_ROWS] * config[CONF_KEY_COLUMNS]:
-            raise cv.Invalid(
-                "The number of key codes must equal the number of rows * columns"
-            )
+    if (
+        CONF_KEYS in config
+        and len(config[CONF_KEYS]) != config[CONF_KEY_ROWS] * config[CONF_KEY_COLUMNS]
+    ):
+        raise cv.Invalid(
+            "The number of key codes must equal the number of rows * columns"
+        )
     return config

View File

@@ -477,11 +477,11 @@ def validate_thermostat(config):
     if (
         CONF_ON_BOOT_RESTORE_FROM in config
         and config[CONF_ON_BOOT_RESTORE_FROM] is OnBootRestoreFrom.DEFAULT_PRESET
+        and CONF_DEFAULT_PRESET not in config
     ):
-        if CONF_DEFAULT_PRESET not in config:
-            raise cv.Invalid(
-                f"{CONF_DEFAULT_PRESET} must be defined to use {CONF_ON_BOOT_RESTORE_FROM} in DEFAULT_PRESET mode"
-            )
+        raise cv.Invalid(
+            f"{CONF_DEFAULT_PRESET} must be defined to use {CONF_ON_BOOT_RESTORE_FROM} in DEFAULT_PRESET mode"
+        )

     if config[CONF_FAN_WITH_COOLING] is True and CONF_FAN_ONLY_ACTION not in config:
         raise cv.Invalid(

View File

@@ -236,7 +236,7 @@ def validate_time_at(value):

 def validate_cron_keys(value):
     if CONF_CRON in value:
-        for key in value.keys():
+        for key in value:
             if key in CRON_KEYS:
                 raise cv.Invalid(f"Cannot use option {key} when cron: is specified.")
         if CONF_AT in value:
@@ -246,7 +246,7 @@ def validate_cron_keys(value):
         value.update(cron_)
         return value
     if CONF_AT in value:
-        for key in value.keys():
+        for key in value:
             if key in CRON_KEYS:
                 raise cv.Invalid(f"Cannot use option {key} when at: is specified.")
         at_ = value[CONF_AT]

View File

@@ -46,16 +46,15 @@ TuyaClimate = tuya_ns.class_("TuyaClimate", climate.Climate, cg.Component)

 def validate_temperature_multipliers(value):
-    if CONF_TEMPERATURE_MULTIPLIER in value:
-        if (
-            CONF_CURRENT_TEMPERATURE_MULTIPLIER in value
-            or CONF_TARGET_TEMPERATURE_MULTIPLIER in value
-        ):
-            raise cv.Invalid(
-                f"Cannot have {CONF_TEMPERATURE_MULTIPLIER} at the same time as "
-                f"{CONF_CURRENT_TEMPERATURE_MULTIPLIER} and "
-                f"{CONF_TARGET_TEMPERATURE_MULTIPLIER}"
-            )
+    if CONF_TEMPERATURE_MULTIPLIER in value and (
+        CONF_CURRENT_TEMPERATURE_MULTIPLIER in value
+        or CONF_TARGET_TEMPERATURE_MULTIPLIER in value
+    ):
+        raise cv.Invalid(
+            f"Cannot have {CONF_TEMPERATURE_MULTIPLIER} at the same time as "
+            f"{CONF_CURRENT_TEMPERATURE_MULTIPLIER} and "
+            f"{CONF_TARGET_TEMPERATURE_MULTIPLIER}"
+        )
     if (
         CONF_CURRENT_TEMPERATURE_MULTIPLIER in value
         and CONF_TARGET_TEMPERATURE_MULTIPLIER not in value

View File

@@ -34,12 +34,14 @@ def validate_min_max(config):
     min_value = config[CONF_MIN_VALUE]
     if max_value <= min_value:
         raise cv.Invalid("max_value must be greater than min_value")
-    if hidden_config := config.get(CONF_DATAPOINT_HIDDEN):
-        if (initial_value := hidden_config.get(CONF_INITIAL_VALUE, None)) is not None:
-            if (initial_value > max_value) or (initial_value < min_value):
-                raise cv.Invalid(
-                    f"{CONF_INITIAL_VALUE} must be a value between {CONF_MAX_VALUE} and {CONF_MIN_VALUE}"
-                )
+    if (
+        (hidden_config := config.get(CONF_DATAPOINT_HIDDEN))
+        and (initial_value := hidden_config.get(CONF_INITIAL_VALUE, None)) is not None
+        and ((initial_value > max_value) or (initial_value < min_value))
+    ):
+        raise cv.Invalid(
+            f"{CONF_INITIAL_VALUE} must be a value between {CONF_MAX_VALUE} and {CONF_MIN_VALUE}"
+        )
     return config

View File

@@ -442,9 +442,7 @@ async def to_code(config):

     if CORE.is_esp8266:
         cg.add_library("ESP8266WiFi", None)
-    elif CORE.is_esp32 and CORE.using_arduino:
-        cg.add_library("WiFi", None)
-    elif CORE.is_rp2040:
+    elif (CORE.is_esp32 and CORE.using_arduino) or CORE.is_rp2040:
         cg.add_library("WiFi", None)

     if CORE.is_esp32 and CORE.using_esp_idf:

View File

@@ -198,10 +198,7 @@ class Config(OrderedDict, fv.FinalValidateConfig):
         self.output_paths.remove((path, domain))

     def is_in_error_path(self, path: ConfigPath) -> bool:
-        for err in self.errors:
-            if _path_begins_with(err.path, path):
-                return True
-        return False
+        return any(_path_begins_with(err.path, path) for err in self.errors)

     def set_by_path(self, path, value):
         conf = self
@@ -224,7 +221,7 @@ class Config(OrderedDict, fv.FinalValidateConfig):
         for index, path_item in enumerate(path):
             try:
                 if path_item in data:
-                    key_data = [x for x in data.keys() if x == path_item][0]
+                    key_data = [x for x in data if x == path_item][0]
                     if isinstance(key_data, ESPHomeDataBase):
                         doc_range = key_data.esp_range
                         if get_key and index == len(path) - 1:
@@ -1081,7 +1078,7 @@ def dump_dict(
             ret += "{}"
             multiline = False

-        for k in conf.keys():
+        for k in conf:
             path_ = path + [k]
             error = config.get_error_for_path(path_)
             if error is not None:
@@ -1097,10 +1094,7 @@ def dump_dict(
                 msg = f"\n{indent(msg)}"

             if inf is not None:
-                if m:
-                    msg = f" {inf}{msg}"
-                else:
-                    msg = f"{msg} {inf}"
+                msg = f" {inf}{msg}" if m else f"{msg} {inf}"
             ret += f"{st + msg}\n"
         elif isinstance(conf, str):
             if is_secret(conf):

View File

@@ -2,7 +2,7 @@
 from __future__ import annotations

-from contextlib import contextmanager
+from contextlib import contextmanager, suppress
 from dataclasses import dataclass
 from datetime import datetime
 from ipaddress import (
@@ -2113,10 +2113,8 @@ def require_esphome_version(year, month, patch):

 @contextmanager
 def suppress_invalid():
-    try:
-        yield
-    except vol.Invalid:
-        pass
+    with suppress(vol.Invalid):
+        yield


 GIT_SCHEMA = Schema(

View File

@@ -1037,10 +1037,7 @@ class MockObjClass(MockObj):
     def inherits_from(self, other: "MockObjClass") -> bool:
         if str(self) == str(other):
             return True
-        for parent in self._parents:
-            if str(parent) == str(other):
-                return True
-        return False
+        return any(str(parent) == str(other) for parent in self._parents)

     def template(self, *args: SafeExpType) -> "MockObjClass":
         if len(args) != 1 or not isinstance(args[0], TemplateArguments):
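
The any() rewrites in this commit (SIM110) replace loops that return True on the first match and False otherwise; a minimal sketch with illustrative names, not the cpp_generator internals:

def inherits_from(parents: list[str], other: str) -> bool:
    # SIM110: a for-loop that returns True/False collapses into any().
    return any(parent == other for parent in parents)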

View File

@@ -3,6 +3,7 @@ from __future__ import annotations
 import asyncio
 from asyncio import events
 from concurrent.futures import ThreadPoolExecutor
+import contextlib
 import logging
 import os
 import socket
@@ -125,10 +126,8 @@ def start_dashboard(args) -> None:
     asyncio.set_event_loop_policy(DashboardEventLoopPolicy(settings.verbose))

-    try:
-        asyncio.run(async_start(args))
-    except KeyboardInterrupt:
-        pass
+    with contextlib.suppress(KeyboardInterrupt):
+        asyncio.run(async_start(args))


 async def async_start(args) -> None:

View File

@@ -88,10 +88,7 @@ def recv_decode(sock, amount, decode=True):

 def receive_exactly(sock, amount, msg, expect, decode=True):
-    if decode:
-        data = []
-    else:
-        data = b""
+    data = [] if decode else b""

     try:
         data += recv_decode(sock, 1, decode=decode)

View File

@@ -96,9 +96,7 @@ def cpp_string_escape(string, encoding="utf-8"):
     def _should_escape(byte: int) -> bool:
         if not 32 <= byte < 127:
             return True
-        if byte in (ord("\\"), ord('"')):
-            return True
-        return False
+        return byte in (ord("\\"), ord('"'))

     if isinstance(string, str):
         string = string.encode(encoding)

View File

@@ -61,7 +61,7 @@ class ESPHomeLogFormatter(logging.Formatter):
         }.get(record.levelname, "")
         message = f"{prefix}{formatted}{AnsiStyle.RESET_ALL.value}"
         if CORE.dashboard:
-            try:
+            try:  # noqa: SIM105
                 message = message.replace("\033", "\\033")
             except UnicodeEncodeError:
                 pass

View File

@@ -1,3 +1,4 @@
+import contextlib
 from datetime import datetime
 import hashlib
 import json
@@ -52,10 +53,8 @@ def initialize(
     client = prepare(
         config, subscriptions, on_message, on_connect, username, password, client_id
     )
-    try:
-        client.loop_forever()
-    except KeyboardInterrupt:
-        pass
+    with contextlib.suppress(KeyboardInterrupt):
+        client.loop_forever()
     return 0

View File

@@ -131,9 +131,11 @@ def _load_idedata(config):
     temp_idedata = Path(CORE.relative_internal_path("idedata", f"{CORE.name}.json"))

     changed = False
-    if not platformio_ini.is_file() or not temp_idedata.is_file():
-        changed = True
-    elif platformio_ini.stat().st_mtime >= temp_idedata.stat().st_mtime:
+    if (
+        not platformio_ini.is_file()
+        or not temp_idedata.is_file()
+        or platformio_ini.stat().st_mtime >= temp_idedata.stat().st_mtime
+    ):
         changed = True

     if not changed:

View File

@@ -59,7 +59,7 @@ def safe_print(message="", end="\n"):
     from esphome.core import CORE

     if CORE.dashboard:
-        try:
+        try:  # noqa: SIM105
             message = message.replace("\033", "\\033")
         except UnicodeEncodeError:
             pass

View File

@@ -116,10 +116,7 @@ def wizard_file(**kwargs):
         kwargs["fallback_name"] = ap_name
         kwargs["fallback_psk"] = "".join(random.choice(letters) for _ in range(12))

-    if kwargs.get("friendly_name"):
-        base = BASE_CONFIG_FRIENDLY
-    else:
-        base = BASE_CONFIG
+    base = BASE_CONFIG_FRIENDLY if kwargs.get("friendly_name") else BASE_CONFIG

     config = base.format(**kwargs)
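
Four-line if/else assignments like the one above become conditional expressions (SIM108). A minimal sketch with illustrative values:

friendly_name = "Living Room"
# SIM108: assign via a conditional expression instead of an if/else block.
base = "friendly-template" if friendly_name else "plain-template"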

View File

@@ -86,21 +86,17 @@ def storage_should_clean(old: StorageJSON, new: StorageJSON) -> bool:
     if old.src_version != new.src_version:
         return True
-    if old.build_path != new.build_path:
-        return True
-    return False
+    return old.build_path != new.build_path


 def storage_should_update_cmake_cache(old: StorageJSON, new: StorageJSON) -> bool:
     if (
         old.loaded_integrations != new.loaded_integrations
         or old.loaded_platforms != new.loaded_platforms
-    ):
-        if new.core_platform == PLATFORM_ESP32:
-            from esphome.components.esp32 import FRAMEWORK_ESP_IDF
-
-            return new.framework == FRAMEWORK_ESP_IDF
+    ) and new.core_platform == PLATFORM_ESP32:
+        from esphome.components.esp32 import FRAMEWORK_ESP_IDF
+
+        return new.framework == FRAMEWORK_ESP_IDF
     return False

View File

@@ -56,9 +56,12 @@ class ESPHomeDataBase:
     def from_node(self, node):
         # pylint: disable=attribute-defined-outside-init
         self._esp_range = DocumentRange.from_marks(node.start_mark, node.end_mark)
-        if isinstance(node, yaml.ScalarNode):
-            if node.style is not None and node.style in "|>":
-                self._content_offset = 1
+        if (
+            isinstance(node, yaml.ScalarNode)
+            and node.style is not None
+            and node.style in "|>"
+        ):
+            self._content_offset = 1

     def from_database(self, database):
         # pylint: disable=attribute-defined-outside-init

View File

@@ -111,11 +111,12 @@ exclude = ['generated']

 [tool.ruff.lint]
 select = [
     "E", # pycodestyle
     "F", # pyflakes/autoflake
     "I", # isort
     "PL", # pylint
+    "SIM", # flake8-simplify
     "UP", # pyupgrade
 ]
 ignore = [

View File

@@ -61,9 +61,7 @@ def indent_list(text: str, padding: str = " ") -> list[str]:
     """Indent each line of the given text with the specified padding."""
     lines = []
     for line in text.splitlines():
-        if line == "":
-            p = ""
-        elif line.startswith("#ifdef") or line.startswith("#endif"):
+        if line == "" or line.startswith("#ifdef") or line.startswith("#endif"):
             p = ""
         else:
             p = padding
@@ -2385,7 +2383,7 @@ static const char *const TAG = "api.service";
         needs_conn = get_opt(m, pb.needs_setup_connection, True)
         needs_auth = get_opt(m, pb.needs_authentication, True)

-        ifdef = message_ifdef_map.get(inp, ifdefs.get(inp, None))
+        ifdef = message_ifdef_map.get(inp, ifdefs.get(inp))

         if ifdef is not None:
             hpp += f"#ifdef {ifdef}\n"

View File

@@ -71,11 +71,13 @@ def get_component_names():
     skip_components = []

     for d in os.listdir(CORE_COMPONENTS_PATH):
-        if not d.startswith("__") and os.path.isdir(
-            os.path.join(CORE_COMPONENTS_PATH, d)
+        if (
+            not d.startswith("__")
+            and os.path.isdir(os.path.join(CORE_COMPONENTS_PATH, d))
+            and d not in component_names
+            and d not in skip_components
         ):
-            if d not in component_names and d not in skip_components:
-                component_names.append(d)
+            component_names.append(d)

     return sorted(component_names)
@@ -139,11 +141,10 @@ def register_module_schemas(key, module, manifest=None):
     for name, schema in module_schemas(module):
         register_known_schema(key, name, schema)

-    if manifest:
+    if manifest and manifest.multi_conf and S_CONFIG_SCHEMA in output[key][S_SCHEMAS]:
         # Multi conf should allow list of components
         # not sure about 2nd part of the if, might be useless config (e.g. as3935)
-        if manifest.multi_conf and S_CONFIG_SCHEMA in output[key][S_SCHEMAS]:
-            output[key][S_SCHEMAS][S_CONFIG_SCHEMA]["is_list"] = True
+        output[key][S_SCHEMAS][S_CONFIG_SCHEMA]["is_list"] = True
@@ -230,7 +231,7 @@ def add_module_registries(domain, module):
             reg_type = attr_name.partition("_")[0].lower()
             found_registries[repr(attr_obj)] = f"{domain}.{reg_type}"

-            for name in attr_obj.keys():
+            for name in attr_obj:
                 if "." not in name:
                     reg_entry_name = name
                 else:
@@ -700,7 +701,7 @@ def is_convertible_schema(schema):
     if repr(schema) in ejs.registry_schemas:
         return True
     if isinstance(schema, dict):
-        for k in schema.keys():
+        for k in schema:
             if isinstance(k, (cv.Required, cv.Optional)):
                 return True
     return False
@@ -818,7 +819,7 @@ def convert(schema, config_var, path):
     elif schema_type == "automation":
         extra_schema = None
         config_var[S_TYPE] = "trigger"
-        if automation.AUTOMATION_SCHEMA == ejs.extended_schemas[repr(data)][0]:
+        if ejs.extended_schemas[repr(data)][0] == automation.AUTOMATION_SCHEMA:
             extra_schema = ejs.extended_schemas[repr(data)][1]
         if (
             extra_schema is not None and len(extra_schema) > 1
@@ -926,9 +927,8 @@ def convert(schema, config_var, path):
                 config = convert_config(schema_type, path + "/type_" + schema_key)
                 types[schema_key] = config["schema"]

-        elif DUMP_UNKNOWN:
-            if S_TYPE not in config_var:
-                config_var["unknown"] = repr_schema
+        elif DUMP_UNKNOWN and S_TYPE not in config_var:
+            config_var["unknown"] = repr_schema

         if DUMP_PATH:
             config_var["path"] = path

View File

@@ -365,9 +365,11 @@ def load_idedata(environment: str) -> dict[str, Any]:
     platformio_ini = Path(root_path) / "platformio.ini"
     temp_idedata = Path(temp_folder) / f"idedata-{environment}.json"
     changed = False
-    if not platformio_ini.is_file() or not temp_idedata.is_file():
-        changed = True
-    elif platformio_ini.stat().st_mtime >= temp_idedata.stat().st_mtime:
+    if (
+        not platformio_ini.is_file()
+        or not temp_idedata.is_file()
+        or platformio_ini.stat().st_mtime >= temp_idedata.stat().st_mtime
+    ):
        changed = True

     if "idf" in environment:

View File

@@ -183,19 +183,17 @@ async def yaml_config(request: pytest.FixtureRequest, unused_tcp_port: int) -> s
         content = content.replace("api:", f"api:\n  port: {unused_tcp_port}")

     # Add debug build flags for integration tests to enable assertions
-    if "esphome:" in content:
-        # Check if platformio_options already exists
-        if "platformio_options:" not in content:
-            # Add platformio_options with debug flags after esphome:
-            content = content.replace(
-                "esphome:",
-                "esphome:\n"
-                "  # Enable assertions for integration tests\n"
-                "  platformio_options:\n"
-                "    build_flags:\n"
-                '      - "-DDEBUG"  # Enable assert() statements\n'
-                '      - "-g"  # Add debug symbols',
-            )
+    if "esphome:" in content and "platformio_options:" not in content:
+        # Add platformio_options with debug flags after esphome:
+        content = content.replace(
+            "esphome:",
+            "esphome:\n"
+            "  # Enable assertions for integration tests\n"
+            "  platformio_options:\n"
+            "    build_flags:\n"
+            '      - "-DDEBUG"  # Enable assert() statements\n'
+            '      - "-g"  # Add debug symbols',
+        )

     return content

View File

@@ -59,86 +59,86 @@ async def test_api_custom_services(
             custom_arrays_future.set_result(True)

     # Run with log monitoring
-    async with run_compiled(yaml_config, line_callback=check_output):
-        async with api_client_connected() as client:
+    async with (
+        run_compiled(yaml_config, line_callback=check_output),
+        api_client_connected() as client,
+    ):
         # Verify device info
         device_info = await client.device_info()
         assert device_info is not None
         assert device_info.name == "api-custom-services-test"

         # List services
         _, services = await client.list_entities_services()

         # Should have 4 services: 1 YAML + 3 CustomAPIDevice
         assert len(services) == 4, f"Expected 4 services, found {len(services)}"

         # Find our services
         yaml_service: UserService | None = None
         custom_service: UserService | None = None
         custom_args_service: UserService | None = None
         custom_arrays_service: UserService | None = None

         for service in services:
             if service.name == "test_yaml_service":
                 yaml_service = service
             elif service.name == "custom_test_service":
                 custom_service = service
             elif service.name == "custom_service_with_args":
                 custom_args_service = service
             elif service.name == "custom_service_with_arrays":
                 custom_arrays_service = service

         assert yaml_service is not None, "test_yaml_service not found"
         assert custom_service is not None, "custom_test_service not found"
         assert custom_args_service is not None, "custom_service_with_args not found"
-        assert custom_arrays_service is not None, (
-            "custom_service_with_arrays not found"
-        )
+        assert custom_arrays_service is not None, "custom_service_with_arrays not found"

         # Test YAML service
         client.execute_service(yaml_service, {})
         await asyncio.wait_for(yaml_service_future, timeout=5.0)

         # Test simple CustomAPIDevice service
         client.execute_service(custom_service, {})
         await asyncio.wait_for(custom_service_future, timeout=5.0)

         # Verify custom_args_service arguments
         assert len(custom_args_service.args) == 4
         arg_types = {arg.name: arg.type for arg in custom_args_service.args}
         assert arg_types["arg_string"] == UserServiceArgType.STRING
         assert arg_types["arg_int"] == UserServiceArgType.INT
         assert arg_types["arg_bool"] == UserServiceArgType.BOOL
         assert arg_types["arg_float"] == UserServiceArgType.FLOAT

         # Test CustomAPIDevice service with arguments
         client.execute_service(
             custom_args_service,
             {
                 "arg_string": "test_string",
                 "arg_int": 456,
                 "arg_bool": True,
                 "arg_float": 78.9,
             },
         )
         await asyncio.wait_for(custom_args_future, timeout=5.0)

         # Verify array service arguments
         assert len(custom_arrays_service.args) == 4
         array_arg_types = {arg.name: arg.type for arg in custom_arrays_service.args}
         assert array_arg_types["bool_array"] == UserServiceArgType.BOOL_ARRAY
         assert array_arg_types["int_array"] == UserServiceArgType.INT_ARRAY
         assert array_arg_types["float_array"] == UserServiceArgType.FLOAT_ARRAY
         assert array_arg_types["string_array"] == UserServiceArgType.STRING_ARRAY

         # Test CustomAPIDevice service with arrays
         client.execute_service(
             custom_arrays_service,
             {
                 "bool_array": [True, False],
                 "int_array": [1, 2, 3],
                 "float_array": [1.1, 2.2],
                 "string_array": ["hello", "world"],
             },
         )
         await asyncio.wait_for(custom_arrays_future, timeout=5.0)
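
Combining nested with/async-with blocks as above (SIM117) uses the parenthesized multi-context syntax, which needs Python 3.10 or newer. A minimal runnable sketch with illustrative stand-ins for the test fixtures:

import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def resource(name: str):
    # Illustrative stand-in for run_compiled / api_client_connected.
    yield name

async def main() -> None:
    # SIM117: one parenthesized async-with instead of two nested blocks.
    async with (
        resource("compiled-firmware") as fw,
        resource("api-client") as client,
    ):
        print(fw, client)

asyncio.run(main())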

View File

@@ -47,9 +47,7 @@ async def test_device_id_in_state(
             entity_device_mapping[entity.key] = device_ids["Humidity Monitor"]
         elif entity.name == "Motion Detected":
             entity_device_mapping[entity.key] = device_ids["Motion Sensor"]
-        elif entity.name == "Temperature Monitor Power":
-            entity_device_mapping[entity.key] = device_ids["Temperature Monitor"]
-        elif entity.name == "Temperature Status":
+        elif entity.name in {"Temperature Monitor Power", "Temperature Status"}:
             entity_device_mapping[entity.key] = device_ids["Temperature Monitor"]
         elif entity.name == "Motion Light":
             entity_device_mapping[entity.key] = device_ids["Motion Sensor"]
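
Branches with identical bodies, as above, merge into a single membership test (the SIM114-style consolidation); a minimal sketch with illustrative data:

entity_name = "Temperature Status"
device_ids = {"Temperature Monitor": 11, "Motion Sensor": 22}
# Two elif branches with the same body collapse into a set-membership check.
if entity_name in {"Temperature Monitor Power", "Temperature Status"}:
    device_id = device_ids["Temperature Monitor"]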

View File

@@ -70,11 +70,13 @@ async def test_scheduler_defer_cancel(
             test_complete_future.set_result(True)
             return

-        if state.key == test_result_entity.key and not test_result_future.done():
-            # Event type should be "defer_executed_X" where X is the defer number
-            if state.event_type.startswith("defer_executed_"):
-                defer_num = int(state.event_type.split("_")[-1])
-                test_result_future.set_result(defer_num)
+        if (
+            state.key == test_result_entity.key
+            and not test_result_future.done()
+            and state.event_type.startswith("defer_executed_")
+        ):
+            defer_num = int(state.event_type.split("_")[-1])
+            test_result_future.set_result(defer_num)

     client.subscribe_states(on_state)

View File

@@ -27,33 +27,33 @@ async def test_scheduler_null_name(
         if not test_complete_future.done() and test_complete_pattern.search(line):
             test_complete_future.set_result(True)

-    async with run_compiled(yaml_config, line_callback=check_output):
-        async with api_client_connected() as client:
+    async with (
+        run_compiled(yaml_config, line_callback=check_output),
+        api_client_connected() as client,
+    ):
         # Verify we can connect
         device_info = await client.device_info()
         assert device_info is not None
         assert device_info.name == "scheduler-null-name"

         # List services
         _, services = await asyncio.wait_for(
             client.list_entities_services(), timeout=5.0
         )

         # Find our test service
         test_null_name_service = next(
             (s for s in services if s.name == "test_null_name"), None
         )
-        assert test_null_name_service is not None, (
-            "test_null_name service not found"
-        )
+        assert test_null_name_service is not None, "test_null_name service not found"

         # Execute the test
         client.execute_service(test_null_name_service, {})

         # Wait for test completion
         try:
             await asyncio.wait_for(test_complete_future, timeout=10.0)
         except TimeoutError:
             pytest.fail(
                 "Test did not complete within timeout - likely crashed due to NULL name"
             )

View File

@@ -61,9 +61,10 @@ async def test_scheduler_rapid_cancellation(
             elif "Total executed:" in line:
                 if match := re.search(r"Total executed: (\d+)", line):
                     test_stats["final_executed"] = int(match.group(1))
-            elif "Implicit cancellations (replaced):" in line:
-                if match := re.search(r"Implicit cancellations \(replaced\): (\d+)", line):
-                    test_stats["final_implicit_cancellations"] = int(match.group(1))
+            elif "Implicit cancellations (replaced):" in line and (
+                match := re.search(r"Implicit cancellations \(replaced\): (\d+)", line)
+            ):
+                test_stats["final_implicit_cancellations"] = int(match.group(1))

         # Check for crash indicators
         if any(

View File

@@ -146,9 +146,11 @@ def test_main_list_components_fails(
     mock_subprocess_run.side_effect = subprocess.CalledProcessError(1, "cmd")

     # Run main function with mocked argv - should raise
-    with patch("sys.argv", ["determine-jobs.py"]):
-        with pytest.raises(subprocess.CalledProcessError):
-            determine_jobs.main()
+    with (
+        patch("sys.argv", ["determine-jobs.py"]),
+        pytest.raises(subprocess.CalledProcessError),
+    ):
+        determine_jobs.main()


 def test_main_with_branch_argument(
@@ -243,17 +245,21 @@ def test_should_run_integration_tests_with_branch() -> None:

 def test_should_run_integration_tests_component_dependency() -> None:
     """Test that integration tests run when components used in fixtures change."""
-    with patch.object(
-        determine_jobs, "changed_files", return_value=["esphome/components/api/api.cpp"]
-    ):
-        with patch.object(
-            determine_jobs, "get_components_from_integration_fixtures"
-        ) as mock_fixtures:
-            mock_fixtures.return_value = {"api", "sensor"}
-            with patch.object(determine_jobs, "get_all_dependencies") as mock_deps:
-                mock_deps.return_value = {"api", "sensor", "network"}
-                result = determine_jobs.should_run_integration_tests()
-                assert result is True
+    with (
+        patch.object(
+            determine_jobs,
+            "changed_files",
+            return_value=["esphome/components/api/api.cpp"],
+        ),
+        patch.object(
+            determine_jobs, "get_components_from_integration_fixtures"
+        ) as mock_fixtures,
+    ):
+        mock_fixtures.return_value = {"api", "sensor"}
+        with patch.object(determine_jobs, "get_all_dependencies") as mock_deps:
+            mock_deps.return_value = {"api", "sensor", "network"}
+            result = determine_jobs.should_run_integration_tests()
+            assert result is True


 @pytest.mark.parametrize(
@@ -272,12 +278,14 @@ def test_should_run_clang_tidy(
     expected_result: bool,
 ) -> None:
     """Test should_run_clang_tidy function."""
-    with patch.object(determine_jobs, "changed_files", return_value=changed_files):
-        # Test with hash check returning specific code
-        with patch("subprocess.run") as mock_run:
-            mock_run.return_value = Mock(returncode=check_returncode)
-            result = determine_jobs.should_run_clang_tidy()
-            assert result == expected_result
+    with (
+        patch.object(determine_jobs, "changed_files", return_value=changed_files),
+        patch("subprocess.run") as mock_run,
+    ):
+        # Test with hash check returning specific code
+        mock_run.return_value = Mock(returncode=check_returncode)
+        result = determine_jobs.should_run_clang_tidy()
+        assert result == expected_result


 def test_should_run_clang_tidy_hash_check_exception() -> None: