mirror of
				https://github.com/home-assistant/core.git
				synced 2025-11-02 23:49:30 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			1275 lines
		
	
	
		
			45 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			1275 lines
		
	
	
		
			45 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
"""Test the flow classes."""
 | 
						|
 | 
						|
import asyncio
 | 
						|
import dataclasses
 | 
						|
import logging
 | 
						|
from unittest.mock import Mock, patch
 | 
						|
 | 
						|
import pytest
 | 
						|
import voluptuous as vol
 | 
						|
 | 
						|
from homeassistant import config_entries, data_entry_flow
 | 
						|
from homeassistant.core import Event, HomeAssistant, callback
 | 
						|
from homeassistant.helpers import config_validation as cv
 | 
						|
from homeassistant.util.decorator import Registry
 | 
						|
 | 
						|
from .common import async_capture_events
 | 
						|
 | 
						|
 | 
						|
class MockFlowManager(data_entry_flow.FlowManager):
    """Flow manager used by the tests in this module.

    Handlers are kept in a private ``Registry`` and every CREATE_ENTRY
    result that passes through ``async_finish_flow`` is recorded in
    ``mock_created_entries``.
    """

    def __init__(self) -> None:
        """Set up the handler registry and the log of created entries."""
        super().__init__(None)
        self.mock_created_entries = []
        self._handlers = Registry()
        # Exposed so tests can register handler classes with a decorator.
        self.mock_reg_handler = self._handlers.register

    async def async_create_flow(self, handler_key, *, context, data):
        """Instantiate the handler registered under ``handler_key``.

        Raises ``UnknownHandler`` when no handler is registered for the key.
        The first step is taken from ``context["init_step"]`` and defaults
        to ``"init"``.
        """
        handler_cls = self._handlers.get(handler_key)
        if handler_cls is None:
            raise data_entry_flow.UnknownHandler

        new_flow = handler_cls()
        new_flow.init_step = context.get("init_step", "init")
        return new_flow

    async def async_finish_flow(self, flow, result):
        """Record CREATE_ENTRY results, tagged with the flow's source."""
        if result["type"] != data_entry_flow.FlowResultType.CREATE_ENTRY:
            return result

        result["source"] = flow.context.get("source")
        self.mock_created_entries.append(result)
        return result
 | 
						|
 | 
						|
 | 
						|
@pytest.fixture
def manager() -> MockFlowManager:
    """Provide a fresh ``MockFlowManager`` instance."""
    flow_manager = MockFlowManager()
    return flow_manager
 | 
						|
 | 
						|
 | 
						|
async def test_configure_reuses_handler_instance(manager: MockFlowManager) -> None:
    """Test that configuring a flow calls the same handler instance again."""

    @manager.mock_reg_handler("test")
    class TestFlow(data_entry_flow.FlowHandler):
        handle_count = 0

        async def async_step_init(self, user_input=None):
            # The per-instance call count is reported back through the errors.
            self.handle_count += 1
            calls = str(self.handle_count)
            return self.async_show_form(step_id="init", errors={"base": calls})

    result = await manager.async_init("test")
    assert result["errors"]["base"] == "1"

    result = await manager.async_configure(result["flow_id"])
    assert result["errors"]["base"] == "2"

    in_progress = manager.async_progress()
    assert in_progress == [
        {
            "flow_id": result["flow_id"],
            "handler": "test",
            "step_id": "init",
            "context": {},
        }
    ]
    assert not manager.mock_created_entries
 | 
						|
 | 
						|
 | 
						|
async def test_configure_two_steps(manager: MockFlowManager) -> None:
    """Test that a flow can be completed across two steps.

    The flow starts at the ``first`` step with init data. Configuring it
    with input that does not match the form schema raises ``vol.Invalid``;
    configuring it with valid input creates an entry whose data is the
    init data followed by the submitted data.
    """

    @manager.mock_reg_handler("test")
    class TestFlow(data_entry_flow.FlowHandler):
        VERSION = 1

        async def async_step_first(self, user_input=None):
            if user_input is not None:
                return await self.async_step_second()
            return self.async_show_form(step_id="first", data_schema=vol.Schema([str]))

        async def async_step_second(self, user_input=None):
            if user_input is not None:
                return self.async_create_entry(
                    title="Test Entry", data=self.init_data + user_input
                )
            return self.async_show_form(step_id="second", data_schema=vol.Schema([str]))

    form = await manager.async_init(
        "test", context={"init_step": "first"}, data=["INIT-DATA"]
    )

    # A plain string does not match the list-of-str schema. The call raises,
    # so there is no result to bind.
    with pytest.raises(vol.Invalid):
        await manager.async_configure(form["flow_id"], "INCORRECT-DATA")

    form = await manager.async_configure(form["flow_id"], ["SECOND-DATA"])
    assert form["type"] == data_entry_flow.FlowResultType.CREATE_ENTRY
    assert len(manager.async_progress()) == 0
    assert len(manager.mock_created_entries) == 1
    result = manager.mock_created_entries[0]
    assert result["handler"] == "test"
    assert result["data"] == ["INIT-DATA", "SECOND-DATA"]
 | 
						|
 | 
						|
 | 
						|
async def test_show_form(manager: MockFlowManager) -> None:
    """Test that a form result carries the schema and errors it was given."""
    form_schema = vol.Schema(
        {vol.Required("username"): str, vol.Required("password"): str}
    )

    @manager.mock_reg_handler("test")
    class TestFlow(data_entry_flow.FlowHandler):
        async def async_step_init(self, user_input=None):
            return self.async_show_form(
                errors={"username": "Should be unique."},
                data_schema=form_schema,
                step_id="init",
            )

    result = await manager.async_init("test")
    assert result["type"] == data_entry_flow.FlowResultType.FORM
    # The schema object itself is passed through, not a copy.
    assert result["data_schema"] is form_schema
    assert result["errors"] == {"username": "Should be unique."}
 | 
						|
 | 
						|
 | 
						|
