"""The tests for the Script component."""
# pylint: disable=protected-access
from datetime import timedelta
import functools as ft
from unittest import mock

import asynctest
import jinja2
import pytest
import voluptuous as vol

# Otherwise can't test just this file (import order issue)
from homeassistant import exceptions
import homeassistant.components.scene as scene
from homeassistant.const import ATTR_ENTITY_ID, SERVICE_TURN_ON
from homeassistant.core import Context, callback
from homeassistant.helpers import config_validation as cv, script
import homeassistant.util.dt as dt_util

from tests.common import async_fire_time_changed

ENTITY_ID = "script.test"


async def test_firing_event(hass):
    """Test the firing of events."""
    event = "test_event"
    context = Context()
    calls = []

    @callback
    def record_event(event):
        """Add recorded event to set."""
        calls.append(event)

    hass.bus.async_listen(event, record_event)

    script_obj = script.Script(
        hass, cv.SCRIPT_SCHEMA({"event": event, "event_data": {"hello": "world"}})
    )

    await script_obj.async_run(context=context)

    await hass.async_block_till_done()

    assert len(calls) == 1
    assert calls[0].context is context
    assert calls[0].data.get("hello") == "world"
    assert not script_obj.can_cancel


async def test_firing_event_template(hass):
    """Test the firing of events."""
    event = "test_event"
    context = Context()
    calls = []

    @callback
    def record_event(event):
        """Add recorded event to set."""
        calls.append(event)

    hass.bus.async_listen(event, record_event)

    script_obj = script.Script(
        hass,
        cv.SCRIPT_SCHEMA(
            {
                "event": event,
                "event_data_template": {
                    "dict": {
                        1: "{{ is_world }}",
                        2: "{{ is_world }}{{ is_world }}",
                        3: "{{ is_world }}{{ is_world }}{{ is_world }}",
                    },
                    "list": ["{{ is_world }}", "{{ is_world }}{{ is_world }}"],
                },
            }
        ),
    )

    await script_obj.async_run({"is_world": "yes"}, context=context)

    await hass.async_block_till_done()

    assert len(calls) == 1
    assert calls[0].context is context
    assert calls[0].data == {
        "dict": {1: "yes", 2: "yesyes", 3: "yesyesyes"},
        "list": ["yes", "yesyes"],
    }
    assert not script_obj.can_cancel


async def test_calling_service(hass):
    """Test the calling of a service."""
    calls = []
    context = Context()

    @callback
    def record_call(service):
        """Add recorded event to set."""
        calls.append(service)

    hass.services.async_register("test", "script", record_call)

    hass.async_add_job(
        ft.partial(
            script.call_from_config,
            hass,
            {"service": "test.script", "data": {"hello": "world"}},
            context=context,
        )
    )

    await hass.async_block_till_done()

    assert len(calls) == 1
    assert calls[0].context is context
    assert calls[0].data.get("hello") == "world"


async def test_activating_scene(hass):
    """Test the activation of a scene."""
    calls = []
    context = Context()

    @callback
    def record_call(service):
        """Add recorded event to set."""
        calls.append(service)

    hass.services.async_register(scene.DOMAIN, SERVICE_TURN_ON, record_call)

    hass.async_add_job(
        ft.partial(
            script.call_from_config, hass, {"scene": "scene.hello"}, context=context
        )
    )

    await hass.async_block_till_done()

    assert len(calls) == 1
    assert calls[0].context is context
    assert calls[0].data.get(ATTR_ENTITY_ID) == "scene.hello"


async def test_calling_service_template(hass):
    """Test the calling of a service."""
    calls = []
    context = Context()

    @callback
    def record_call(service):
        """Add recorded event to set."""
        calls.append(service)

    hass.services.async_register("test", "script", record_call)

    hass.async_add_job(
        ft.partial(
            script.call_from_config,
            hass,
            {
                "service_template": """
                {% if True %}
                    test.script
                {% else %}
                    test.not_script
                {% endif %}""",
                "data_template": {
                    "hello": """
                    {% if is_world == 'yes' %}
                        world
                    {% else %}
                        not world
                    {% endif %}
                """
                },
            },
            {"is_world": "yes"},
            context=context,
        )
    )

    await hass.async_block_till_done()

    assert len(calls) == 1
    assert calls[0].context is context
    assert calls[0].data.get("hello") == "world"


