mirror of
https://github.com/esphome/esphome.git
synced 2025-08-10 12:27:46 +00:00
tests
This commit is contained in:
@@ -9,6 +9,7 @@ from aioesphomeapi import (
|
|||||||
EntityState,
|
EntityState,
|
||||||
SensorInfo,
|
SensorInfo,
|
||||||
TextSensorInfo,
|
TextSensorInfo,
|
||||||
|
UserService,
|
||||||
UserServiceArgType,
|
UserServiceArgType,
|
||||||
)
|
)
|
||||||
import pytest
|
import pytest
|
||||||
@@ -39,11 +40,11 @@ async def test_api_conditional_memory(
|
|||||||
)
|
)
|
||||||
|
|
||||||
# Find our entities
|
# Find our entities
|
||||||
client_connected = None
|
client_connected: BinarySensorInfo | None = None
|
||||||
client_disconnected_event = None
|
client_disconnected_event: BinarySensorInfo | None = None
|
||||||
service_called_sensor = None
|
service_called_sensor: BinarySensorInfo | None = None
|
||||||
service_arg_sensor = None
|
service_arg_sensor: SensorInfo | None = None
|
||||||
last_client_info = None
|
last_client_info: TextSensorInfo | None = None
|
||||||
|
|
||||||
for entity in entity_info:
|
for entity in entity_info:
|
||||||
if isinstance(entity, BinarySensorInfo):
|
if isinstance(entity, BinarySensorInfo):
|
||||||
@@ -73,8 +74,8 @@ async def test_api_conditional_memory(
|
|||||||
assert len(services) == 2, f"Expected 2 services, found {len(services)}"
|
assert len(services) == 2, f"Expected 2 services, found {len(services)}"
|
||||||
|
|
||||||
# Find our services
|
# Find our services
|
||||||
simple_service = None
|
simple_service: UserService | None = None
|
||||||
service_with_args = None
|
service_with_args: UserService | None = None
|
||||||
|
|
||||||
for service in services:
|
for service in services:
|
||||||
if service.name == "test_simple_service":
|
if service.name == "test_simple_service":
|
||||||
@@ -98,7 +99,7 @@ async def test_api_conditional_memory(
|
|||||||
assert arg_types["arg_float"] == UserServiceArgType.FLOAT
|
assert arg_types["arg_float"] == UserServiceArgType.FLOAT
|
||||||
|
|
||||||
# Track state changes
|
# Track state changes
|
||||||
states = {}
|
states: dict[int, EntityState] = {}
|
||||||
states_future: asyncio.Future[None] = loop.create_future()
|
states_future: asyncio.Future[None] = loop.create_future()
|
||||||
|
|
||||||
def on_state(state: EntityState) -> None:
|
def on_state(state: EntityState) -> None:
|
||||||
@@ -175,7 +176,7 @@ async def test_api_conditional_memory(
|
|||||||
# After disconnecting first client, reconnect and verify triggers work
|
# After disconnecting first client, reconnect and verify triggers work
|
||||||
async with api_client_connected() as client2:
|
async with api_client_connected() as client2:
|
||||||
# Subscribe to states with new client
|
# Subscribe to states with new client
|
||||||
states2 = {}
|
states2: dict[int, EntityState] = {}
|
||||||
connected_future: asyncio.Future[None] = loop.create_future()
|
connected_future: asyncio.Future[None] = loop.create_future()
|
||||||
|
|
||||||
def on_state2(state: EntityState) -> None:
|
def on_state2(state: EntityState) -> None:
|
||||||
|
Reference in New Issue
Block a user