[api] Fix string lifetime issue in Home Assistant service calls with templated values (#9909)

This commit is contained in:
J. Nick Koston 2025-07-27 18:39:25 -10:00 committed by GitHub
parent 05f6d01cbe
commit 1702356fc8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 670 additions and 25 deletions

View File

@ -760,7 +760,7 @@ message SubscribeHomeassistantServicesRequest {
message HomeassistantServiceMap {
string key = 1;
string value = 2;
string value = 2 [(no_zero_copy) = true];
}
message HomeassistantServiceResponse {

View File

@ -27,4 +27,5 @@ extend google.protobuf.MessageOptions {
extend google.protobuf.FieldOptions {
optional string field_ifdef = 1042;
optional uint32 fixed_array_size = 50007;
optional bool no_zero_copy = 50008 [default=false];
}

View File

@ -846,11 +846,11 @@ void NoiseEncryptionSetKeyResponse::calculate_size(uint32_t &total_size) const {
#ifdef USE_API_HOMEASSISTANT_SERVICES
void HomeassistantServiceMap::encode(ProtoWriteBuffer buffer) const {
buffer.encode_string(1, this->key_ref_);
buffer.encode_string(2, this->value_ref_);
buffer.encode_string(2, this->value);
}
void HomeassistantServiceMap::calculate_size(uint32_t &total_size) const {
ProtoSize::add_string_field(total_size, 1, this->key_ref_.size());
ProtoSize::add_string_field(total_size, 1, this->value_ref_.size());
ProtoSize::add_string_field(total_size, 1, this->value.size());
}
void HomeassistantServiceResponse::encode(ProtoWriteBuffer buffer) const {
buffer.encode_string(1, this->service_ref_);

View File

@ -1062,8 +1062,7 @@ class HomeassistantServiceMap : public ProtoMessage {
public:
StringRef key_ref_{};
void set_key(const StringRef &ref) { this->key_ref_ = ref; }
StringRef value_ref_{};
void set_value(const StringRef &ref) { this->value_ref_ = ref; }
std::string value{};
void encode(ProtoWriteBuffer buffer) const override;
void calculate_size(uint32_t &total_size) const override;
#ifdef HAS_PROTO_MESSAGE_DUMP

View File

@ -1045,7 +1045,7 @@ void SubscribeHomeassistantServicesRequest::dump_to(std::string &out) const {
void HomeassistantServiceMap::dump_to(std::string &out) const {
MessageDumpHelper helper(out, "HomeassistantServiceMap");
dump_field(out, "key", this->key_ref_);
dump_field(out, "value", this->value_ref_);
dump_field(out, "value", this->value);
}
void HomeassistantServiceResponse::dump_to(std::string &out) const {
MessageDumpHelper helper(out, "HomeassistantServiceResponse");

View File

@ -175,7 +175,7 @@ class CustomAPIDevice {
resp.data.emplace_back();
auto &kv = resp.data.back();
kv.set_key(StringRef(it.first));
kv.set_value(StringRef(it.second));
kv.value = it.second;
}
global_api_server->send_homeassistant_service_call(resp);
}
@ -218,7 +218,7 @@ class CustomAPIDevice {
resp.data.emplace_back();
auto &kv = resp.data.back();
kv.set_key(StringRef(it.first));
kv.set_value(StringRef(it.second));
kv.value = it.second;
}
global_api_server->send_homeassistant_service_call(resp);
}

View File

@ -70,22 +70,19 @@ template<typename... Ts> class HomeAssistantServiceCallAction : public Action<Ts
resp.data.emplace_back();
auto &kv = resp.data.back();
kv.set_key(StringRef(it.key));
std::string value = it.value.value(x...);
kv.set_value(StringRef(value));
kv.value = it.value.value(x...);
}
for (auto &it : this->data_template_) {
resp.data_template.emplace_back();
auto &kv = resp.data_template.back();
kv.set_key(StringRef(it.key));
std::string value = it.value.value(x...);
kv.set_value(StringRef(value));
kv.value = it.value.value(x...);
}
for (auto &it : this->variables_) {
resp.variables.emplace_back();
auto &kv = resp.variables.back();
kv.set_key(StringRef(it.key));
std::string value = it.value.value(x...);
kv.set_value(StringRef(value));
kv.value = it.value.value(x...);
}
this->parent_->send_homeassistant_service_call(resp);
}

View File

@ -93,14 +93,12 @@ void HomeassistantNumber::control(float value) {
resp.data.emplace_back();
auto &entity_id = resp.data.back();
entity_id.set_key(ENTITY_ID_KEY);
entity_id.set_value(StringRef(this->entity_id_));
entity_id.value = this->entity_id_;
resp.data.emplace_back();
auto &entity_value = resp.data.back();
entity_value.set_key(VALUE_KEY);
// to_string() returns a temporary - must store it to avoid dangling reference
std::string value_str = to_string(value);
entity_value.set_value(StringRef(value_str));
entity_value.value = to_string(value);
api::global_api_server->send_homeassistant_service_call(resp);
}

View File

@ -54,7 +54,7 @@ void HomeassistantSwitch::write_state(bool state) {
resp.data.emplace_back();
auto &entity_id_kv = resp.data.back();
entity_id_kv.set_key(ENTITY_ID_KEY);
entity_id_kv.set_value(StringRef(this->entity_id_));
entity_id_kv.value = this->entity_id_;
api::global_api_server->send_homeassistant_service_call(resp);
}

View File

@ -562,11 +562,16 @@ class StringType(TypeInfo):
@property
def public_content(self) -> list[str]:
content: list[str] = []
# Add std::string storage if message needs decoding
if self._needs_decode:
# Check if no_zero_copy option is set
no_zero_copy = get_field_opt(self._field, pb.no_zero_copy, False)
# Add std::string storage if message needs decoding OR if no_zero_copy is set
if self._needs_decode or no_zero_copy:
content.append(f"std::string {self.field_name}{{}};")
if self._needs_encode:
# Only add StringRef if encoding is needed AND no_zero_copy is not set
if self._needs_encode and not no_zero_copy:
content.extend(
[
# Add StringRef field if message needs encoding
@ -581,13 +586,28 @@ class StringType(TypeInfo):
@property
def encode_content(self) -> str:
return f"buffer.encode_string({self.number}, this->{self.field_name}_ref_);"
# Check if no_zero_copy option is set
no_zero_copy = get_field_opt(self._field, pb.no_zero_copy, False)
if no_zero_copy:
# Use the std::string directly
return f"buffer.encode_string({self.number}, this->{self.field_name});"
else:
# Use the StringRef
return f"buffer.encode_string({self.number}, this->{self.field_name}_ref_);"
def dump(self, name):
# Check if no_zero_copy option is set
no_zero_copy = get_field_opt(self._field, pb.no_zero_copy, False)
# If name is 'it', this is a repeated field element - always use string
if name == "it":
return "append_quoted_string(out, StringRef(it));"
# If no_zero_copy is set, always use std::string
if no_zero_copy:
return f'out.append("\'").append(this->{self.field_name}).append("\'");'
# For SOURCE_CLIENT only, always use std::string
if not self._needs_encode:
return f'out.append("\'").append(this->{self.field_name}).append("\'");'
@ -607,6 +627,13 @@ class StringType(TypeInfo):
@property
def dump_content(self) -> str:
# Check if no_zero_copy option is set
no_zero_copy = get_field_opt(self._field, pb.no_zero_copy, False)
# If no_zero_copy is set, always use std::string
if no_zero_copy:
return f'dump_field(out, "{self.name}", this->{self.field_name});'
# For SOURCE_CLIENT only, use std::string
if not self._needs_encode:
return f'dump_field(out, "{self.name}", this->{self.field_name});'
@ -622,8 +649,15 @@ class StringType(TypeInfo):
return o
def get_size_calculation(self, name: str, force: bool = False) -> str:
# For SOURCE_CLIENT only messages, use the string field directly
if not self._needs_encode:
# Check if no_zero_copy option is set
no_zero_copy = get_field_opt(self._field, pb.no_zero_copy, False)
# For SOURCE_CLIENT only messages or no_zero_copy, use the string field directly
if not self._needs_encode or no_zero_copy:
# For no_zero_copy, we need to use .size() on the string
if no_zero_copy and name != "it":
field_id_size = self.calculate_field_id_size()
return f"ProtoSize::add_string_field(total_size, {field_id_size}, this->{self.field_name}.size());"
return self._get_simple_size_calculation(name, force, "add_string_field")
# Check if this is being called from a repeated field context

View File

@ -0,0 +1,311 @@
esphome:
name: test-ha-api
friendly_name: Home Assistant API Test
host:
api:
services:
- service: trigger_all_tests
then:
- logger.log: "=== Starting Home Assistant API Tests ==="
- button.press: test_basic_service
- button.press: test_templated_service
- button.press: test_empty_string_service
- button.press: test_multiple_fields_service
- button.press: test_complex_lambda_service
- button.press: test_all_empty_service
- button.press: test_rapid_service_calls
- button.press: test_read_ha_states
- number.set:
id: ha_number
value: 42.5
- switch.turn_on: ha_switch
- switch.turn_off: ha_switch
- logger.log: "=== All tests completed ==="
logger:
level: DEBUG
# Time component for templated values
time:
- platform: homeassistant
id: homeassistant_time
# Global variables for testing
globals:
- id: test_brightness
type: int
initial_value: '75'
- id: test_string
type: std::string
initial_value: '"test_value"'
# Sensors for testing state reading
sensor:
- platform: template
name: "Test Sensor"
id: test_sensor
lambda: return 42.0;
update_interval: 0.1s
# Home Assistant sensor that reads external state
- platform: homeassistant
name: "HA Temperature"
entity_id: sensor.external_temperature
id: ha_temperature
on_value:
then:
- logger.log:
format: "HA Temperature state updated: %.1f"
args: ['x']
# Test multiple HA state sensors
- platform: homeassistant
name: "HA Humidity"
entity_id: sensor.external_humidity
id: ha_humidity
on_value:
then:
- logger.log:
format: "HA Humidity state updated: %.1f"
args: ['x']
# Binary sensor from Home Assistant
binary_sensor:
- platform: homeassistant
name: "HA Motion"
entity_id: binary_sensor.external_motion
id: ha_motion
on_state:
then:
- logger.log:
format: "HA Motion state changed: %s"
args: ['x ? "ON" : "OFF"']
# Text sensor from Home Assistant
text_sensor:
- platform: homeassistant
name: "HA Weather"
entity_id: weather.home
attribute: condition
id: ha_weather
on_value:
then:
- logger.log:
format: "HA Weather condition updated: %s"
args: ['x.c_str()']
# Test empty state handling
- platform: homeassistant
name: "HA Empty State"
entity_id: sensor.nonexistent_sensor
id: ha_empty_state
on_value:
then:
- logger.log:
format: "HA Empty state updated: %s"
args: ['x.c_str()']
# Number component for testing HA number control
number:
- platform: template
name: "HA Controlled Number"
id: ha_number
min_value: 0
max_value: 100
step: 1
optimistic: true
set_action:
- logger.log:
format: "Setting HA number to: %.1f"
args: ['x']
- homeassistant.action:
action: input_number.set_value
data:
entity_id: input_number.test_number
value: !lambda 'return to_string(x);'
# Switch component for testing HA switch control
switch:
- platform: template
name: "HA Controlled Switch"
id: ha_switch
optimistic: true
turn_on_action:
- logger.log: "Toggling HA switch: switch.test_switch ON"
- homeassistant.action:
action: switch.turn_on
data:
entity_id: switch.test_switch
turn_off_action:
- logger.log: "Toggling HA switch: switch.test_switch OFF"
- homeassistant.action:
action: switch.turn_off
data:
entity_id: switch.test_switch
# Buttons for testing various service call scenarios
button:
# Test 1: Basic service call with static values
- platform: template
name: "Test Basic Service"
id: test_basic_service
on_press:
- logger.log: "Sending HomeAssistant service call: light.turn_off"
- homeassistant.action:
action: light.turn_off
data:
entity_id: light.test_light
- logger.log: "Service data: entity_id=light.test_light"
# Test 2: Service call with templated/lambda values (main bug fix test)
- platform: template
name: "Test Templated Service"
id: test_templated_service
on_press:
- logger.log: "Testing templated service call"
- lambda: |-
int brightness_percent = id(test_brightness);
std::string computed = to_string(brightness_percent * 255 / 100);
ESP_LOGI("test", "Lambda computed value: %s", computed.c_str());
- homeassistant.action:
action: light.turn_on
data:
entity_id: light.test_light
# This creates a temporary string - the main test case
brightness: !lambda 'return to_string(id(test_brightness) * 255 / 100);'
data_template:
color_name: !lambda 'return id(test_string);'
variables:
transition: !lambda 'return "2.5";'
# Test 3: Service call with empty string values
- platform: template
name: "Test Empty String Service"
id: test_empty_string_service
on_press:
- logger.log: "Testing empty string values"
- homeassistant.action:
action: notify.test
data:
message: "Test message"
title: ""
data_template:
target: !lambda 'return "";'
variables:
sound: !lambda 'return "";'
- logger.log: "Empty value for key: title"
- logger.log: "Empty value for key: target"
- logger.log: "Empty value for key: sound"
# Test 4: Service call with multiple data fields
- platform: template
name: "Test Multiple Fields Service"
id: test_multiple_fields_service
on_press:
- logger.log: "Testing multiple data fields"
- homeassistant.action:
action: climate.set_temperature
data:
entity_id: climate.test_climate
temperature: "22"
hvac_mode: "heat"
data_template:
target_temp_high: !lambda 'return "24";'
target_temp_low: !lambda 'return "20";'
variables:
preset_mode: !lambda 'return "comfort";'
# Test 5: Complex lambda with string operations
- platform: template
name: "Test Complex Lambda Service"
id: test_complex_lambda_service
on_press:
- logger.log: "Testing complex lambda expressions"
- homeassistant.action:
action: script.test_script
data:
entity_id: !lambda |-
std::string base = "light.";
std::string room = "living_room";
return base + room;
brightness_pct: !lambda |-
float sensor_val = id(test_sensor).state;
int pct = (int)(sensor_val * 2.38); // 42 * 2.38 ≈ 100
return to_string(pct);
data_template:
message: !lambda |-
char buffer[50];
snprintf(buffer, sizeof(buffer), "Sensor: %.1f, Time: %02d:%02d",
id(test_sensor).state,
id(homeassistant_time).now().hour,
id(homeassistant_time).now().minute);
return std::string(buffer);
# Test 6: Service with only empty strings to verify size calculation
- platform: template
name: "Test All Empty Service"
id: test_all_empty_service
on_press:
- logger.log: "Testing all empty string values"
- homeassistant.action:
action: test.empty
data:
field1: ""
field2: ""
data_template:
field3: !lambda 'return "";'
variables:
field4: !lambda 'return "";'
- logger.log: "All empty service call completed"
# Test 7: Rapid successive service calls
- platform: template
name: "Test Rapid Service Calls"
id: test_rapid_service_calls
on_press:
- logger.log: "Testing rapid service calls"
- repeat:
count: 5
then:
- homeassistant.action:
action: counter.increment
data:
entity_id: counter.test_counter
- delay: 10ms
- logger.log: "Rapid service calls completed"
# Test 8: Log current HA states
- platform: template
name: "Test Read HA States"
id: test_read_ha_states
on_press:
- logger.log: "Reading current HA states"
- lambda: |-
if (id(ha_temperature).has_state()) {
ESP_LOGI("test", "Current HA Temperature: %.1f", id(ha_temperature).state);
} else {
ESP_LOGI("test", "HA Temperature has no state");
}
if (id(ha_humidity).has_state()) {
ESP_LOGI("test", "Current HA Humidity: %.1f", id(ha_humidity).state);
} else {
ESP_LOGI("test", "HA Humidity has no state");
}
ESP_LOGI("test", "Current HA Motion: %s", id(ha_motion).state ? "ON" : "OFF");
if (id(ha_weather).has_state()) {
ESP_LOGI("test", "Current HA Weather: %s", id(ha_weather).state.c_str());
} else {
ESP_LOGI("test", "HA Weather has no state");
}
if (id(ha_empty_state).has_state()) {
ESP_LOGI("test", "HA Empty State value: %s", id(ha_empty_state).state.c_str());
} else {
ESP_LOGI("test", "HA Empty State has no value (expected)");
}

View File

@ -0,0 +1,305 @@
"""Integration test for Home Assistant API functionality.
Tests:
- Home Assistant service calls with templated values (main bug fix)
- Service calls with empty string values
- Home Assistant state reading (sensors, binary sensors, text sensors)
- Home Assistant number and switch component control
- Complex lambda expressions and string handling
"""
from __future__ import annotations
import asyncio
import re
from aioesphomeapi import HomeassistantServiceCall
import pytest
from .types import APIClientConnectedFactory, RunCompiledFunction
@pytest.mark.asyncio
async def test_api_homeassistant(
yaml_config: str,
run_compiled: RunCompiledFunction,
api_client_connected: APIClientConnectedFactory,
) -> None:
"""Comprehensive test for Home Assistant API functionality."""
loop = asyncio.get_running_loop()
# Create futures for patterns that capture values
lambda_computed_future = loop.create_future()
ha_temp_state_future = loop.create_future()
ha_humidity_state_future = loop.create_future()
ha_motion_state_future = loop.create_future()
ha_weather_state_future = loop.create_future()
# State update futures
temp_update_future = loop.create_future()
humidity_update_future = loop.create_future()
motion_update_future = loop.create_future()
weather_update_future = loop.create_future()
# Number future
ha_number_future = loop.create_future()
tests_complete_future = loop.create_future()
# Patterns to match in logs - only keeping patterns that capture values
lambda_computed_pattern = re.compile(r"Lambda computed value: (\d+)")
ha_temp_state_pattern = re.compile(r"Current HA Temperature: ([\d.]+)")
ha_humidity_state_pattern = re.compile(r"Current HA Humidity: ([\d.]+)")
ha_motion_state_pattern = re.compile(r"Current HA Motion: (ON|OFF)")
ha_weather_state_pattern = re.compile(r"Current HA Weather: (\w+)")
# State update patterns
temp_update_pattern = re.compile(r"HA Temperature state updated: ([\d.]+)")
humidity_update_pattern = re.compile(r"HA Humidity state updated: ([\d.]+)")
motion_update_pattern = re.compile(r"HA Motion state changed: (ON|OFF)")
weather_update_pattern = re.compile(r"HA Weather condition updated: (\w+)")
# Number pattern
ha_number_pattern = re.compile(r"Setting HA number to: ([\d.]+)")
tests_complete_pattern = re.compile(r"=== All tests completed ===")
# Track all log lines for debugging
log_lines: list[str] = []
# Track HomeAssistant service calls
ha_service_calls: list[HomeassistantServiceCall] = []
# Service call futures organized by service name
service_call_futures = {
"light.turn_off": loop.create_future(), # basic_service_call
"light.turn_on": loop.create_future(), # templated_service_call
"notify.test": loop.create_future(), # empty_string_service_call
"climate.set_temperature": loop.create_future(), # multiple_fields_service_call
"script.test_script": loop.create_future(), # complex_lambda_service_call
"test.empty": loop.create_future(), # all_empty_service_call
"input_number.set_value": loop.create_future(), # ha_number_service_call
"switch.turn_on": loop.create_future(), # ha_switch_on_service_call
"switch.turn_off": loop.create_future(), # ha_switch_off_service_call
}
def on_service_call(service_call: HomeassistantServiceCall) -> None:
"""Capture HomeAssistant service calls."""
ha_service_calls.append(service_call)
# Check if this service call is one we're waiting for
if service_call.service in service_call_futures:
future = service_call_futures[service_call.service]
if not future.done():
future.set_result(service_call)
def check_output(line: str) -> None:
"""Check log output for expected messages."""
log_lines.append(line)
# Check for patterns that capture values
if not lambda_computed_future.done():
match = lambda_computed_pattern.search(line)
if match:
lambda_computed_future.set_result(match.group(1))
elif not ha_temp_state_future.done() and ha_temp_state_pattern.search(line):
ha_temp_state_future.set_result(line)
elif not ha_humidity_state_future.done() and ha_humidity_state_pattern.search(
line
):
ha_humidity_state_future.set_result(line)
elif not ha_motion_state_future.done() and ha_motion_state_pattern.search(line):
ha_motion_state_future.set_result(line)
elif not ha_weather_state_future.done() and ha_weather_state_pattern.search(
line
):
ha_weather_state_future.set_result(line)
# Check state update patterns
elif not temp_update_future.done() and temp_update_pattern.search(line):
temp_update_future.set_result(line)
elif not humidity_update_future.done() and humidity_update_pattern.search(line):
humidity_update_future.set_result(line)
elif not motion_update_future.done() and motion_update_pattern.search(line):
motion_update_future.set_result(line)
elif not weather_update_future.done() and weather_update_pattern.search(line):
weather_update_future.set_result(line)
# Check number pattern
elif not ha_number_future.done() and ha_number_pattern.search(line):
match = ha_number_pattern.search(line)
if match:
ha_number_future.set_result(match.group(1))
elif not tests_complete_future.done() and tests_complete_pattern.search(line):
tests_complete_future.set_result(True)
# Run with log monitoring
async with (
run_compiled(yaml_config, line_callback=check_output),
api_client_connected() as client,
):
# Verify device info
device_info = await client.device_info()
assert device_info is not None
assert device_info.name == "test-ha-api"
# Subscribe to HomeAssistant service calls
client.subscribe_service_calls(on_service_call)
# Send some Home Assistant states for our sensors to read
client.send_home_assistant_state("sensor.external_temperature", "", "22.5")
client.send_home_assistant_state("sensor.external_humidity", "", "65.0")
client.send_home_assistant_state("binary_sensor.external_motion", "", "ON")
client.send_home_assistant_state("weather.home", "condition", "sunny")
# List entities and services
_, services = await client.list_entities_services()
# Find the trigger service
trigger_service = next(
(s for s in services if s.name == "trigger_all_tests"), None
)
assert trigger_service is not None, "trigger_all_tests service not found"
# Execute all tests
client.execute_service(trigger_service, {})
# Wait for all tests to complete with appropriate timeouts
try:
# Templated service test - the main bug fix
computed_value = await asyncio.wait_for(lambda_computed_future, timeout=5.0)
# Verify the computed value is reasonable (75 * 255 / 100 = 191.25 -> 191)
assert computed_value in ["191", "192"], (
f"Unexpected computed value: {computed_value}"
)
# Check state reads - verify we received the mocked values
temp_line = await asyncio.wait_for(ha_temp_state_future, timeout=5.0)
assert "Current HA Temperature: 22.5" in temp_line
humidity_line = await asyncio.wait_for(
ha_humidity_state_future, timeout=5.0
)
assert "Current HA Humidity: 65.0" in humidity_line
motion_line = await asyncio.wait_for(ha_motion_state_future, timeout=5.0)
assert "Current HA Motion: ON" in motion_line
weather_line = await asyncio.wait_for(ha_weather_state_future, timeout=5.0)
assert "Current HA Weather: sunny" in weather_line
# Number test
number_value = await asyncio.wait_for(ha_number_future, timeout=5.0)
assert number_value == "42.5", f"Unexpected number value: {number_value}"
# Wait for completion
await asyncio.wait_for(tests_complete_future, timeout=5.0)
# Now verify the protobuf messages
# 1. Basic service call
basic_call = await asyncio.wait_for(
service_call_futures["light.turn_off"], timeout=2.0
)
assert basic_call.service == "light.turn_off"
assert "entity_id" in basic_call.data, (
f"entity_id not found in data: {basic_call.data}"
)
assert basic_call.data["entity_id"] == "light.test_light", (
f"Wrong entity_id: {basic_call.data['entity_id']}"
)
# 2. Templated service call - verify the temporary string issue is fixed
templated_call = await asyncio.wait_for(
service_call_futures["light.turn_on"], timeout=2.0
)
assert templated_call.service == "light.turn_on"
# Check the computed brightness value
assert "brightness" in templated_call.data
assert templated_call.data["brightness"] in ["191", "192"] # 75 * 255 / 100
# Check data_template
assert "color_name" in templated_call.data_template
assert templated_call.data_template["color_name"] == "test_value"
# Check variables
assert "transition" in templated_call.variables
assert templated_call.variables["transition"] == "2.5"
# 3. Empty string service call
empty_call = await asyncio.wait_for(
service_call_futures["notify.test"], timeout=2.0
)
assert empty_call.service == "notify.test"
# Verify empty strings are properly handled
assert "title" in empty_call.data and empty_call.data["title"] == ""
assert (
"target" in empty_call.data_template
and empty_call.data_template["target"] == ""
)
assert (
"sound" in empty_call.variables and empty_call.variables["sound"] == ""
)
# 4. Multiple fields service call
multi_call = await asyncio.wait_for(
service_call_futures["climate.set_temperature"], timeout=2.0
)
assert multi_call.service == "climate.set_temperature"
assert multi_call.data["temperature"] == "22"
assert multi_call.data["hvac_mode"] == "heat"
assert multi_call.data_template["target_temp_high"] == "24"
assert multi_call.variables["preset_mode"] == "comfort"
# 5. Complex lambda service call
complex_call = await asyncio.wait_for(
service_call_futures["script.test_script"], timeout=2.0
)
assert complex_call.service == "script.test_script"
assert complex_call.data["entity_id"] == "light.living_room"
assert complex_call.data["brightness_pct"] == "99" # 42 * 2.38 ≈ 99
# Check message includes sensor value
assert "message" in complex_call.data_template
assert "Sensor: 42.0" in complex_call.data_template["message"]
# 6. All empty service call
all_empty_call = await asyncio.wait_for(
service_call_futures["test.empty"], timeout=2.0
)
assert all_empty_call.service == "test.empty"
# All fields should be empty strings
assert all(v == "" for v in all_empty_call.data.values())
assert all(v == "" for v in all_empty_call.data_template.values())
assert all(v == "" for v in all_empty_call.variables.values())
# 7. HA Number service call
number_call = await asyncio.wait_for(
service_call_futures["input_number.set_value"], timeout=2.0
)
assert number_call.service == "input_number.set_value"
assert number_call.data["entity_id"] == "input_number.test_number"
# The value might be formatted with trailing zeros
assert float(number_call.data["value"]) == 42.5
# 8. HA Switch service calls
switch_on_call = await asyncio.wait_for(
service_call_futures["switch.turn_on"], timeout=2.0
)
assert switch_on_call.service == "switch.turn_on"
assert switch_on_call.data["entity_id"] == "switch.test_switch"
switch_off_call = await asyncio.wait_for(
service_call_futures["switch.turn_off"], timeout=2.0
)
assert switch_off_call.service == "switch.turn_off"
assert switch_off_call.data["entity_id"] == "switch.test_switch"
except TimeoutError as e:
# Show recent log lines for debugging
recent_logs = "\n".join(log_lines[-20:])
service_calls_summary = "\n".join(
f"- {call.service}" for call in ha_service_calls
)
pytest.fail(
f"Test timed out waiting for expected log pattern or service call. Error: {e}\n\n"
f"Recent log lines:\n{recent_logs}\n\n"
f"Received service calls:\n{service_calls_summary}"
)