async def test_form_shows_with_added_suggested_values(manager: MockFlowManager) -> None:
    """Test that suggested values are added to a copy of the form schema."""

    def compare_schemas(schema: vol.Schema, expected_schema: vol.Schema) -> None:
        """Assert two schemas are distinct objects with equal content."""
        actual, expected = schema.schema, expected_schema.schema
        assert actual is not expected

        assert list(actual) == list(expected)

        for key, validator in actual.items():
            if not isinstance(validator, data_entry_flow.section):
                assert validator == expected[key]
                continue
            # Sections are compared through the schema they wrap.
            assert validator.schema == expected[key].schema

    base_schema = vol.Schema(
        {
            vol.Required("username"): str,
            vol.Required("password"): str,
            vol.Required("section_1"): data_entry_flow.section(
                vol.Schema({vol.Optional("full_name"): str}),
                {"collapsed": False},
            ),
        }
    )

    @manager.mock_reg_handler("test")
    class TestFlow(data_entry_flow.FlowHandler):
        async def async_step_init(self, user_input=None):
            # The init data doubles as the source of the suggested values.
            with_suggestions = self.add_suggested_values_to_schema(
                base_schema, user_input
            )
            return self.async_show_form(step_id="init", data_schema=with_suggestions)

    result = await manager.async_init(
        "test",
        data={
            "username": "doej",
            "password": "verySecret1",
            "section_1": {"full_name": "John Doe"},
        },
    )
    assert result["type"] == data_entry_flow.FlowResultType.FORM
    assert result["data_schema"].schema is not base_schema.schema
    assert result["data_schema"].schema != base_schema.schema
    compare_schemas(result["data_schema"], base_schema)
    markers = list(result["data_schema"].schema)
    assert len(markers) == 3
    username_marker, password_marker, section_marker = markers
    assert username_marker == "username"
    assert username_marker.description == {"suggested_value": "doej"}
    assert password_marker == "password"
    assert password_marker.description == {"suggested_value": "verySecret1"}
    assert section_marker == "section_1"
    section_validator = result["data_schema"].schema["section_1"]
    assert isinstance(section_validator, data_entry_flow.section)
    # The section instance was copied
    assert section_validator is not base_schema.schema["section_1"]
    # The section schema instance was copied
    assert section_validator.schema is not base_schema.schema["section_1"].schema
    assert section_validator.schema == base_schema.schema["section_1"].schema
    section_markers = list(section_validator.schema.schema)
    assert len(section_markers) == 1
    (full_name_marker,) = section_markers
    assert full_name_marker == "full_name"
    assert full_name_marker.description == {"suggested_value": "John Doe"}

    # Run again without suggested values to check the base schema was not mutated
    result = await manager.async_init("test")
    assert result["type"] == data_entry_flow.FlowResultType.FORM
    assert result["data_schema"].schema is not base_schema.schema
    assert result["data_schema"].schema == base_schema.schema
    markers = list(result["data_schema"].schema)
    assert len(markers) == 3
    username_marker, password_marker, section_marker = markers
    assert username_marker == "username"
    assert username_marker.description is None
    assert password_marker == "password"
    assert password_marker.description is None
    assert section_marker == "section_1"
    section_validator = result["data_schema"].schema["section_1"]
    assert isinstance(section_validator, data_entry_flow.section)
    # The section class is not replaced if there is no suggested value for the section
    assert section_validator is base_schema.schema["section_1"]
    # The section schema is not replaced if there is no suggested value for the section
    assert section_validator.schema is base_schema.schema["section_1"].schema
    section_markers = list(section_validator.schema.schema)
    assert len(section_markers) == 1
    (full_name_marker,) = section_markers
    assert full_name_marker == "full_name"
    assert full_name_marker.description is None
 | 
						|
 | 
						|
 | 
						|
async def test_abort_removes_instance(manager: MockFlowManager) -> None:
    """Test that an aborted flow is dropped and a new init gets a new instance."""

    @manager.mock_reg_handler("test")
    class TestFlow(data_entry_flow.FlowHandler):
        is_new = True

        async def async_step_init(self, user_input=None):
            # Report whether this instance was used before, then mark it used.
            was_new, self.is_new = self.is_new, False
            return self.async_abort(reason=str(was_new))

    # Both runs must report "True": the second init must not see the
    # instance that the first run marked as used.
    result = await manager.async_init("test")
    assert result["reason"] == "True"
    assert not manager.async_progress()
    assert not manager.mock_created_entries

    result = await manager.async_init("test")
    assert result["reason"] == "True"
    assert not manager.async_progress()
    assert not manager.mock_created_entries
 | 
						|
 | 
						|
 | 
						|
async def test_abort_aborted_flow(manager: MockFlowManager) -> None:
    """Test returning an abort result from a flow that was already aborted."""

    @manager.mock_reg_handler("test")
    class TestFlow(data_entry_flow.FlowHandler):
        async def async_step_init(self, user_input=None):
            # Abort through the manager first, then return an abort result too.
            manager.async_abort(self.flow_id)
            return self.async_abort(reason="blah")

    result = await manager.async_init("test")
    assert result["reason"] == "blah"
    assert not manager.async_progress()
    assert not manager.mock_created_entries
 | 
						|
 | 
						|
 | 
						|
async def test_abort_calls_async_remove(manager: MockFlowManager) -> None:
    """Test that aborting a flow calls the handler's ``async_remove`` once."""

    @manager.mock_reg_handler("test")
    class TestFlow(data_entry_flow.FlowHandler):
        # Mocked at class level so the call can be checked after the flow ends.
        async_remove = Mock()

        async def async_step_init(self, user_input=None):
            return self.async_abort(reason="reason")

    await manager.async_init("test")

    TestFlow.async_remove.assert_called_once()

    assert not manager.async_progress()
    assert not manager.mock_created_entries
 | 
						|
 | 
						|
 | 
						|
async def test_abort_calls_async_flow_removed(manager: MockFlowManager) -> None:
    """Test that aborting a flow calls the manager's ``async_flow_removed``."""

    @manager.mock_reg_handler("test")
    class TestFlow(data_entry_flow.FlowHandler):
        async def async_step_init(self, user_input=None):
            return self.async_abort(reason="reason")

    removed_hook = Mock()
    manager.async_flow_removed = removed_hook
    await manager.async_init("test")

    manager.async_flow_removed.assert_called_once()

    assert not manager.async_progress()
    assert not manager.mock_created_entries
 | 
						|
 | 
						|
 | 
						|