async def test_delay(hass):
    """Test the delay."""
    event = "test_event"
    events = []
    context = Context()
    delay_alias = "delay step"

    @callback
    def record_event(event):
        """Add recorded event to set."""
        events.append(event)

    hass.bus.async_listen(event, record_event)

    script_obj = script.Script(
        hass,
        cv.SCRIPT_SCHEMA(
            [
                {"event": event},
                {"delay": {"seconds": 5}, "alias": delay_alias},
                {"event": event},
            ]
        ),
    )

    await script_obj.async_run(context=context)
    await hass.async_block_till_done()

    assert script_obj.is_running
    assert script_obj.can_cancel
    assert script_obj.last_action == delay_alias
    assert len(events) == 1

    future = dt_util.utcnow() + timedelta(seconds=5)
    async_fire_time_changed(hass, future)
    await hass.async_block_till_done()

    assert not script_obj.is_running
    assert len(events) == 2
    assert events[0].context is context
    assert events[1].context is context


async def test_delay_template(hass):
    """Test the delay as a template."""
    event = "test_event"
    events = []
    delay_alias = "delay step"

    @callback
    def record_event(event):
        """Add recorded event to set."""
        events.append(event)

    hass.bus.async_listen(event, record_event)

    script_obj = script.Script(
        hass,
        cv.SCRIPT_SCHEMA(
            [
                {"event": event},
                {"delay": "00:00:{{ 5 }}", "alias": delay_alias},
                {"event": event},
            ]
        ),
    )

    await script_obj.async_run()
    await hass.async_block_till_done()

    assert script_obj.is_running
    assert script_obj.can_cancel
    assert script_obj.last_action == delay_alias
    assert len(events) == 1

    future = dt_util.utcnow() + timedelta(seconds=5)
    async_fire_time_changed(hass, future)
    await hass.async_block_till_done()

    assert not script_obj.is_running
    assert len(events) == 2


async def test_delay_invalid_template(hass):
    """Test the delay as a template that fails."""
    event = "test_event"
    events = []

    @callback
    def record_event(event):
        """Add recorded event to set."""
        events.append(event)

    hass.bus.async_listen(event, record_event)

    script_obj = script.Script(
        hass,
        cv.SCRIPT_SCHEMA(
            [
                {"event": event},
                {"delay": "{{ invalid_delay }}"},
                {"delay": {"seconds": 5}},
                {"event": event},
            ]
        ),
    )

    with mock.patch.object(script, "_LOGGER") as mock_logger:
        await script_obj.async_run()
        await hass.async_block_till_done()
        assert mock_logger.error.called

    assert not script_obj.is_running
    assert len(events) == 1


async def test_delay_complex_template(hass):
    """Test the delay with a working complex template."""
    event = "test_event"
    events = []
    delay_alias = "delay step"

    @callback
    def record_event(event):
        """Add recorded event to set."""
        events.append(event)

    hass.bus.async_listen(event, record_event)

    script_obj = script.Script(
        hass,
        cv.SCRIPT_SCHEMA(
            [
                {"event": event},
                {"delay": {"seconds": "{{ 5 }}"}, "alias": delay_alias},
                {"event": event},
            ]
        ),
    )

    await script_obj.async_run()
    await hass.async_block_till_done()

    assert script_obj.is_running
    assert script_obj.can_cancel
    assert script_obj.last_action == delay_alias
    assert len(events) == 1

    future = dt_util.utcnow() + timedelta(seconds=5)
    async_fire_time_changed(hass, future)
    await hass.async_block_till_done()

    assert not script_obj.is_running
    assert len(events) == 2


async def test_delay_complex_invalid_template(hass):
    """Test the delay with a complex template that fails."""
    event = "test_event"
    events = []

    @callback
    def record_event(event):
        """Add recorded event to set."""
        events.append(event)

    hass.bus.async_listen(event, record_event)

    script_obj = script.Script(
        hass,
        cv.SCRIPT_SCHEMA(
            [
                {"event": event},
                {"delay": {"seconds": "{{ invalid_delay }}"}},
                {"delay": {"seconds": "{{ 5 }}"}},
                {"event": event},
            ]
        ),
    )

    with mock.patch.object(script, "_LOGGER") as mock_logger:
        await script_obj.async_run()
        await hass.async_block_till_done()
        assert mock_logger.error.called

    assert not script_obj.is_running
    assert len(events) == 1


