"""Translation string lookup helpers."""

from __future__ import annotations

import asyncio
from collections.abc import Iterable, Mapping
from contextlib import suppress
from dataclasses import dataclass
import logging
import pathlib
import string
from typing import Any

from homeassistant.const import (
    EVENT_CORE_CONFIG_UPDATE,
    STATE_UNAVAILABLE,
    STATE_UNKNOWN,
)
from homeassistant.core import Event, HomeAssistant, async_get_hass, callback
from homeassistant.loader import (
    Integration,
    async_get_config_flows,
    async_get_integrations,
    bind_hass,
)
from homeassistant.util.json import load_json

from . import singleton

_LOGGER = logging.getLogger(__name__)

TRANSLATION_FLATTEN_CACHE = "translation_flatten_cache"
LOCALE_EN = "en"


def recursive_flatten(
    prefix: str, data: dict[str, dict[str, Any] | str]
) -> dict[str, str]:
    """Return a flattened representation of dict data."""
    output: dict[str, str] = {}
    for key, value in data.items():
        if isinstance(value, dict):
            output.update(recursive_flatten(f"{prefix}{key}.", value))
        else:
            output[f"{prefix}{key}"] = value
    return output


def _load_translations_files_by_language(
    translation_files: dict[str, dict[str, pathlib.Path]],
) -> dict[str, dict[str, Any]]:
    """Load and parse translation.json files."""
    loaded: dict[str, dict[str, Any]] = {}
    for language, component_translation_file in translation_files.items():
        loaded_for_language: dict[str, Any] = {}
        loaded[language] = loaded_for_language

        for component, translation_file in component_translation_file.items():
            loaded_json = load_json(translation_file)

            if not isinstance(loaded_json, dict):
                _LOGGER.warning(
                    "Translation file is unexpected type %s. Expected dict for %s",
                    type(loaded_json),
                    translation_file,
                )
                continue

            loaded_for_language[component] = loaded_json

    return loaded


def build_resources(
    translation_strings: dict[str, dict[str, dict[str, Any] | str]],
    components: set[str],
    category: str,
) -> dict[str, dict[str, Any] | str]:
    """Build the resources response for the given components."""
    # Build response
    return {
        component: category_strings
        for component in components
        if (component_strings := translation_strings.get(component))
        and (category_strings := component_strings.get(category))
    }


async def _async_get_component_strings(
    hass: HomeAssistant,
    languages: Iterable[str],
    components: set[str],
    integrations: dict[str, Integration],
) -> dict[str, dict[str, Any]]:
    """Load the raw translation strings for the given languages and components.

    Returns a mapping of language -> component -> translation strings. Every
    requested component gets an entry for every language; when a component has
    no "title" and its integration is known, the integration name is used.
    """
    # Work out which translation files to read for each language. Only
    # integrations that report having translations contribute a file.
    paths_by_language: dict[str, dict[str, pathlib.Path]] = {}
    any_files = False
    for language in languages:
        json_name = f"{language}.json"
        paths: dict[str, pathlib.Path] = {}
        for domain in components:
            integration = integrations.get(domain)
            if integration and integration.has_translations:
                paths[domain] = integration.file_path / "translations" / json_name
        paths_by_language[language] = paths
        if paths:
            any_files = True

    # File reading is handed to the executor; skip the hop entirely when
    # there is nothing to read.
    raw_by_language: dict[str, dict[str, Any]] = {}
    if any_files:
        raw_by_language = await hass.async_add_executor_job(
            _load_translations_files_by_language, paths_by_language
        )

    result: dict[str, dict[str, Any]] = {}
    for language in languages:
        raw = raw_by_language.setdefault(language, {})
        for domain in components:
            # Translations that miss "title" will get integration put in.
            strings = raw.setdefault(domain, {})
            if "title" in strings:
                continue
            integration = integrations.get(domain)
            if integration:
                strings["title"] = integration.name

        result.setdefault(language, {}).update(raw)

    return result


@dataclass(slots=True)
class _TranslationsCacheData:
    """Data for the translation cache.

    This class contains data that is designed to be shared
    between multiple instances of the translation cache so
    we only have to load the data once.
    """

    # language -> set of component names whose translations have been loaded.
    loaded: dict[str, set[str]]
    # language -> category -> component -> flattened translation key -> string.
    cache: dict[str, dict[str, dict[str, dict[str, str]]]]


