From 6b43f1f46f33897a836afdd8ccc8e148b0fefcd5 Mon Sep 17 00:00:00 2001 From: G Johansson Date: Mon, 21 Jul 2025 08:32:08 +0000 Subject: [PATCH] Mods --- homeassistant/components/workday/__init__.py | 236 +----------------- .../components/workday/binary_sensor.py | 6 +- homeassistant/components/workday/util.py | 231 ++++++++++++++++- 3 files changed, 241 insertions(+), 232 deletions(-) diff --git a/homeassistant/components/workday/__init__.py b/homeassistant/components/workday/__init__.py index d4771aa62d2..cbcf12cf31c 100644 --- a/homeassistant/components/workday/__init__.py +++ b/homeassistant/components/workday/__init__.py @@ -3,18 +3,14 @@ from __future__ import annotations from datetime import timedelta -from functools import partial from typing import cast -from holidays import PUBLIC, DateLike, HolidayBase, country_holidays +from holidays import DateLike, HolidayBase from homeassistant.config_entries import ConfigEntry from homeassistant.const import CONF_COUNTRY, CONF_LANGUAGE from homeassistant.core import HomeAssistant -from homeassistant.exceptions import ConfigEntryError -from homeassistant.helpers.issue_registry import IssueSeverity, async_create_issue -from homeassistant.setup import SetupPhases, async_pause_setup -from homeassistant.util import dt as dt_util, slugify +from homeassistant.util import dt as dt_util from .const import ( CONF_ADD_HOLIDAYS, @@ -22,231 +18,19 @@ from .const import ( CONF_OFFSET, CONF_PROVINCE, CONF_REMOVE_HOLIDAYS, - DOMAIN, LOGGER, PLATFORMS, ) -from .util import validate_dates +from .util import ( + add_remove_custom_holidays, + async_validate_country_and_province, + get_holidays_object, + validate_dates, +) type WorkdayConfigEntry = ConfigEntry[HolidayBase] -async def _async_validate_country_and_province( - hass: HomeAssistant, - entry: WorkdayConfigEntry, - country: str | None, - province: str | None, -) -> None: - """Validate country and province.""" - - if not country: - return - try: - with async_pause_setup(hass, SetupPhases.WAIT_IMPORT_PACKAGES): - # import executor job is used here because multiple integrations use - # the holidays library and it is not thread safe to import it in parallel - # https://github.com/python/cpython/issues/83065 - await hass.async_add_import_executor_job(country_holidays, country) - except NotImplementedError as ex: - async_create_issue( - hass, - DOMAIN, - "bad_country", - is_fixable=True, - is_persistent=False, - severity=IssueSeverity.ERROR, - translation_key="bad_country", - translation_placeholders={"title": entry.title}, - data={"entry_id": entry.entry_id, "country": None}, - ) - raise ConfigEntryError(f"Selected country {country} is not valid") from ex - - if not province: - return - try: - with async_pause_setup(hass, SetupPhases.WAIT_IMPORT_PACKAGES): - # import executor job is used here because multiple integrations use - # the holidays library and it is not thread safe to import it in parallel - # https://github.com/python/cpython/issues/83065 - await hass.async_add_import_executor_job( - partial(country_holidays, country, subdiv=province) - ) - except NotImplementedError as ex: - async_create_issue( - hass, - DOMAIN, - "bad_province", - is_fixable=True, - is_persistent=False, - severity=IssueSeverity.ERROR, - translation_key="bad_province", - translation_placeholders={ - CONF_COUNTRY: country, - "title": entry.title, - }, - data={"entry_id": entry.entry_id, "country": country}, - ) - raise ConfigEntryError( - f"Selected province {province} for country {country} is not valid" - ) from ex - - -def _get_obj_holidays( - country: str | None, - province: str | None, - year: int, - language: str | None, - categories: list[str] | None, -) -> HolidayBase: - """Get the object for the requested country and year.""" - if not country: - return HolidayBase() - - set_categories = None - if categories: - category_list = [PUBLIC] - category_list.extend(categories) - set_categories = tuple(category_list) - - obj_holidays: HolidayBase = country_holidays( - country, - subdiv=province, - years=[year, year + 1], - language=language, - categories=set_categories, - ) - - supported_languages = obj_holidays.supported_languages - default_language = obj_holidays.default_language - - if default_language and not language: - # If no language is set, use the default language - LOGGER.debug("Changing language from None to %s", default_language) - return country_holidays( # Return default if no language - country, - subdiv=province, - years=year, - language=default_language, - categories=set_categories, - ) - - if ( - default_language - and language - and language not in supported_languages - and language.startswith("en") - ): - # If language does not match supported languages, use the first English variant - if default_language.startswith("en"): - LOGGER.debug("Changing language from %s to %s", language, default_language) - return country_holidays( # Return default English if default language - country, - subdiv=province, - years=year, - language=default_language, - categories=set_categories, - ) - for lang in supported_languages: - if lang.startswith("en"): - LOGGER.debug("Changing language from %s to %s", language, lang) - return country_holidays( - country, - subdiv=province, - years=year, - language=lang, - categories=set_categories, - ) - - if default_language and language and language not in supported_languages: - # If language does not match supported languages, use the default language - LOGGER.debug("Changing language from %s to %s", language, default_language) - return country_holidays( # Return default English if default language - country, - subdiv=province, - years=year, - language=default_language, - categories=set_categories, - ) - - return obj_holidays - - -def add_remove_custom_holidays( - hass: HomeAssistant, - entry: WorkdayConfigEntry, - country: str | None, - calc_add_holidays: list[DateLike], - calc_remove_holidays: list[str], -) -> None: - """Add or remove custom holidays.""" - next_year = dt_util.now().year + 1 - - # Add custom holidays - try: - entry.runtime_data.append(calc_add_holidays) - except ValueError as error: - LOGGER.error("Could not add custom holidays: %s", error) - - # Remove custom holidays - for remove_holiday in calc_remove_holidays: - try: - # is this formatted as a date? - if dt_util.parse_date(remove_holiday): - # remove holiday by date - removed = entry.runtime_data.pop(remove_holiday) - LOGGER.debug("Removed %s", remove_holiday) - else: - # remove holiday by name - LOGGER.debug("Treating '%s' as named holiday", remove_holiday) - removed = entry.runtime_data.pop_named(remove_holiday) - for holiday in removed: - LOGGER.debug("Removed %s by name '%s'", holiday, remove_holiday) - except KeyError as unmatched: - LOGGER.warning("No holiday found matching %s", unmatched) - if _date := dt_util.parse_date(remove_holiday): - if _date.year <= next_year: - # Only check and raise issues for max next year - async_create_issue( - hass, - DOMAIN, - f"bad_date_holiday-{entry.entry_id}-{slugify(remove_holiday)}", - is_fixable=True, - is_persistent=False, - severity=IssueSeverity.WARNING, - translation_key="bad_date_holiday", - translation_placeholders={ - CONF_COUNTRY: country if country else "-", - "title": entry.title, - CONF_REMOVE_HOLIDAYS: remove_holiday, - }, - data={ - "entry_id": entry.entry_id, - "country": country, - "named_holiday": remove_holiday, - }, - ) - else: - async_create_issue( - hass, - DOMAIN, - f"bad_named_holiday-{entry.entry_id}-{slugify(remove_holiday)}", - is_fixable=True, - is_persistent=False, - severity=IssueSeverity.WARNING, - translation_key="bad_named_holiday", - translation_placeholders={ - CONF_COUNTRY: country if country else "-", - "title": entry.title, - CONF_REMOVE_HOLIDAYS: remove_holiday, - }, - data={ - "entry_id": entry.entry_id, - "country": country, - "named_holiday": remove_holiday, - }, - ) - - async def async_setup_entry(hass: HomeAssistant, entry: WorkdayConfigEntry) -> bool: """Set up Workday from a config entry.""" @@ -263,10 +47,10 @@ async def async_setup_entry(hass: HomeAssistant, entry: WorkdayConfigEntry) -> b province: str | None = entry.options.get(CONF_PROVINCE) year: int = (dt_util.now() + timedelta(days=days_offset)).year - await _async_validate_country_and_province(hass, entry, country, province) + await async_validate_country_and_province(hass, entry, country, province) entry.runtime_data = await hass.async_add_executor_job( - _get_obj_holidays, country, province, year, language, categories + get_holidays_object, country, province, year, language, categories ) add_remove_custom_holidays( diff --git a/homeassistant/components/workday/binary_sensor.py b/homeassistant/components/workday/binary_sensor.py index d8dcea5d4c9..dcda7b901a1 100644 --- a/homeassistant/components/workday/binary_sensor.py +++ b/homeassistant/components/workday/binary_sensor.py @@ -3,7 +3,7 @@ from __future__ import annotations from datetime import date, datetime, timedelta -from typing import TYPE_CHECKING, Final +from typing import Final from holidays import HolidayBase, __version__ as python_holidays_version import voluptuous as vol @@ -26,11 +26,9 @@ from homeassistant.helpers.entity_platform import ( from homeassistant.helpers.event import async_track_point_in_utc_time from homeassistant.util import dt as dt_util +from . import WorkdayConfigEntry from .const import ALLOWED_DAYS, CONF_EXCLUDES, CONF_OFFSET, CONF_WORKDAYS, DOMAIN -if TYPE_CHECKING: - from . import WorkdayConfigEntry - SERVICE_CHECK_DATE: Final = "check_date" CHECK_DATE: Final = "check_date" diff --git a/homeassistant/components/workday/util.py b/homeassistant/components/workday/util.py index 6bcac21db67..726563febaf 100644 --- a/homeassistant/components/workday/util.py +++ b/homeassistant/components/workday/util.py @@ -1,10 +1,81 @@ """Helpers functions for the Workday component.""" from datetime import date, timedelta +from functools import partial +from typing import TYPE_CHECKING -from homeassistant.util import dt as dt_util +from holidays import PUBLIC, DateLike, HolidayBase, country_holidays -from .const import LOGGER +from homeassistant.const import CONF_COUNTRY +from homeassistant.core import HomeAssistant +from homeassistant.exceptions import ConfigEntryError +from homeassistant.helpers.issue_registry import IssueSeverity, async_create_issue +from homeassistant.setup import SetupPhases, async_pause_setup +from homeassistant.util import dt as dt_util, slugify + +if TYPE_CHECKING: + from . import WorkdayConfigEntry +from .const import CONF_REMOVE_HOLIDAYS, DOMAIN, LOGGER + + +async def async_validate_country_and_province( + hass: HomeAssistant, + entry: "WorkdayConfigEntry", + country: str | None, + province: str | None, +) -> None: + """Validate country and province.""" + + if not country: + return + try: + with async_pause_setup(hass, SetupPhases.WAIT_IMPORT_PACKAGES): + # import executor job is used here because multiple integrations use + # the holidays library and it is not thread safe to import it in parallel + # https://github.com/python/cpython/issues/83065 + await hass.async_add_import_executor_job(country_holidays, country) + except NotImplementedError as ex: + async_create_issue( + hass, + DOMAIN, + "bad_country", + is_fixable=True, + is_persistent=False, + severity=IssueSeverity.ERROR, + translation_key="bad_country", + translation_placeholders={"title": entry.title}, + data={"entry_id": entry.entry_id, "country": None}, + ) + raise ConfigEntryError(f"Selected country {country} is not valid") from ex + + if not province: + return + try: + with async_pause_setup(hass, SetupPhases.WAIT_IMPORT_PACKAGES): + # import executor job is used here because multiple integrations use + # the holidays library and it is not thread safe to import it in parallel + # https://github.com/python/cpython/issues/83065 + await hass.async_add_import_executor_job( + partial(country_holidays, country, subdiv=province) + ) + except NotImplementedError as ex: + async_create_issue( + hass, + DOMAIN, + "bad_province", + is_fixable=True, + is_persistent=False, + severity=IssueSeverity.ERROR, + translation_key="bad_province", + translation_placeholders={ + CONF_COUNTRY: country, + "title": entry.title, + }, + data={"entry_id": entry.entry_id, "country": country}, + ) + raise ConfigEntryError( + f"Selected province {province} for country {country} is not valid" + ) from ex def validate_dates(holiday_list: list[str]) -> list[str]: @@ -25,3 +96,159 @@ def validate_dates(holiday_list: list[str]) -> list[str]: continue calc_holidays.append(add_date) return calc_holidays + + +def get_holidays_object( + country: str | None, + province: str | None, + year: int, + language: str | None, + categories: list[str] | None, +) -> HolidayBase: + """Get the object for the requested country and year.""" + if not country: + return HolidayBase() + + set_categories = None + if categories: + category_list = [PUBLIC] + category_list.extend(categories) + set_categories = tuple(category_list) + + obj_holidays: HolidayBase = country_holidays( + country, + subdiv=province, + years=[year, year + 1], + language=language, + categories=set_categories, + ) + + supported_languages = obj_holidays.supported_languages + default_language = obj_holidays.default_language + + if default_language and not language: + # If no language is set, use the default language + LOGGER.debug("Changing language from None to %s", default_language) + return country_holidays( # Return default if no language + country, + subdiv=province, + years=year, + language=default_language, + categories=set_categories, + ) + + if ( + default_language + and language + and language not in supported_languages + and language.startswith("en") + ): + # If language does not match supported languages, use the first English variant + if default_language.startswith("en"): + LOGGER.debug("Changing language from %s to %s", language, default_language) + return country_holidays( # Return default English if default language + country, + subdiv=province, + years=year, + language=default_language, + categories=set_categories, + ) + for lang in supported_languages: + if lang.startswith("en"): + LOGGER.debug("Changing language from %s to %s", language, lang) + return country_holidays( + country, + subdiv=province, + years=year, + language=lang, + categories=set_categories, + ) + + if default_language and language and language not in supported_languages: + # If language does not match supported languages, use the default language + LOGGER.debug("Changing language from %s to %s", language, default_language) + return country_holidays( # Return default English if default language + country, + subdiv=province, + years=year, + language=default_language, + categories=set_categories, + ) + + return obj_holidays + + +def add_remove_custom_holidays( + hass: HomeAssistant, + entry: "WorkdayConfigEntry", + country: str | None, + calc_add_holidays: list[DateLike], + calc_remove_holidays: list[str], +) -> None: + """Add or remove custom holidays.""" + next_year = dt_util.now().year + 1 + + # Add custom holidays + try: + entry.runtime_data.append(calc_add_holidays) + except ValueError as error: + LOGGER.error("Could not add custom holidays: %s", error) + + # Remove custom holidays + for remove_holiday in calc_remove_holidays: + try: + # is this formatted as a date? + if dt_util.parse_date(remove_holiday): + # remove holiday by date + removed = entry.runtime_data.pop(remove_holiday) + LOGGER.debug("Removed %s", remove_holiday) + else: + # remove holiday by name + LOGGER.debug("Treating '%s' as named holiday", remove_holiday) + removed = entry.runtime_data.pop_named(remove_holiday) + for holiday in removed: + LOGGER.debug("Removed %s by name '%s'", holiday, remove_holiday) + except KeyError as unmatched: + LOGGER.warning("No holiday found matching %s", unmatched) + if _date := dt_util.parse_date(remove_holiday): + if _date.year <= next_year: + # Only check and raise issues for max next year + async_create_issue( + hass, + DOMAIN, + f"bad_date_holiday-{entry.entry_id}-{slugify(remove_holiday)}", + is_fixable=True, + is_persistent=False, + severity=IssueSeverity.WARNING, + translation_key="bad_date_holiday", + translation_placeholders={ + CONF_COUNTRY: country if country else "-", + "title": entry.title, + CONF_REMOVE_HOLIDAYS: remove_holiday, + }, + data={ + "entry_id": entry.entry_id, + "country": country, + "named_holiday": remove_holiday, + }, + ) + else: + async_create_issue( + hass, + DOMAIN, + f"bad_named_holiday-{entry.entry_id}-{slugify(remove_holiday)}", + is_fixable=True, + is_persistent=False, + severity=IssueSeverity.WARNING, + translation_key="bad_named_holiday", + translation_placeholders={ + CONF_COUNTRY: country if country else "-", + "title": entry.title, + CONF_REMOVE_HOLIDAYS: remove_holiday, + }, + data={ + "entry_id": entry.entry_id, + "country": country, + "named_holiday": remove_holiday, + }, + )