async def test_abort_calls_async_remove_with_exception(
    manager: MockFlowManager, caplog: pytest.LogCaptureFixture
) -> None:
    """Test that an error raised by ``async_remove`` on abort is logged."""

    @manager.mock_reg_handler("test")
    class TestFlow(data_entry_flow.FlowHandler):
        # The first (and only expected) call raises RuntimeError.
        async_remove = Mock(side_effect=[RuntimeError("error")])

        async def async_step_init(self, user_input=None):
            return self.async_abort(reason="reason")

    with caplog.at_level(logging.ERROR):
        await manager.async_init("test")

    assert "Error removing test flow" in caplog.text

    TestFlow.async_remove.assert_called_once()

    assert not manager.async_progress()
    assert not manager.mock_created_entries
 | 
						|
 | 
						|
 | 
						|
async def test_create_saves_data(manager: MockFlowManager) -> None:
    """Test that a created entry records handler, title, data and source."""

    @manager.mock_reg_handler("test")
    class TestFlow(data_entry_flow.FlowHandler):
        VERSION = 5

        async def async_step_init(self, user_input=None):
            return self.async_create_entry(data="Test Data", title="Test Title")

    await manager.async_init("test")
    assert not manager.async_progress()
    assert len(manager.mock_created_entries) == 1

    (created,) = manager.mock_created_entries
    assert created["handler"] == "test"
    assert created["title"] == "Test Title"
    assert created["data"] == "Test Data"
    # No context was passed to async_init, so no source is recorded.
    assert created["source"] is None
 | 
						|
 | 
						|
 | 
						|
async def test_create_aborted_flow(manager: MockFlowManager) -> None:
    """Test returning a create_entry result from a flow that was aborted."""

    @manager.mock_reg_handler("test")
    class TestFlow(data_entry_flow.FlowHandler):
        VERSION = 5

        async def async_step_init(self, user_input=None):
            # Abort through the manager first, then try to create an entry.
            manager.async_abort(self.flow_id)
            return self.async_create_entry(title="Test Title", data="Test Data")

    with pytest.raises(data_entry_flow.UnknownFlow):
        await manager.async_init("test")
    assert not manager.async_progress()

    # No entry should be created if the flow is aborted
    assert not manager.mock_created_entries
 | 
						|
 | 
						|
 | 
						|
async def test_create_calls_async_flow_removed(manager: MockFlowManager) -> None:
    """Test that creating an entry calls the manager's ``async_flow_removed``."""

    @manager.mock_reg_handler("test")
    class TestFlow(data_entry_flow.FlowHandler):
        async def async_step_init(self, user_input=None):
            return self.async_create_entry(data="Test Data", title="Test Title")

    removed_hook = Mock()
    manager.async_flow_removed = removed_hook
    await manager.async_init("test")

    manager.async_flow_removed.assert_called_once()

    assert not manager.async_progress()
    assert len(manager.mock_created_entries) == 1
 | 
						|
 | 
						|
 | 
						|
async def test_discovery_init_flow(manager: MockFlowManager) -> None:
    """Test a flow started with a discovery source and discovery data."""

    @manager.mock_reg_handler("test")
    class TestFlow(data_entry_flow.FlowHandler):
        VERSION = 5

        async def async_step_init(self, info):
            # The init data becomes both the entry title and the entry data.
            return self.async_create_entry(title=info["id"], data=info)

    discovery_info = {"id": "hello", "token": "secret"}
    discovery_context = {"source": config_entries.SOURCE_DISCOVERY}

    await manager.async_init("test", context=discovery_context, data=discovery_info)
    assert not manager.async_progress()
    assert len(manager.mock_created_entries) == 1

    (created,) = manager.mock_created_entries
    assert created["handler"] == "test"
    assert created["title"] == "hello"
    assert created["data"] == discovery_info
    assert created["source"] == config_entries.SOURCE_DISCOVERY
 | 
						|
 | 
						|
 | 
						|
async def test_finish_callback_change_result_type(hass: HomeAssistant) -> None:
    """Test finish callback can change result type.

    The manager's ``async_finish_flow`` turns a CREATE_ENTRY result back into
    the init form while the submitted ``count`` is missing or <= 1, and
    otherwise adds the count to the result under ``"result"``.
    """

    class TestFlow(data_entry_flow.FlowHandler):
        VERSION = 1

        # Named ``user_input`` so the ``input`` builtin is not shadowed.
        async def async_step_init(self, user_input):
            """Return init form with one input field 'count'."""
            if user_input is not None:
                return self.async_create_entry(title="init", data=user_input)
            return self.async_show_form(
                step_id="init", data_schema=vol.Schema({"count": int})
            )

    class FlowManager(data_entry_flow.FlowManager):
        async def async_create_flow(self, handler_key, *, context, data):
            """Create a test flow."""
            return TestFlow()

        async def async_finish_flow(self, flow, result):
            """Redirect to init form if count <= 1."""
            if result["type"] == data_entry_flow.FlowResultType.CREATE_ENTRY:
                if result["data"] is None or result["data"].get("count", 0) <= 1:
                    return flow.async_show_form(
                        step_id="init", data_schema=vol.Schema({"count": int})
                    )
                result["result"] = result["data"]["count"]
            return result

    manager = FlowManager(hass)

    result = await manager.async_init("test")
    assert result["type"] == data_entry_flow.FlowResultType.FORM
    assert result["step_id"] == "init"

    # A count of 0 is rejected by the finish callback: the form is shown again.
    result = await manager.async_configure(result["flow_id"], {"count": 0})
    assert result["type"] == data_entry_flow.FlowResultType.FORM
    assert result["step_id"] == "init"
    assert "result" not in result

    # A count above 1 is accepted and copied into the result.
    result = await manager.async_configure(result["flow_id"], {"count": 2})
    assert result["type"] == data_entry_flow.FlowResultType.CREATE_ENTRY
    assert result["result"] == 2
 | 
						|
 | 
						|
 | 
						|