class _TranslationCache:
    """Cache for flattened translations.

    Translations are stored per language, category and component as flat
    ``component.<component>.<category>...`` keys. Entries are only ever added
    by this class; nothing here removes them.
    """

    __slots__ = ("cache_data", "hass", "lock")

    def __init__(self, hass: HomeAssistant) -> None:
        """Initialize the cache with empty data and a lock for loading."""
        self.hass = hass
        self.cache_data = _TranslationsCacheData({}, {})
        # Serializes calls to _async_load (see async_load).
        self.lock = asyncio.Lock()

    @callback
    def async_is_loaded(self, language: str, components: set[str]) -> bool:
        """Return if the given components are loaded for the language."""
        return components.issubset(self.cache_data.loaded.get(language, set()))

    async def async_load(
        self,
        language: str,
        components: set[str],
    ) -> None:
        """Load resources into the cache.

        Only components not yet marked as loaded for ``language`` are loaded.
        """
        # setdefault also guarantees loaded[language] exists, which
        # _async_load relies on when it records the loaded components.
        loaded = self.cache_data.loaded.setdefault(language, set())
        if components_to_load := components - loaded:
            # Translations are never unloaded so if there are no components to load
            # we can skip the lock which reduces contention when multiple different
            # translations categories are being fetched at the same time which is
            # common from the frontend.
            async with self.lock:
                # Check components to load again, as another task might have loaded
                # them while we were waiting for the lock.
                if components_to_load := components - loaded:
                    await self._async_load(language, components_to_load)

    async def async_fetch(
        self,
        language: str,
        category: str,
        components: set[str],
    ) -> dict[str, str]:
        """Load resources into the cache and return them."""
        await self.async_load(language, components)

        return self.get_cached(language, category, components)

    def get_cached(
        self,
        language: str,
        category: str,
        components: set[str],
    ) -> dict[str, str]:
        """Read resources from the cache.

        Nothing is loaded here; components that are not cached for the
        language and category simply contribute no keys.
        """
        category_cache = self.cache_data.cache.get(language, {}).get(category, {})
        # If only one component was requested, return it directly
        # to avoid merging the dictionaries and keeping additional
        # copies of the same data in memory.
        # Note that in this case the returned dict is the cache's own dict
        # for that component (when present), not a copy.
        if len(components) == 1 and (component := next(iter(components))):
            return category_cache.get(component, {})

        # Multiple components: merge their flat dicts into a new dict.
        result: dict[str, str] = {}
        for component in components.intersection(category_cache):
            result.update(category_cache[component])
        return result

    async def _async_load(self, language: str, components: set[str]) -> None:
        """Populate the cache for a given set of components.

        English strings are written first and the requested language is laid
        over them, so keys missing from the requested language keep their
        English text.
        """
        loaded = self.cache_data.loaded
        _LOGGER.debug(
            "Cache miss for %s: %s",
            language,
            components,
        )
        # Fetch the English resources, as a fallback for missing keys
        languages = [LOCALE_EN] if language == LOCALE_EN else [LOCALE_EN, language]

        # Resolve the integrations; entries that came back as exceptions are
        # logged and left out of the mapping.
        integrations: dict[str, Integration] = {}
        ints_or_excs = await async_get_integrations(self.hass, components)
        for domain, int_or_exc in ints_or_excs.items():
            if isinstance(int_or_exc, Exception):
                _LOGGER.warning(
                    "Failed to load integration for translation: %s", int_or_exc
                )
                continue
            integrations[domain] = int_or_exc

        translation_by_language_strings = await _async_get_component_strings(
            self.hass, languages, components, integrations
        )

        # English is always the fallback language so we load them first
        # (into the cache of the requested language).
        self._build_category_cache(
            language, components, translation_by_language_strings[LOCALE_EN]
        )

        if language != LOCALE_EN:
            # Now overlay the requested language on top of the English
            self._build_category_cache(
                language, components, translation_by_language_strings[language]
            )

            loaded_english_components = loaded.setdefault(LOCALE_EN, set())
            # Since we just loaded english anyway we can avoid loading
            # again if they switch back to english.
            # This is only done when none of these components are already
            # marked as loaded for English.
            if loaded_english_components.isdisjoint(components):
                self._build_category_cache(
                    LOCALE_EN, components, translation_by_language_strings[LOCALE_EN]
                )
                loaded_english_components.update(components)

        # loaded[language] is created by async_load before it calls this method.
        loaded[language].update(components)

    def _validate_placeholders(
        self,
        language: str,
        updated_resources: dict[str, str],
        cached_resources: dict[str, str] | None = None,
    ) -> dict[str, str]:
        """Validate if updated resources have same placeholders as cached resources.

        Keys whose set of placeholder field names differs from the cached
        string are logged and removed from ``updated_resources``. The dict is
        modified in place and also returned. Keys that are not in
        ``cached_resources`` are not checked.
        """
        if cached_resources is None:
            return updated_resources

        mismatches: set[str] = set()

        for key, value in updated_resources.items():
            if key not in cached_resources:
                continue
            try:
                tuples = list(string.Formatter().parse(value))
            except ValueError:
                # The string could not be parsed as a format string. It is
                # logged but not added to mismatches, so it stays in the
                # returned resources.
                _LOGGER.error(
                    ("Error while parsing localized (%s) string %s"), language, key
                )
                continue
            # tup[1] is the field name; None marks literal text without a field.
            updated_placeholders = {tup[1] for tup in tuples if tup[1] is not None}

            # NOTE(review): parsing the cached string is not guarded like the
            # one above; a malformed cached string would raise ValueError
            # here. Confirm whether that can happen.
            tuples = list(string.Formatter().parse(cached_resources[key]))
            cached_placeholders = {tup[1] for tup in tuples if tup[1] is not None}
            if updated_placeholders != cached_placeholders:
                _LOGGER.error(
                    (
                        "Validation of translation placeholders for localized (%s) string "
                        "%s failed: (%s != %s)"
                    ),
                    language,
                    key,
                    updated_placeholders,
                    cached_placeholders,
                )
                mismatches.add(key)

        # Deleted after the loop so the dict is not modified while iterating.
        for mismatch in mismatches:
            del updated_resources[mismatch]

        return updated_resources

    @callback
    def _build_category_cache(
        self,
        language: str,
        components: set[str],
        translation_strings: dict[str, dict[str, Any]],
    ) -> None:
        """Extract resources into the cache.

        For every category present in ``translation_strings``, the strings of
        each requested component are flattened and merged into the cache entry
        for ``language``.
        """
        resource: dict[str, Any] | str
        cached = self.cache_data.cache.setdefault(language, {})
        # All category names (top-level keys) used by any component.
        categories = {
            category
            for component in translation_strings.values()
            for category in component
        }

        for category in categories:
            new_resources = build_resources(translation_strings, components, category)
            category_cache = cached.setdefault(category, {})

            for component, resource in new_resources.items():
                component_cache = category_cache.setdefault(component, {})

                # A category whose value is not a dict (for example "title")
                # is stored under a single key without further flattening.
                if not isinstance(resource, dict):
                    component_cache[f"component.{component}.{category}"] = resource
                    continue

                prefix = f"component.{component}.{category}."
                flat = recursive_flatten(prefix, resource)
                # Drop strings whose placeholders disagree with what is
                # already cached for this component before merging.
                flat = self._validate_placeholders(language, flat, component_cache)
                component_cache.update(flat)