async def test_cancel_while_delay(hass):
    """Test the cancelling while the delay is present."""
    event = "test_event"
    events = []

    @callback
    def record_event(event):
        """Add recorded event to set."""
        events.append(event)

    hass.bus.async_listen(event, record_event)

    script_obj = script.Script(
        hass, cv.SCRIPT_SCHEMA([{"delay": {"seconds": 5}}, {"event": event}])
    )

    await script_obj.async_run()
    await hass.async_block_till_done()

    assert script_obj.is_running
    assert len(events) == 0

    script_obj.async_stop()

    assert not script_obj.is_running

    # Make sure the script is really stopped.
    future = dt_util.utcnow() + timedelta(seconds=5)
    async_fire_time_changed(hass, future)
    await hass.async_block_till_done()

    assert not script_obj.is_running
    assert len(events) == 0


async def test_wait_template(hass):
    """Test the wait template."""
    event = "test_event"
    events = []
    context = Context()
    wait_alias = "wait step"

    @callback
    def record_event(event):
        """Add recorded event to set."""
        events.append(event)

    hass.bus.async_listen(event, record_event)

    hass.states.async_set("switch.test", "on")

    script_obj = script.Script(
        hass,
        cv.SCRIPT_SCHEMA(
            [
                {"event": event},
                {
                    "wait_template": "{{states.switch.test.state == 'off'}}",
                    "alias": wait_alias,
                },
                {"event": event},
            ]
        ),
    )

    await script_obj.async_run(context=context)
    await hass.async_block_till_done()

    assert script_obj.is_running
    assert script_obj.can_cancel
    assert script_obj.last_action == wait_alias
    assert len(events) == 1

    hass.states.async_set("switch.test", "off")
    await hass.async_block_till_done()

    assert not script_obj.is_running
    assert len(events) == 2
    assert events[0].context is context
    assert events[1].context is context


async def test_wait_template_cancel(hass):
    """Test the wait template cancel action."""
    event = "test_event"
    events = []
    wait_alias = "wait step"

    @callback
    def record_event(event):
        """Add recorded event to set."""
        events.append(event)

    hass.bus.async_listen(event, record_event)

    hass.states.async_set("switch.test", "on")

    script_obj = script.Script(
        hass,
        cv.SCRIPT_SCHEMA(
            [
                {"event": event},
                {
                    "wait_template": "{{states.switch.test.state == 'off'}}",
                    "alias": wait_alias,
                },
                {"event": event},
            ]
        ),
    )

    await script_obj.async_run()
    await hass.async_block_till_done()

    assert script_obj.is_running
    assert script_obj.can_cancel
    assert script_obj.last_action == wait_alias
    assert len(events) == 1

    script_obj.async_stop()

    assert not script_obj.is_running
    assert len(events) == 1

    hass.states.async_set("switch.test", "off")
    await hass.async_block_till_done()

    assert not script_obj.is_running
    assert len(events) == 1


async def test_wait_template_not_schedule(hass):
    """Test the wait template with correct condition."""
    event = "test_event"
    events = []

    @callback
    def record_event(event):
        """Add recorded event to set."""
        events.append(event)

    hass.bus.async_listen(event, record_event)

    hass.states.async_set("switch.test", "on")

    script_obj = script.Script(
        hass,
        cv.SCRIPT_SCHEMA(
            [
                {"event": event},
                {"wait_template": "{{states.switch.test.state == 'on'}}"},
                {"event": event},
            ]
        ),
    )

    await script_obj.async_run()
    await hass.async_block_till_done()

    assert not script_obj.is_running
    assert script_obj.can_cancel
    assert len(events) == 2