async def test_external_step(hass: HomeAssistant, manager: MockFlowManager) -> None:
    """Test a flow that goes through an external step before creating an entry."""
    manager.hass = hass

    @manager.mock_reg_handler("test")
    class TestFlow(data_entry_flow.FlowHandler):
        VERSION = 5
        data = None

        async def async_step_init(self, user_input=None):
            if user_input:
                # Data arrived from the external step: store it and move on.
                self.data = user_input
                return self.async_external_step_done(next_step_id="finish")

            return self.async_external_step(step_id="init", url="https://example.com")

        async def async_step_finish(self, user_input=None):
            return self.async_create_entry(title=self.data["title"], data=self.data)

    progressed_events = async_capture_events(
        hass, data_entry_flow.EVENT_DATA_ENTRY_FLOW_PROGRESSED
    )

    result = await manager.async_init("test")
    assert result["type"] == data_entry_flow.FlowResultType.EXTERNAL_STEP
    assert len(manager.async_progress()) == 1
    assert len(manager.async_progress_by_handler("test")) == 1
    assert manager.async_get(result["flow_id"])["handler"] == "test"

    # Mimic external step
    # Called by integrations: `hass.config_entries.flow.async_configure(…)`
    result = await manager.async_configure(result["flow_id"], {"title": "Hello"})
    assert result["type"] == data_entry_flow.FlowResultType.EXTERNAL_STEP_DONE

    await hass.async_block_till_done()
    assert len(progressed_events) == 1
    assert progressed_events[0].data == {
        "handler": "test",
        "flow_id": result["flow_id"],
        "refresh": True,
    }

    # Frontend refreshes the flow
    result = await manager.async_configure(result["flow_id"])
    assert result["type"] == data_entry_flow.FlowResultType.CREATE_ENTRY
    assert result["title"] == "Hello"
 | 
						|
 | 
						|
 | 
						|
async def test_show_progress(hass: HomeAssistant, manager: MockFlowManager) -> None:
    """Test a flow that shows progress for two consecutive background tasks."""
    manager.hass = hass
    progressed_events = []
    progress_update_events = async_capture_events(
        hass, data_entry_flow.EVENT_DATA_ENTRY_FLOW_PROGRESS_UPDATE
    )
    task_one_evt = asyncio.Event()
    task_two_evt = asyncio.Event()
    event_received_evt = asyncio.Event()

    @callback
    def capture_events(event: Event) -> None:
        """Store a progressed event and wake up the waiting test."""
        progressed_events.append(event)
        event_received_evt.set()

    @manager.mock_reg_handler("test")
    class TestFlow(data_entry_flow.FlowHandler):
        VERSION = 5
        data = None
        start_task_two = False
        task_one: asyncio.Task[None] | None = None
        task_two: asyncio.Task[None] | None = None

        async def async_step_init(self, user_input=None):
            async def long_running_job_one() -> None:
                await task_one_evt.wait()

            async def long_running_job_two() -> None:
                self.async_update_progress(0.25)
                await task_two_evt.wait()
                self.async_update_progress(0.75)
                self.data = {"title": "Hello"}

            # Task one is started on the first call and reported until done.
            if not self.task_one:
                self.task_one = hass.async_create_task(long_running_job_one())
            if not self.task_one.done():
                return self.async_show_progress(
                    progress_action="task_one",
                    progress_task=self.task_one,
                )

            # Task two is only started once task one has finished.
            if not self.task_two:
                self.task_two = hass.async_create_task(long_running_job_two())
            if not self.task_two.done():
                return self.async_show_progress(
                    progress_action="task_two",
                    progress_task=self.task_two,
                )

            return self.async_show_progress_done(next_step_id="finish")

        async def async_step_finish(self, user_input=None):
            return self.async_create_entry(title=self.data["title"], data=self.data)

    hass.bus.async_listen(
        data_entry_flow.EVENT_DATA_ENTRY_FLOW_PROGRESSED,
        capture_events,
    )

    result = await manager.async_init("test")
    assert result["type"] == data_entry_flow.FlowResultType.SHOW_PROGRESS
    assert result["progress_action"] == "task_one"
    assert len(manager.async_progress()) == 1
    assert len(manager.async_progress_by_handler("test")) == 1
    assert manager.async_get(result["flow_id"])["handler"] == "test"

    # Set task one done and wait for event
    task_one_evt.set()
    await event_received_evt.wait()
    event_received_evt.clear()
    assert len(progressed_events) == 1
    assert progressed_events[0].data == {
        "handler": "test",
        "flow_id": result["flow_id"],
        "refresh": True,
    }

    # Frontend refreshes the flow
    result = await manager.async_configure(result["flow_id"])
    assert result["type"] == data_entry_flow.FlowResultType.SHOW_PROGRESS
    assert result["progress_action"] == "task_two"
    assert len(progress_update_events) == 1
    assert progress_update_events[0].data == {
        "handler": "test",
        "flow_id": result["flow_id"],
        "progress": 0.25,
    }

    # Set task two done and wait for event
    task_two_evt.set()
    await event_received_evt.wait()
    event_received_evt.clear()
    assert len(progressed_events) == 2  # 1 for task one and 1 for task two
    assert progressed_events[1].data == {
        "handler": "test",
        "flow_id": result["flow_id"],
        "refresh": True,
    }
    assert len(progress_update_events) == 2
    assert progress_update_events[1].data == {
        "handler": "test",
        "flow_id": result["flow_id"],
        "progress": 0.75,
    }

    # Frontend refreshes the flow
    result = await manager.async_configure(result["flow_id"])
    assert result["type"] == data_entry_flow.FlowResultType.CREATE_ENTRY
    assert result["title"] == "Hello"
 | 
						|
 | 
						|
 | 
						|