@bind_hass
async def async_get_translations(
    hass: HomeAssistant,
    language: str,
    category: str,
    integrations: Iterable[str] | None = None,
    config_flow: bool | None = None,
) -> dict[str, str]:
    """Return backend translations for a language and category.

    When ``integrations`` is given, only those are used. Otherwise, if
    ``config_flow`` is true, the config flow integrations that are not already
    in ``hass.config.components`` are used. In all other cases the top level
    components are used.
    """
    components: set[str]
    if integrations is not None:
        components = set(integrations)
    elif config_flow:
        config_flows = await async_get_config_flows(hass)
        components = config_flows - hass.config.components
    else:
        components = hass.config.top_level_components

    cache = _async_get_translations_cache(hass)
    return await cache.async_fetch(language, category, components)


@callback
def async_get_cached_translations(
    hass: HomeAssistant,
    language: str,
    category: str,
    integration: str | None = None,
) -> dict[str, str]:
    """Return backend translations that are already in the cache.

    Looks at the given integration only when one is passed, and at the top
    level components otherwise. Nothing is loaded by this call.
    """
    components: set[str]
    if integration:
        components = {integration}
    else:
        components = hass.config.top_level_components

    cache = _async_get_translations_cache(hass)
    return cache.get_cached(language, category, components)