async def test_wait_template_timeout_halt(hass):
    """Test the wait template, halt on timeout."""
    event = "test_event"
    events = []
    wait_alias = "wait step"

    @callback
    def record_event(event):
        """Add recorded event to set."""
        events.append(event)

    hass.bus.async_listen(event, record_event)

    hass.states.async_set("switch.test", "on")

    script_obj = script.Script(
        hass,
        cv.SCRIPT_SCHEMA(
            [
                {"event": event},
                {
                    "wait_template": "{{states.switch.test.state == 'off'}}",
                    "continue_on_timeout": False,
                    "timeout": 5,
                    "alias": wait_alias,
                },
                {"event": event},
            ]
        ),
    )

    await script_obj.async_run()
    await hass.async_block_till_done()

    assert script_obj.is_running
    assert script_obj.can_cancel
    assert script_obj.last_action == wait_alias
    assert len(events) == 1

    future = dt_util.utcnow() + timedelta(seconds=5)
    async_fire_time_changed(hass, future)
    await hass.async_block_till_done()

    assert not script_obj.is_running
    assert len(events) == 1


async def test_wait_template_timeout_continue(hass):
    """Test the wait template with continuing the script."""
    event = "test_event"
    events = []
    wait_alias = "wait step"

    @callback
    def record_event(event):
        """Add recorded event to set."""
        events.append(event)

    hass.bus.async_listen(event, record_event)

    hass.states.async_set("switch.test", "on")

    script_obj = script.Script(
        hass,
        cv.SCRIPT_SCHEMA(
            [
                {"event": event},
                {
                    "wait_template": "{{states.switch.test.state == 'off'}}",
                    "timeout": 5,
                    "continue_on_timeout": True,
                    "alias": wait_alias,
                },
                {"event": event},
            ]
        ),
    )

    await script_obj.async_run()
    await hass.async_block_till_done()

    assert script_obj.is_running
    assert script_obj.can_cancel
    assert script_obj.last_action == wait_alias
    assert len(events) == 1

    future = dt_util.utcnow() + timedelta(seconds=5)
    async_fire_time_changed(hass, future)
    await hass.async_block_till_done()

    assert not script_obj.is_running
    assert len(events) == 2


async def test_wait_template_timeout_default(hass):
    """Test the wait template with default contiune."""
    event = "test_event"
    events = []
    wait_alias = "wait step"

    @callback
    def record_event(event):
        """Add recorded event to set."""
        events.append(event)

    hass.bus.async_listen(event, record_event)

    hass.states.async_set("switch.test", "on")

    script_obj = script.Script(
        hass,
        cv.SCRIPT_SCHEMA(
            [
                {"event": event},
                {
                    "wait_template": "{{states.switch.test.state == 'off'}}",
                    "timeout": 5,
                    "alias": wait_alias,
                },
                {"event": event},
            ]
        ),
    )

    await script_obj.async_run()
    await hass.async_block_till_done()

    assert script_obj.is_running
    assert script_obj.can_cancel
    assert script_obj.last_action == wait_alias
    assert len(events) == 1

    future = dt_util.utcnow() + timedelta(seconds=5)
    async_fire_time_changed(hass, future)
    await hass.async_block_till_done()

    assert not script_obj.is_running
    assert len(events) == 2


async def test_wait_template_variables(hass):
    """Test the wait template with variables."""
    event = "test_event"
    events = []
    wait_alias = "wait step"

    @callback
    def record_event(event):
        """Add recorded event to set."""
        events.append(event)

    hass.bus.async_listen(event, record_event)

    hass.states.async_set("switch.test", "on")

    script_obj = script.Script(
        hass,
        cv.SCRIPT_SCHEMA(
            [
                {"event": event},
                {"wait_template": "{{is_state(data, 'off')}}", "alias": wait_alias},
                {"event": event},
            ]
        ),
    )

    await script_obj.async_run({"data": "switch.test"})
    await hass.async_block_till_done()

    assert script_obj.is_running
    assert script_obj.can_cancel
    assert script_obj.last_action == wait_alias
    assert len(events) == 1

    hass.states.async_set("switch.test", "off")
    await hass.async_block_till_done()

    assert not script_obj.is_running
    assert len(events) == 2