async def test_show_progress_error(
    hass: HomeAssistant, manager: MockFlowManager
) -> None:
    """Test show progress when the progress task raises an exception."""
    manager.hass = hass
    progressed_events = []
    event_received_evt = asyncio.Event()

    @callback
    def capture_events(event: Event) -> None:
        """Store a progressed event and wake up the waiting test."""
        progressed_events.append(event)
        event_received_evt.set()

    @manager.mock_reg_handler("test")
    class TestFlow(data_entry_flow.FlowHandler):
        VERSION = 5
        data = None
        progress_task: asyncio.Task[None] | None = None

        async def async_step_init(self, user_input=None):
            async def long_running_task() -> None:
                # Yield once, then fail.
                await asyncio.sleep(0)
                raise TypeError

            if not self.progress_task:
                self.progress_task = hass.async_create_task(long_running_task())

            if not self.progress_task.done():
                return self.async_show_progress(
                    progress_action="task", progress_task=self.progress_task
                )

            # Route to the error step when the finished task holds an exception.
            next_step = "error" if self.progress_task.exception() else "no_error"
            return self.async_show_progress_done(next_step_id=next_step)

        async def async_step_error(self, user_input=None):
            return self.async_abort(reason="error")

    hass.bus.async_listen(
        data_entry_flow.EVENT_DATA_ENTRY_FLOW_PROGRESSED,
        capture_events,
    )

    result = await manager.async_init("test")
    assert result["type"] == data_entry_flow.FlowResultType.SHOW_PROGRESS
    assert result["progress_action"] == "task"
    assert len(manager.async_progress()) == 1
    assert len(manager.async_progress_by_handler("test")) == 1
    assert manager.async_get(result["flow_id"])["handler"] == "test"

    # The task ends on its own; wait for the progressed event
    await event_received_evt.wait()
    event_received_evt.clear()
    assert len(progressed_events) == 1
    assert progressed_events[0].data == {
        "handler": "test",
        "flow_id": result["flow_id"],
        "refresh": True,
    }

    # Frontend refreshes the flow
    result = await manager.async_configure(result["flow_id"])
    assert result["type"] == data_entry_flow.FlowResultType.ABORT
    assert result["reason"] == "error"
 | 
						|
 | 
						|
 | 
						|
async def test_show_progress_hidden_from_frontend(
    hass: HomeAssistant, manager: MockFlowManager
) -> None:
    """Test that the show-progress-done result is not sent to the frontend.

    The flow is initialised (SHOW_PROGRESS), its background job is awaited,
    and only the frontend refresh is expected to reach the done branch and
    come back with CREATE_ENTRY.
    """
    manager.hass = hass
    done_step_reached = False
    background_job: asyncio.Task[None] | None = None

    async def long_running_job() -> None:
        await asyncio.sleep(0)

    @manager.mock_reg_handler("test")
    class TestFlow(data_entry_flow.FlowHandler):
        VERSION = 5
        data = None

        async def async_step_init(self, user_input=None):
            nonlocal background_job, done_step_reached

            # The job is created once, on the first visit to this step.
            if background_job is None:
                background_job = hass.async_create_task(long_running_job())

            if not background_job.done():
                # Hand over a task which never finishes to simulate that the
                # flow manager has not yet called back when the frontend loads.
                never_finishes = hass.async_create_task(asyncio.Event().wait())
                return self.async_show_progress(
                    step_id="init",
                    progress_action="task",
                    progress_task=never_finishes,
                )

            done_step_reached = True
            return self.async_show_progress_done(next_step_id="finish")

        async def async_step_finish(self, user_input=None):
            return self.async_create_entry(title=None, data=self.data)

    result = await manager.async_init("test")
    assert result["type"] == data_entry_flow.FlowResultType.SHOW_PROGRESS
    assert result["progress_action"] == "task"
    assert len(manager.async_progress()) == 1
    assert len(manager.async_progress_by_handler("test")) == 1
    assert manager.async_get(result["flow_id"])["handler"] == "test"

    # Finishing the job alone must not have driven the flow into the done branch.
    await background_job
    assert not done_step_reached

    # Frontend refreshes the flow
    result = await manager.async_configure(result["flow_id"])
    assert result["type"] == data_entry_flow.FlowResultType.CREATE_ENTRY
    assert done_step_reached
 | 
						|
 | 
						|
 | 
						|
async def test_show_progress_legacy(
    hass: HomeAssistant, manager: MockFlowManager, caplog: pytest.LogCaptureFixture
) -> None:
    """Test the deprecated show progress logic.

    In this legacy variant async_show_progress is called without a progress
    task and the config flow itself is responsible for resuming the flow.
    """
    manager.hass = hass
    show_progress = data_entry_flow.FlowResultType.SHOW_PROGRESS

    def refresh_event_data(flow_id):
        """Build the payload expected on a progressed event."""
        return {"handler": "test", "flow_id": flow_id, "refresh": True}

    @manager.mock_reg_handler("test")
    class TestFlow(data_entry_flow.FlowHandler):
        VERSION = 5
        data = None
        task_one_done = False
        task_two_done = False

        async def async_step_init(self, user_input=None):
            finished_task = None
            if user_input and "task_finished" in user_input:
                finished_task = user_input["task_finished"]

            if finished_task == 1:
                self.task_one_done = True
            elif finished_task == 2:
                self.task_two_done = True

            # Report whichever task is still outstanding, task one first.
            for is_done, action in (
                (self.task_one_done, "task_one"),
                (self.task_two_done, "task_two"),
            ):
                if not is_done:
                    return self.async_show_progress(
                        step_id="init",
                        progress_action=action,
                    )

            self.data = user_input
            return self.async_show_progress_done(next_step_id="finish")

        async def async_step_finish(self, user_input=None):
            return self.async_create_entry(title=self.data["title"], data=self.data)

    events = async_capture_events(
        hass, data_entry_flow.EVENT_DATA_ENTRY_FLOW_PROGRESSED
    )

    result = await manager.async_init("test")
    assert result["type"] == show_progress
    assert result["progress_action"] == "task_one"
    assert len(manager.async_progress()) == 1
    assert len(manager.async_progress_by_handler("test")) == 1
    assert manager.async_get(result["flow_id"])["handler"] == "test"

    # Mimic task one done and moving to task two
    # Called by integrations: `hass.config_entries.flow.async_configure(...)`
    task_one_finished = {"task_finished": 1}
    result = await manager.async_configure(result["flow_id"], task_one_finished)
    assert result["type"] == show_progress
    assert result["progress_action"] == "task_two"

    await hass.async_block_till_done()
    assert len(events) == 1
    assert events[0].data == refresh_event_data(result["flow_id"])

    # Frontend refreshes the flow
    result = await manager.async_configure(result["flow_id"])
    assert result["type"] == show_progress
    assert result["progress_action"] == "task_two"

    # Mimic task two done and continuing step
    # Called by integrations: `hass.config_entries.flow.async_configure(...)`
    result = await manager.async_configure(
        result["flow_id"], {"task_finished": 2, "title": "Hello"}
    )
    # Note: The SHOW_PROGRESS_DONE is not hidden from frontend when flows manage
    # the progress tasks themselves
    assert result["type"] == data_entry_flow.FlowResultType.SHOW_PROGRESS_DONE

    # Frontend refreshes the flow
    result = await manager.async_configure(
        result["flow_id"], {"task_finished": 2, "title": "Hello"}
    )
    assert result["type"] == data_entry_flow.FlowResultType.CREATE_ENTRY
    assert result["title"] == "Hello"

    await hass.async_block_till_done()
    assert len(events) == 2  # 1 for task one and 1 for task two
    assert events[1].data == refresh_event_data(result["flow_id"])

    # Check for deprecation warning
    deprecation_message = (
        "tests.test_data_entry_flow::TestFlow calls async_show_progress without passing"
        " a progress task, this is not valid and will break in Home Assistant "
        "Core 2024.8."
    )
    assert deprecation_message in caplog.text
 | 
						|
 | 
						|
 | 
						|
