mirror of
https://github.com/home-assistant/core.git
synced 2025-07-17 10:17:09 +00:00
Fix potential deadlock in platform setup that waits for MQTT
This commit is contained in:
parent
3c60bff7dc
commit
451ca92df9
@ -120,10 +120,15 @@ async def async_setup_platform(
|
|||||||
) -> None:
|
) -> None:
|
||||||
"""Set up the ARWN platform."""
|
"""Set up the ARWN platform."""
|
||||||
|
|
||||||
# Make sure MQTT integration is enabled and the client is available
|
async def _async_setup_mqtt() -> None:
|
||||||
if not await mqtt.async_wait_for_mqtt_client(hass):
|
# Make sure MQTT integration is enabled and the client is available
|
||||||
_LOGGER.error("MQTT integration is not available")
|
if not await mqtt.async_wait_for_mqtt_client(hass):
|
||||||
return
|
_LOGGER.error("MQTT integration is not available")
|
||||||
|
return
|
||||||
|
|
||||||
|
await mqtt.async_subscribe(hass, TOPIC, async_sensor_event_received, 0)
|
||||||
|
|
||||||
|
hass.create_task(_async_setup_mqtt(), "arwn setup")
|
||||||
|
|
||||||
@callback
|
@callback
|
||||||
def async_sensor_event_received(msg: mqtt.ReceiveMessage) -> None:
|
def async_sensor_event_received(msg: mqtt.ReceiveMessage) -> None:
|
||||||
@ -167,8 +172,6 @@ async def async_setup_platform(
|
|||||||
)
|
)
|
||||||
store[sensor.name].set_event(event)
|
store[sensor.name].set_event(event)
|
||||||
|
|
||||||
await mqtt.async_subscribe(hass, TOPIC, async_sensor_event_received, 0)
|
|
||||||
|
|
||||||
|
|
||||||
class ArwnSensor(SensorEntity):
|
class ArwnSensor(SensorEntity):
|
||||||
"""Representation of an ARWN sensor."""
|
"""Representation of an ARWN sensor."""
|
||||||
|
@ -199,34 +199,38 @@ async def async_setup_platform(
|
|||||||
discovery_info: DiscoveryInfoType | None = None,
|
discovery_info: DiscoveryInfoType | None = None,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Set up the manual MQTT alarm platform."""
|
"""Set up the manual MQTT alarm platform."""
|
||||||
# Make sure MQTT integration is enabled and the client is available
|
|
||||||
# We cannot count on dependencies as the alarm_control_panel platform setup
|
async def _async_setup_entities() -> None:
|
||||||
# also will be triggered when mqtt is loading the `alarm_control_panel` platform
|
# Make sure MQTT integration is enabled and the client is available
|
||||||
if not await mqtt.async_wait_for_mqtt_client(hass):
|
# We cannot count on dependencies as the alarm_control_panel platform setup
|
||||||
_LOGGER.error("MQTT integration is not available")
|
# also will be triggered when mqtt is loading the `alarm_control_panel` platform
|
||||||
return
|
if not await mqtt.async_wait_for_mqtt_client(hass):
|
||||||
add_entities(
|
_LOGGER.error("MQTT integration is not available")
|
||||||
[
|
return
|
||||||
ManualMQTTAlarm(
|
add_entities(
|
||||||
hass,
|
[
|
||||||
config[CONF_NAME],
|
ManualMQTTAlarm(
|
||||||
config.get(CONF_CODE),
|
hass,
|
||||||
config.get(CONF_CODE_TEMPLATE),
|
config[CONF_NAME],
|
||||||
config.get(CONF_DISARM_AFTER_TRIGGER, DEFAULT_DISARM_AFTER_TRIGGER),
|
config.get(CONF_CODE),
|
||||||
config.get(mqtt.CONF_STATE_TOPIC),
|
config.get(CONF_CODE_TEMPLATE),
|
||||||
config.get(mqtt.CONF_COMMAND_TOPIC),
|
config.get(CONF_DISARM_AFTER_TRIGGER, DEFAULT_DISARM_AFTER_TRIGGER),
|
||||||
config.get(mqtt.CONF_QOS),
|
config.get(mqtt.CONF_STATE_TOPIC),
|
||||||
config.get(CONF_CODE_ARM_REQUIRED),
|
config.get(mqtt.CONF_COMMAND_TOPIC),
|
||||||
config.get(CONF_PAYLOAD_DISARM),
|
config.get(mqtt.CONF_QOS),
|
||||||
config.get(CONF_PAYLOAD_ARM_HOME),
|
config.get(CONF_CODE_ARM_REQUIRED),
|
||||||
config.get(CONF_PAYLOAD_ARM_AWAY),
|
config.get(CONF_PAYLOAD_DISARM),
|
||||||
config.get(CONF_PAYLOAD_ARM_NIGHT),
|
config.get(CONF_PAYLOAD_ARM_HOME),
|
||||||
config.get(CONF_PAYLOAD_ARM_VACATION),
|
config.get(CONF_PAYLOAD_ARM_AWAY),
|
||||||
config.get(CONF_PAYLOAD_ARM_CUSTOM_BYPASS),
|
config.get(CONF_PAYLOAD_ARM_NIGHT),
|
||||||
config,
|
config.get(CONF_PAYLOAD_ARM_VACATION),
|
||||||
)
|
config.get(CONF_PAYLOAD_ARM_CUSTOM_BYPASS),
|
||||||
]
|
config,
|
||||||
)
|
)
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
|
hass.create_task(_async_setup_entities(), "manual_mqtt setup")
|
||||||
|
|
||||||
|
|
||||||
class ManualMQTTAlarm(AlarmControlPanelEntity):
|
class ManualMQTTAlarm(AlarmControlPanelEntity):
|
||||||
|
@ -48,13 +48,27 @@ async def async_setup_scanner(
|
|||||||
discovery_info: DiscoveryInfoType | None = None,
|
discovery_info: DiscoveryInfoType | None = None,
|
||||||
) -> bool:
|
) -> bool:
|
||||||
"""Set up the MQTT JSON tracker."""
|
"""Set up the MQTT JSON tracker."""
|
||||||
# Make sure MQTT integration is enabled and the client is available
|
|
||||||
# We cannot count on dependencies as the device_tracker platform setup
|
|
||||||
# also will be triggered when mqtt is loading the `device_tracker` platform
|
|
||||||
if not await mqtt.async_wait_for_mqtt_client(hass):
|
|
||||||
_LOGGER.error("MQTT integration is not available")
|
|
||||||
return False
|
|
||||||
|
|
||||||
|
async def _async_wait_for_mqtt_and_set_up() -> None:
|
||||||
|
# Make sure MQTT integration is enabled and the client is available
|
||||||
|
# We cannot count on dependencies as the device_tracker platform setup
|
||||||
|
# also will be triggered when mqtt is loading the `device_tracker` platform
|
||||||
|
if not await mqtt.async_wait_for_mqtt_client(hass):
|
||||||
|
_LOGGER.error("MQTT integration is not available")
|
||||||
|
return
|
||||||
|
|
||||||
|
await _async_setup_scanner(hass, config, async_see)
|
||||||
|
|
||||||
|
hass.create_task(_async_wait_for_mqtt_and_set_up(), "mqtt_json setup")
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
async def _async_setup_scanner(
|
||||||
|
hass: HomeAssistant,
|
||||||
|
config: ConfigType,
|
||||||
|
async_see: AsyncSeeCallback,
|
||||||
|
) -> None:
|
||||||
|
"""Set up MQTT JSON tracker."""
|
||||||
devices = config[CONF_DEVICES]
|
devices = config[CONF_DEVICES]
|
||||||
qos = config[CONF_QOS]
|
qos = config[CONF_QOS]
|
||||||
|
|
||||||
@ -83,8 +97,6 @@ async def async_setup_scanner(
|
|||||||
|
|
||||||
await mqtt.async_subscribe(hass, topic, async_message_received, qos)
|
await mqtt.async_subscribe(hass, topic, async_message_received, qos)
|
||||||
|
|
||||||
return True
|
|
||||||
|
|
||||||
|
|
||||||
def _parse_see_args(dev_id, data):
|
def _parse_see_args(dev_id, data):
|
||||||
"""Parse the payload location parameters, into the format see expects."""
|
"""Parse the payload location parameters, into the format see expects."""
|
||||||
|
@ -81,24 +81,28 @@ async def async_setup_platform(
|
|||||||
discovery_info: DiscoveryInfoType | None = None,
|
discovery_info: DiscoveryInfoType | None = None,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Set up MQTT room Sensor."""
|
"""Set up MQTT room Sensor."""
|
||||||
# Make sure MQTT integration is enabled and the client is available
|
|
||||||
# We cannot count on dependencies as the sensor platform setup
|
async def _async_setup_entities() -> None:
|
||||||
# also will be triggered when mqtt is loading the `sensor` platform
|
# Make sure MQTT integration is enabled and the client is available
|
||||||
if not await mqtt.async_wait_for_mqtt_client(hass):
|
# We cannot count on dependencies as the sensor platform setup
|
||||||
_LOGGER.error("MQTT integration is not available")
|
# also will be triggered when mqtt is loading the `sensor` platform
|
||||||
return
|
if not await mqtt.async_wait_for_mqtt_client(hass):
|
||||||
async_add_entities(
|
_LOGGER.error("MQTT integration is not available")
|
||||||
[
|
return
|
||||||
MQTTRoomSensor(
|
async_add_entities(
|
||||||
config.get(CONF_NAME),
|
[
|
||||||
config[CONF_STATE_TOPIC],
|
MQTTRoomSensor(
|
||||||
config[CONF_DEVICE_ID],
|
config.get(CONF_NAME),
|
||||||
config[CONF_TIMEOUT],
|
config[CONF_STATE_TOPIC],
|
||||||
config[CONF_AWAY_TIMEOUT],
|
config[CONF_DEVICE_ID],
|
||||||
config.get(CONF_UNIQUE_ID),
|
config[CONF_TIMEOUT],
|
||||||
)
|
config[CONF_AWAY_TIMEOUT],
|
||||||
]
|
config.get(CONF_UNIQUE_ID),
|
||||||
)
|
)
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
|
hass.create_task(_async_setup_entities(), "mqtt_room setup")
|
||||||
|
|
||||||
|
|
||||||
class MQTTRoomSensor(SensorEntity):
|
class MQTTRoomSensor(SensorEntity):
|
||||||
|
@ -103,7 +103,7 @@ async def test_no_pending(
|
|||||||
}
|
}
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done(wait_background_tasks=True)
|
||||||
|
|
||||||
entity_id = "alarm_control_panel.test"
|
entity_id = "alarm_control_panel.test"
|
||||||
|
|
||||||
@ -155,7 +155,7 @@ async def test_no_pending_when_code_not_req(
|
|||||||
}
|
}
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done(wait_background_tasks=True)
|
||||||
|
|
||||||
entity_id = "alarm_control_panel.test"
|
entity_id = "alarm_control_panel.test"
|
||||||
|
|
||||||
@ -206,7 +206,7 @@ async def test_with_pending(
|
|||||||
}
|
}
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done(wait_background_tasks=True)
|
||||||
|
|
||||||
entity_id = "alarm_control_panel.test"
|
entity_id = "alarm_control_panel.test"
|
||||||
|
|
||||||
|
@ -65,7 +65,7 @@ async def test_setup_fails_without_mqtt_being_setup(
|
|||||||
DT_DOMAIN,
|
DT_DOMAIN,
|
||||||
{DT_DOMAIN: {CONF_PLATFORM: "mqtt_json", "devices": {dev_id: topic}}},
|
{DT_DOMAIN: {CONF_PLATFORM: "mqtt_json", "devices": {dev_id: topic}}},
|
||||||
)
|
)
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done(wait_background_tasks=True)
|
||||||
|
|
||||||
assert "MQTT integration is not available" in caplog.text
|
assert "MQTT integration is not available" in caplog.text
|
||||||
|
|
||||||
@ -95,7 +95,7 @@ async def test_ensure_device_tracker_platform_validation(hass: HomeAssistant) ->
|
|||||||
DT_DOMAIN,
|
DT_DOMAIN,
|
||||||
{DT_DOMAIN: {CONF_PLATFORM: "mqtt_json", "devices": {dev_id: topic}}},
|
{DT_DOMAIN: {CONF_PLATFORM: "mqtt_json", "devices": {dev_id: topic}}},
|
||||||
)
|
)
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done(wait_background_tasks=True)
|
||||||
assert mock_sp.call_count == 1
|
assert mock_sp.call_count == 1
|
||||||
|
|
||||||
|
|
||||||
@ -110,7 +110,7 @@ async def test_json_message(hass: HomeAssistant) -> None:
|
|||||||
DT_DOMAIN,
|
DT_DOMAIN,
|
||||||
{DT_DOMAIN: {CONF_PLATFORM: "mqtt_json", "devices": {dev_id: topic}}},
|
{DT_DOMAIN: {CONF_PLATFORM: "mqtt_json", "devices": {dev_id: topic}}},
|
||||||
)
|
)
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done(wait_background_tasks=True)
|
||||||
async_fire_mqtt_message(hass, topic, location)
|
async_fire_mqtt_message(hass, topic, location)
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done()
|
||||||
state = hass.states.get("device_tracker.zanzito")
|
state = hass.states.get("device_tracker.zanzito")
|
||||||
@ -131,7 +131,7 @@ async def test_non_json_message(
|
|||||||
DT_DOMAIN,
|
DT_DOMAIN,
|
||||||
{DT_DOMAIN: {CONF_PLATFORM: "mqtt_json", "devices": {dev_id: topic}}},
|
{DT_DOMAIN: {CONF_PLATFORM: "mqtt_json", "devices": {dev_id: topic}}},
|
||||||
)
|
)
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done(wait_background_tasks=True)
|
||||||
|
|
||||||
caplog.set_level(logging.ERROR)
|
caplog.set_level(logging.ERROR)
|
||||||
caplog.clear()
|
caplog.clear()
|
||||||
@ -153,7 +153,7 @@ async def test_incomplete_message(
|
|||||||
DT_DOMAIN,
|
DT_DOMAIN,
|
||||||
{DT_DOMAIN: {CONF_PLATFORM: "mqtt_json", "devices": {dev_id: topic}}},
|
{DT_DOMAIN: {CONF_PLATFORM: "mqtt_json", "devices": {dev_id: topic}}},
|
||||||
)
|
)
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done(wait_background_tasks=True)
|
||||||
|
|
||||||
caplog.set_level(logging.ERROR)
|
caplog.set_level(logging.ERROR)
|
||||||
caplog.clear()
|
caplog.clear()
|
||||||
@ -177,7 +177,7 @@ async def test_single_level_wildcard_topic(hass: HomeAssistant) -> None:
|
|||||||
DT_DOMAIN,
|
DT_DOMAIN,
|
||||||
{DT_DOMAIN: {CONF_PLATFORM: "mqtt_json", "devices": {dev_id: subscription}}},
|
{DT_DOMAIN: {CONF_PLATFORM: "mqtt_json", "devices": {dev_id: subscription}}},
|
||||||
)
|
)
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done(wait_background_tasks=True)
|
||||||
|
|
||||||
async_fire_mqtt_message(hass, topic, location)
|
async_fire_mqtt_message(hass, topic, location)
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done()
|
||||||
@ -198,7 +198,7 @@ async def test_multi_level_wildcard_topic(hass: HomeAssistant) -> None:
|
|||||||
DT_DOMAIN,
|
DT_DOMAIN,
|
||||||
{DT_DOMAIN: {CONF_PLATFORM: "mqtt_json", "devices": {dev_id: subscription}}},
|
{DT_DOMAIN: {CONF_PLATFORM: "mqtt_json", "devices": {dev_id: subscription}}},
|
||||||
)
|
)
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done(wait_background_tasks=True)
|
||||||
|
|
||||||
async_fire_mqtt_message(hass, topic, location)
|
async_fire_mqtt_message(hass, topic, location)
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done()
|
||||||
@ -220,7 +220,7 @@ async def test_single_level_wildcard_topic_not_matching(hass: HomeAssistant) ->
|
|||||||
DT_DOMAIN,
|
DT_DOMAIN,
|
||||||
{DT_DOMAIN: {CONF_PLATFORM: "mqtt_json", "devices": {dev_id: subscription}}},
|
{DT_DOMAIN: {CONF_PLATFORM: "mqtt_json", "devices": {dev_id: subscription}}},
|
||||||
)
|
)
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done(wait_background_tasks=True)
|
||||||
|
|
||||||
async_fire_mqtt_message(hass, topic, location)
|
async_fire_mqtt_message(hass, topic, location)
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done()
|
||||||
@ -240,7 +240,7 @@ async def test_multi_level_wildcard_topic_not_matching(hass: HomeAssistant) -> N
|
|||||||
DT_DOMAIN,
|
DT_DOMAIN,
|
||||||
{DT_DOMAIN: {CONF_PLATFORM: "mqtt_json", "devices": {dev_id: subscription}}},
|
{DT_DOMAIN: {CONF_PLATFORM: "mqtt_json", "devices": {dev_id: subscription}}},
|
||||||
)
|
)
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done(wait_background_tasks=True)
|
||||||
|
|
||||||
async_fire_mqtt_message(hass, topic, location)
|
async_fire_mqtt_message(hass, topic, location)
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done()
|
||||||
|
@ -78,7 +78,7 @@ async def test_no_mqtt(hass: HomeAssistant, caplog: pytest.LogCaptureFixture) ->
|
|||||||
}
|
}
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done(wait_background_tasks=True)
|
||||||
state = hass.states.get(SENSOR_STATE)
|
state = hass.states.get(SENSOR_STATE)
|
||||||
assert state is None
|
assert state is None
|
||||||
assert "MQTT integration is not available" in caplog.text
|
assert "MQTT integration is not available" in caplog.text
|
||||||
@ -100,7 +100,7 @@ async def test_room_update(hass: HomeAssistant, mqtt_mock: MqttMockHAClient) ->
|
|||||||
}
|
}
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done(wait_background_tasks=True)
|
||||||
|
|
||||||
await send_message(hass, BEDROOM_TOPIC, FAR_MESSAGE)
|
await send_message(hass, BEDROOM_TOPIC, FAR_MESSAGE)
|
||||||
await assert_state(hass, BEDROOM)
|
await assert_state(hass, BEDROOM)
|
||||||
@ -141,7 +141,7 @@ async def test_unique_id_is_set(
|
|||||||
}
|
}
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done(wait_background_tasks=True)
|
||||||
state = hass.states.get(SENSOR_STATE)
|
state = hass.states.get(SENSOR_STATE)
|
||||||
assert state.state is not None
|
assert state.state is not None
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user