async def test_passing_variables_to_script(hass):
    """Test if we can pass variables to script."""
    calls = []

    @callback
    def record_call(service):
        """Add recorded event to set."""
        calls.append(service)

    hass.services.async_register("test", "script", record_call)

    script_obj = script.Script(
        hass,
        cv.SCRIPT_SCHEMA(
            [
                {
                    "service": "test.script",
                    "data_template": {"hello": "{{ greeting }}"},
                },
                {"delay": "{{ delay_period }}"},
                {
                    "service": "test.script",
                    "data_template": {"hello": "{{ greeting2 }}"},
                },
            ]
        ),
    )

    await script_obj.async_run(
        {"greeting": "world", "greeting2": "universe", "delay_period": "00:00:05"}
    )

    await hass.async_block_till_done()

    assert script_obj.is_running
    assert len(calls) == 1
    assert calls[-1].data["hello"] == "world"

    future = dt_util.utcnow() + timedelta(seconds=5)
    async_fire_time_changed(hass, future)
    await hass.async_block_till_done()

    assert not script_obj.is_running
    assert len(calls) == 2
    assert calls[-1].data["hello"] == "universe"


async def test_condition(hass):
    """Test if we can use conditions in a script."""
    event = "test_event"
    events = []

    @callback
    def record_event(event):
        """Add recorded event to set."""
        events.append(event)

    hass.bus.async_listen(event, record_event)

    hass.states.async_set("test.entity", "hello")

    script_obj = script.Script(
        hass,
        cv.SCRIPT_SCHEMA(
            [
                {"event": event},
                {
                    "condition": "template",
                    "value_template": '{{ states.test.entity.state == "hello" }}',
                },
                {"event": event},
            ]
        ),
    )

    await script_obj.async_run()
    await hass.async_block_till_done()
    assert len(events) == 2

    hass.states.async_set("test.entity", "goodbye")

    await script_obj.async_run()
    await hass.async_block_till_done()
    assert len(events) == 3


@asynctest.patch("homeassistant.helpers.script.condition.async_from_config")
async def test_condition_created_once(async_from_config, hass):
    """Test that the conditions do not get created multiple times."""
    event = "test_event"
    events = []

    @callback
    def record_event(event):
        """Add recorded event to set."""
        events.append(event)

    hass.bus.async_listen(event, record_event)

    hass.states.async_set("test.entity", "hello")

    script_obj = script.Script(
        hass,
        cv.SCRIPT_SCHEMA(
            [
                {"event": event},
                {
                    "condition": "template",
                    "value_template": '{{ states.test.entity.state == "hello" }}',
                },
                {"event": event},
            ]
        ),
    )

    await script_obj.async_run()
    await script_obj.async_run()
    await hass.async_block_till_done()
    assert async_from_config.call_count == 1
    assert len(script_obj._config_cache) == 1


async def test_all_conditions_cached(hass):
    """Test that multiple conditions get cached."""
    event = "test_event"
    events = []

    @callback
    def record_event(event):
        """Add recorded event to set."""
        events.append(event)

    hass.bus.async_listen(event, record_event)

    hass.states.async_set("test.entity", "hello")

    script_obj = script.Script(
        hass,
        cv.SCRIPT_SCHEMA(
            [
                {"event": event},
                {
                    "condition": "template",
                    "value_template": '{{ states.test.entity.state == "hello" }}',
                },
                {
                    "condition": "template",
                    "value_template": '{{ states.test.entity.state != "hello" }}',
                },
                {"event": event},
            ]
        ),
    )

    await script_obj.async_run()
    await hass.async_block_till_done()
    assert len(script_obj._config_cache) == 2


async def test_last_triggered(hass):
    """Test the last_triggered."""
    event = "test_event"

    script_obj = script.Script(
        hass,
        cv.SCRIPT_SCHEMA(
            [{"event": event}, {"delay": {"seconds": 5}}, {"event": event}]
        ),
    )

    assert script_obj.last_triggered is None

    time = dt_util.utcnow()
    with mock.patch("homeassistant.helpers.script.date_util.utcnow", return_value=time):
        await script_obj.async_run()
        await hass.async_block_till_done()

    assert script_obj.last_triggered == time


async def test_propagate_error_service_not_found(hass):
    """Test that a script aborts when a service is not found."""
    events = []

    @callback
    def record_event(event):
        events.append(event)

    hass.bus.async_listen("test_event", record_event)

    script_obj = script.Script(
        hass, cv.SCRIPT_SCHEMA([{"service": "test.script"}, {"event": "test_event"}])
    )

    with pytest.raises(exceptions.ServiceNotFound):
        await script_obj.async_run()

    assert len(events) == 0
    assert script_obj._cur == -1