async def test_show_progress_fires_only_when_changed(
    hass: HomeAssistant, manager: MockFlowManager
) -> None:
    """Test that progressed events are only fired when the progress changes.

    A sequence of updates is pushed into the flow; after each one the number
    of captured progressed events is compared with the expected count.
    """
    manager.hass = hass

    @manager.mock_reg_handler("test")
    class TestFlow(data_entry_flow.FlowHandler):
        VERSION = 5
        data = None

        async def async_step_init(self, user_input=None):
            if not user_input:
                return self.async_show_progress(
                    step_id="init", progress_action="task_one"
                )
            return self.async_show_progress(
                step_id="init",
                progress_action=user_input["progress_action"],
                description_placeholders=user_input["description_placeholders"],
            )

        async def async_step_finish(self, user_input=None):
            return self.async_create_entry(title=self.data["title"], data=self.data)

    events = async_capture_events(
        hass, data_entry_flow.EVENT_DATA_ENTRY_FLOW_PROGRESSED
    )

    async def check_update(
        flow_id, action, progress, expected_event_count, fires_event
    ) -> None:
        """Push one progress update and verify the result and event count."""
        # Called by integrations: `hass.config_entries.flow.async_configure(...)`
        payload = {
            "progress_action": action,
            "description_placeholders": {"progress": progress},
        }
        result = await manager.async_configure(flow_id, payload)
        assert result["type"] == data_entry_flow.FlowResultType.SHOW_PROGRESS
        assert result["progress_action"] == action
        assert result["description_placeholders"]["progress"] == progress

        await hass.async_block_till_done()
        assert len(events) == expected_event_count
        if not fires_event:
            return
        latest = events[expected_event_count - 1]
        assert latest.data == {
            "handler": "test",
            "flow_id": result["flow_id"],
            "refresh": True,
        }

    result = await manager.async_init("test")
    assert result["type"] == data_entry_flow.FlowResultType.SHOW_PROGRESS
    assert result["progress_action"] == "task_one"
    assert len(manager.async_progress()) == 1
    assert len(manager.async_progress_by_handler("test")) == 1
    assert manager.async_get(result["flow_id"])["handler"] == "test"

    # (progress action, placeholder value, total events so far, fires an event)
    updates = (
        ("task_one", 0, 1, True),  # first update after init
        ("task_one", 0, 1, False),  # identical update: no new event
        ("task_one", 25, 2, True),  # placeholder value differs
        ("task_two", 50, 3, True),  # action and placeholder value differ
        ("task_two", 50, 3, False),  # identical update: no new event
        ("task_two", 100, 4, True),  # placeholder value differs
    )
    for action, progress, expected_event_count, fires_event in updates:
        await check_update(
            result["flow_id"], action, progress, expected_event_count, fires_event
        )
 | 
						|
 | 
						|
 | 
						|
async def test_abort_flow_exception_step(manager: MockFlowManager) -> None:
    """Test that raising AbortFlow inside a step yields an abort result."""
    abort_reason = "mock-reason"
    abort_placeholders = {"placeholder": "yo"}

    @manager.mock_reg_handler("test")
    class TestFlow(data_entry_flow.FlowHandler):
        async def async_step_init(self, user_input=None):
            raise data_entry_flow.AbortFlow("mock-reason", {"placeholder": "yo"})

    result = await manager.async_init("test")

    # The reason and placeholders given to the exception end up on the result.
    assert result["type"] == data_entry_flow.FlowResultType.ABORT
    assert result["reason"] == abort_reason
    assert result["description_placeholders"] == abort_placeholders
 | 
						|
 | 
						|
 | 
						|
async def test_abort_flow_exception_finish_flow(hass: HomeAssistant) -> None:
    """Test that the AbortFlow exception works when finishing a flow.

    The flow's only step creates an entry; the manager's async_finish_flow
    then raises AbortFlow, and the result of async_init is expected to be an
    abort carrying that exception's reason and placeholders.
    """

    class TestFlow(data_entry_flow.FlowHandler):
        VERSION = 1

        # Parameter is named ``user_input`` like every other handler in this
        # module, instead of shadowing the ``input`` builtin.
        async def async_step_init(self, user_input=None):
            """Create an entry immediately from the given input."""
            return self.async_create_entry(title="init", data=user_input)

    class FlowManager(data_entry_flow.FlowManager):
        async def async_create_flow(self, handler_key, *, context, data):
            """Create a test flow."""
            return TestFlow()

        async def async_finish_flow(self, flow, result):
            """Raise AbortFlow."""
            raise data_entry_flow.AbortFlow("mock-reason", {"placeholder": "yo"})

    manager = FlowManager(hass)

    form = await manager.async_init("test")
    assert form["type"] == data_entry_flow.FlowResultType.ABORT
    assert form["reason"] == "mock-reason"
    assert form["description_placeholders"] == {"placeholder": "yo"}
 | 
						|
 | 
						|
 | 
						|