# NOTE(review): presumably the singleton decorator keeps the returned instance
# under TRANSLATION_FLATTEN_CACHE so this body runs once per hass and later
# calls get the same cache; confirm against the singleton helper.
@singleton.singleton(TRANSLATION_FLATTEN_CACHE)
def _async_get_translations_cache(hass: HomeAssistant) -> _TranslationCache:
    """Return the translation cache."""
    return _TranslationCache(hass)


@callback
def async_setup(hass: HomeAssistant) -> None:
    """Create translation cache and register listeners for translation loaders.

    The registered listener loads translations for all loaded components
    whenever a core config update changes the configured language.
    """
    # Use the shared cache returned by _async_get_translations_cache so the
    # translations loaded by the listener below end up in the same cache that
    # the other helpers in this module read from. Building a separate
    # _TranslationCache here would load them into an instance nobody reads.
    cache = _async_get_translations_cache(hass)
    current_language = hass.config.language

    @callback
    def _async_load_translations_filter(event_data: Mapping[str, Any]) -> bool:
        """Let an event through only when it changes the language."""
        nonlocal current_language
        if (
            new_language := event_data.get("language")
        ) and new_language != current_language:
            # Remember the new language so a repeated update to the same
            # language is filtered out.
            current_language = new_language
            return True
        return False

    async def _async_load_translations(event: Event) -> None:
        """Load translations of all loaded components for the new language."""
        new_language = event.data["language"]
        _LOGGER.debug("Loading translations for language: %s", new_language)
        await cache.async_load(new_language, hass.config.components)

    hass.bus.async_listen(
        EVENT_CORE_CONFIG_UPDATE,
        _async_load_translations,
        event_filter=_async_load_translations_filter,
    )


async def async_load_integrations(hass: HomeAssistant, integrations: set[str]) -> None:
    """Load the given integrations' translations for the configured language."""
    cache = _async_get_translations_cache(hass)
    await cache.async_load(hass.config.language, integrations)


@callback
def async_translations_loaded(hass: HomeAssistant, components: set[str]) -> bool:
    """Return whether all given components are loaded for the configured language."""
    cache = _async_get_translations_cache(hass)
    return cache.async_is_loaded(hass.config.language, components)


@callback
def async_get_exception_message(
    translation_domain: str,
    translation_key: str,
    translation_placeholders: dict[str, str] | None = None,
) -> str:
    """Return a translated exception message.

    Always uses English and only reads translations that are already cached.
    A trailing "." is stripped from the message and the placeholders, if any,
    are filled in. When the message is not in the cache the translation key
    itself is returned.
    """
    language = "en"
    hass = async_get_hass()
    localize_key = (
        f"component.{translation_domain}.exceptions.{translation_key}.message"
    )
    cached = async_get_cached_translations(hass, language, "exceptions")

    if localize_key not in cached:
        # We return the translation key when was not found in the cache
        return translation_key

    message = cached[localize_key]
    if message:
        message = message.rstrip(".")

    if translation_placeholders:
        try:
            message = message.format(**translation_placeholders)
        except KeyError:
            # A placeholder used by the message was not supplied; fall back
            # to the unformatted message.
            pass

    return message


@callback
def async_translate_state(
    hass: HomeAssistant,
    state: str,
    domain: str,
    platform: str | None,
    translation_key: str | None,
    device_class: str | None,
) -> str:
    """Translate a state using cached translations for the configured language.

    Lookup order: the platform's entity translation (needs both ``platform``
    and ``translation_key``), then the domain's device class translation, then
    the domain's default ("_") translation. The state is returned unchanged
    when nothing matches, and always for the unavailable and unknown states.
    """
    if state in (STATE_UNAVAILABLE, STATE_UNKNOWN):
        return state

    language = hass.config.language

    # Entity specific translation provided by the platform.
    if platform is not None and translation_key is not None:
        entity_key = (
            f"component.{platform}.entity.{domain}.{translation_key}.state.{state}"
        )
        entity_strings = async_get_cached_translations(hass, language, "entity")
        if entity_key in entity_strings:
            return entity_strings[entity_key]

    # Translations provided by the entity component, most specific first.
    component_strings = async_get_cached_translations(
        hass, language, "entity_component"
    )
    candidate_keys: list[str] = []
    if device_class is not None:
        candidate_keys.append(
            f"component.{domain}.entity_component.{device_class}.state.{state}"
        )
    candidate_keys.append(f"component.{domain}.entity_component._.state.{state}")

    for candidate in candidate_keys:
        if candidate in component_strings:
            return component_strings[candidate]

    return state