async def test_propagate_error_invalid_service_data(hass):
    """Test that a script aborts when we send invalid service data."""
    events = []

    @callback
    def record_event(event):
        events.append(event)

    hass.bus.async_listen("test_event", record_event)

    calls = []

    @callback
    def record_call(service):
        """Add recorded event to set."""
        calls.append(service)

    hass.services.async_register(
        "test", "script", record_call, schema=vol.Schema({"text": str})
    )

    script_obj = script.Script(
        hass,
        cv.SCRIPT_SCHEMA(
            [{"service": "test.script", "data": {"text": 1}}, {"event": "test_event"}]
        ),
    )

    with pytest.raises(vol.Invalid):
        await script_obj.async_run()

    assert len(events) == 0
    assert len(calls) == 0
    assert script_obj._cur == -1


async def test_propagate_error_service_exception(hass):
    """Test that a script aborts when a service throws an exception."""
    events = []

    @callback
    def record_event(event):
        events.append(event)

    hass.bus.async_listen("test_event", record_event)

    calls = []

    @callback
    def record_call(service):
        """Add recorded event to set."""
        raise ValueError("BROKEN")

    hass.services.async_register("test", "script", record_call)

    script_obj = script.Script(
        hass, cv.SCRIPT_SCHEMA([{"service": "test.script"}, {"event": "test_event"}])
    )

    with pytest.raises(ValueError):
        await script_obj.async_run()

    assert len(events) == 0
    assert len(calls) == 0
    assert script_obj._cur == -1


def test_log_exception():
    """Test logged output."""
    script_obj = script.Script(
        None, cv.SCRIPT_SCHEMA([{"service": "test.script"}, {"event": "test_event"}])
    )
    script_obj._exception_step = 1

    for exc, msg in (
        (vol.Invalid("Invalid number"), "Invalid data"),
        (
            exceptions.TemplateError(jinja2.TemplateError("Unclosed bracket")),
            "Error rendering template",
        ),
        (exceptions.Unauthorized(), "Unauthorized"),
        (exceptions.ServiceNotFound("light", "turn_on"), "Service not found"),
        (ValueError("Cannot parse JSON"), "Unknown error"),
    ):
        logger = mock.Mock()
        script_obj.async_log_exception(logger, "Test error", exc)

        assert len(logger.mock_calls) == 1
        _, _, p_error_desc, p_action_type, p_step, p_error = logger.mock_calls[0][1]

        assert p_error_desc == msg
        assert p_action_type == script.ACTION_FIRE_EVENT
        assert p_step == 2
        if isinstance(exc, ValueError):
            assert p_error == ""
        else:
            assert p_error == str(exc)


async def test_referenced_entities():
    """Test referenced entities."""
    script_obj = script.Script(
        None,
        cv.SCRIPT_SCHEMA(
            [
                {
                    "service": "test.script",
                    "data": {"entity_id": "light.service_not_list"},
                },
                {
                    "service": "test.script",
                    "data": {"entity_id": ["light.service_list"]},
                },
                {
                    "condition": "state",
                    "entity_id": "sensor.condition",
                    "state": "100",
                },
                {"service": "test.script", "data": {"without": "entity_id"}},
                {"scene": "scene.hello"},
                {"event": "test_event"},
                {"delay": "{{ delay_period }}"},
            ]
        ),
    )
    assert script_obj.referenced_entities == {
        "light.service_not_list",
        "light.service_list",
        "sensor.condition",
        "scene.hello",
    }
    # Test we cache results.
    assert script_obj.referenced_entities is script_obj.referenced_entities


async def test_referenced_devices():
    """Test referenced entities."""
    script_obj = script.Script(
        None,
        cv.SCRIPT_SCHEMA(
            [
                {"domain": "light", "device_id": "script-dev-id"},
                {
                    "condition": "device",
                    "device_id": "condition-dev-id",
                    "domain": "switch",
                },
            ]
        ),
    )
    assert script_obj.referenced_devices == {"script-dev-id", "condition-dev-id"}
    # Test we cache results.
    assert script_obj.referenced_devices is script_obj.referenced_devices