async def test_init_unknown_flow(manager: MockFlowManager) -> None:
    """Test that async_init raises UnknownFlow if async_create_flow returns None."""
    with pytest.raises(data_entry_flow.UnknownFlow):
        # Make flow creation hand back nothing for the duration of the call.
        with patch.object(manager, "async_create_flow", return_value=None):
            await manager.async_init("test")
 | 
						|
 | 
						|
 | 
						|
async def test_async_get_unknown_flow(manager: MockFlowManager) -> None:
    """Test that UnknownFlow is raised when async_get is called with a flow_id that does not exist."""
    # async_get is a plain call (its return value is subscripted directly by
    # the other tests in this module), so it is not awaited here. With an
    # await, a missing exception would surface as a TypeError instead of a
    # clean "did not raise" failure.
    with pytest.raises(data_entry_flow.UnknownFlow):
        manager.async_get("does_not_exist")
 | 
						|
 | 
						|
 | 
						|
async def test_move_to_unknown_step_raises_and_removes_from_in_progress(
    manager: MockFlowManager,
) -> None:
    """Test that starting at an unknown step raises and leaves no flow in progress."""

    @manager.mock_reg_handler("test")
    class TestFlow(data_entry_flow.FlowHandler):
        VERSION = 1

    # The handler defines no steps at all, so the requested init step is unknown.
    unknown_step_context = {"init_step": "does_not_exist"}
    with pytest.raises(data_entry_flow.UnknownStep):
        await manager.async_init("test", context=unknown_step_context)

    in_progress = manager.async_progress()
    assert in_progress == []
 | 
						|
 | 
						|
 | 
						|
@pytest.mark.parametrize(
    ("result_type", "params"),
    [
        ("async_external_step_done", {"next_step_id": "does_not_exist"}),
        ("async_external_step", {"step_id": "does_not_exist", "url": "blah"}),
        ("async_show_form", {"step_id": "does_not_exist"}),
        ("async_show_menu", {"step_id": "does_not_exist", "menu_options": []}),
        ("async_show_progress_done", {"next_step_id": "does_not_exist"}),
        ("async_show_progress", {"step_id": "does_not_exist", "progress_action": ""}),
    ],
)
async def test_next_step_unknown_step_raises_and_removes_from_in_progress(
    manager: MockFlowManager, result_type: str, params: dict[str, str]
) -> None:
    """Test that a result pointing at an unknown step raises and leaves no flow in progress.

    Each parametrized case names a FlowHandler result helper and the keyword
    arguments that make it reference the step "does_not_exist".
    """

    @manager.mock_reg_handler("test")
    class TestFlow(data_entry_flow.FlowHandler):
        VERSION = 1

        async def async_step_init(self, user_input=None):
            # Look up the result helper under test by name and invoke it.
            build_result = getattr(self, result_type)
            return build_result(**params)

    init_context = {"init_step": "init"}
    with pytest.raises(data_entry_flow.UnknownStep):
        await manager.async_init("test", context=init_context)

    in_progress = manager.async_progress()
    assert in_progress == []
 | 
						|
 | 
						|
 | 
						|
async def test_configure_raises_unknown_flow_if_not_in_progress(
    manager: MockFlowManager,
) -> None:
    """Test that async_configure raises UnknownFlow for a flow id that is not in progress."""
    # No flow has been started on this manager.
    missing_flow_id = "wrong_flow_id"

    with pytest.raises(data_entry_flow.UnknownFlow):
        await manager.async_configure(missing_flow_id)
 | 
						|
 | 
						|
 | 
						|
async def test_manager_abort_raises_unknown_flow_if_not_in_progress(
    manager: MockFlowManager,
) -> None:
    """Test that async_abort raises UnknownFlow for a flow id that is not in progress."""
    # No flow has been started on this manager.
    missing_flow_id = "wrong_flow_id"

    with pytest.raises(data_entry_flow.UnknownFlow):
        manager.async_abort(missing_flow_id)
 | 
						|
 | 
						|
 | 
						|
async def test_manager_abort_calls_async_flow_removed(manager: MockFlowManager) -> None:
    """Test that aborting a flow calls the manager's async_flow_removed method."""

    @manager.mock_reg_handler("test")
    class TestFlow(data_entry_flow.FlowHandler):
        async def async_step_init(self, user_input=None):
            return self.async_show_form(step_id="init")

    # Replace the method with a mock so calls to it can be observed.
    removed_hook = Mock()
    manager.async_flow_removed = removed_hook

    result = await manager.async_init("test")
    assert result["type"] == data_entry_flow.FlowResultType.FORM
    assert result["step_id"] == "init"

    # Showing the form alone must not call the method.
    removed_hook.assert_not_called()

    manager.async_abort(result["flow_id"])
    removed_hook.assert_called_once()

    # The flow is no longer in progress and no entry was created.
    assert len(manager.async_progress()) == 0
    assert len(manager.mock_created_entries) == 0
 | 
						|
 | 
						|
 | 
						|
@pytest.mark.parametrize(
    ("menu_options", "sort", "expect_sort"),
    [
        (["target1", "target2"], None, None),
        ({"target1": "Target 1", "target2": "Target 2"}, False, None),
        (["target2", "target1"], True, True),
    ],
)
async def test_show_menu(
    hass: HomeAssistant,
    manager: MockFlowManager,
    menu_options: list[str] | dict[str, str],
    sort: bool | None,
    expect_sort: bool | None,
) -> None:
    """Test showing a menu and picking one of its options.

    The parametrized cases cover list and dict menu options, and check the
    value of the "sort" key (if any) on the returned menu result.
    """
    manager.hass = hass
    placeholders = {"name": "Paulus"}

    @manager.mock_reg_handler("test")
    class TestFlow(data_entry_flow.FlowHandler):
        VERSION = 5
        data = None
        task_one_done = False

        async def async_step_init(self, user_input=None):
            return self.async_show_menu(
                step_id="init",
                menu_options=menu_options,
                description_placeholders={"name": "Paulus"},
                sort=sort,
            )

        async def async_step_target1(self, user_input=None):
            return self.async_show_form(step_id="target1")

        async def async_step_target2(self, user_input=None):
            return self.async_show_form(step_id="target2")

    menu = await manager.async_init("test")
    assert menu["type"] == data_entry_flow.FlowResultType.MENU
    assert menu["menu_options"] == menu_options
    assert menu["description_placeholders"] == placeholders
    assert menu.get("sort") == expect_sort
    assert len(manager.async_progress()) == 1
    assert len(manager.async_progress_by_handler("test")) == 1
    assert manager.async_get(menu["flow_id"])["handler"] == "test"

    # Mimic picking a step
    choice = {"next_step_id": "target1"}
    form = await manager.async_configure(menu["flow_id"], choice)
    assert form["type"] == data_entry_flow.FlowResultType.FORM
    assert form["step_id"] == "target1"
 | 
						|
 | 
						|
 | 
						|
async def test_find_flows_by_init_data_type(manager: MockFlowManager) -> None:
    """Test that flows in progress can be looked up by the type of their init data.

    Two flows are started, one per discovery dataclass, and the lookup is
    checked before and after each of them leaves the in-progress list.
    """

    @dataclasses.dataclass
    class BluetoothDiscoveryData:
        """Bluetooth Discovery data."""

        address: str

    @dataclasses.dataclass
    class WiFiDiscoveryData:
        """WiFi Discovery data."""

        address: str

    @manager.mock_reg_handler("test")
    class TestFlow(data_entry_flow.FlowHandler):
        VERSION = 1

        async def async_step_first(self, user_input=None):
            if user_input is None:
                return self.async_show_form(
                    step_id="first", data_schema=vol.Schema([str])
                )
            return await self.async_step_second()

        async def async_step_second(self, user_input=None):
            if user_input is None:
                return self.async_show_form(
                    step_id="second", data_schema=vol.Schema([str])
                )
            return self.async_create_entry(
                title="Test Entry",
                data={"init": self.init_data, "user": user_input},
            )

    def accept_all(data) -> bool:
        """Matcher accepting any init data."""
        return True

    def count_flows(data_type, matcher) -> int:
        """Count in-progress flows of the given init data type passing the matcher."""
        return len(manager.async_progress_by_init_data_type(data_type, matcher))

    bluetooth_data = BluetoothDiscoveryData("aa:bb:cc:dd:ee:ff")
    wifi_data = WiFiDiscoveryData("host")
    start_at_first = {"init_step": "first"}

    bluetooth_form = await manager.async_init(
        "test", context=start_at_first, data=bluetooth_data
    )
    await manager.async_init("test", context={"init_step": "first"}, data=wifi_data)

    # Only the bluetooth flow matches its type, and the matcher filters further.
    assert count_flows(BluetoothDiscoveryData, accept_all) == 1
    assert (
        count_flows(
            BluetoothDiscoveryData,
            lambda data: data.address == "aa:bb:cc:dd:ee:ff",
        )
        == 1
    )
    assert (
        count_flows(BluetoothDiscoveryData, lambda data: data.address == "not it")
        == 0
    )

    assert count_flows(WiFiDiscoveryData, accept_all) == 1

    # Configuring the bluetooth flow is expected to create its entry.
    bluetooth_result = await manager.async_configure(
        bluetooth_form["flow_id"], ["SECOND-DATA"]
    )
    assert bluetooth_result["type"] == data_entry_flow.FlowResultType.CREATE_ENTRY
    assert len(manager.async_progress()) == 1
    assert len(manager.mock_created_entries) == 1
    created = manager.mock_created_entries[0]
    assert created["handler"] == "test"
    assert created["data"] == {"init": bluetooth_data, "user": ["SECOND-DATA"]}

    # The bluetooth flow is gone; the wifi flow is still in progress.
    assert count_flows(BluetoothDiscoveryData, accept_all) == 0

    remaining_wifi = manager.async_progress_by_init_data_type(
        WiFiDiscoveryData, accept_all
    )
    assert len(remaining_wifi) == 1

    manager.async_abort(remaining_wifi[0]["flow_id"])

    # After the abort nothing is left in progress.
    assert count_flows(WiFiDiscoveryData, accept_all) == 0
    assert len(manager.async_progress()) == 0
 | 
						|
 | 
						|
 | 
						|
def test_section_in_serializer() -> None:
    """Test serializing a section with custom_serializer."""
    options_schema = vol.Schema(
        {
            vol.Optional("option_1", default=False): bool,
            vol.Required("option_2"): int,
        }
    )
    options_section = data_entry_flow.section(options_schema, {"collapsed": False})

    serialized = cv.custom_serializer(options_section)

    expected_fields = [
        {
            "default": False,
            "name": "option_1",
            "optional": True,
            "required": False,
            "type": "boolean",
        },
        {"name": "option_2", "required": True, "type": "integer"},
    ]
    assert serialized == {
        "expanded": True,
        "schema": expected_fields,
        "type": "expandable",
    }
 | 
						|
 | 
						|
 | 
						|
def test_nested_section_in_serializer() -> None:
    """Test that a section nested inside another section is rejected with ValueError."""
    expected_error = "Nesting expandable sections is not supported"

    # Everything, including building the sections, stays inside the raises
    # block so the test passes wherever the ValueError is raised.
    with pytest.raises(ValueError, match=expected_error):
        inner_section = data_entry_flow.section(
            vol.Schema(
                {
                    vol.Optional("option_1", default=False): bool,
                    vol.Required("option_2"): int,
                }
            )
        )
        outer_section = data_entry_flow.section(
            vol.Schema({vol.Required("section_1"): inner_section}),
            {"collapsed": False},
        )
        cv.custom_serializer(outer_section)
 |