This commit is contained in:
G Johansson 2025-07-21 08:32:08 +00:00
parent be04e4b251
commit 6b43f1f46f
3 changed files with 241 additions and 232 deletions

View File

@ -3,18 +3,14 @@
from __future__ import annotations
from datetime import timedelta
from functools import partial
from typing import cast
from holidays import PUBLIC, DateLike, HolidayBase, country_holidays
from holidays import DateLike, HolidayBase
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_COUNTRY, CONF_LANGUAGE
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryError
from homeassistant.helpers.issue_registry import IssueSeverity, async_create_issue
from homeassistant.setup import SetupPhases, async_pause_setup
from homeassistant.util import dt as dt_util, slugify
from homeassistant.util import dt as dt_util
from .const import (
CONF_ADD_HOLIDAYS,
@ -22,231 +18,19 @@ from .const import (
CONF_OFFSET,
CONF_PROVINCE,
CONF_REMOVE_HOLIDAYS,
DOMAIN,
LOGGER,
PLATFORMS,
)
from .util import validate_dates
from .util import (
add_remove_custom_holidays,
async_validate_country_and_province,
get_holidays_object,
validate_dates,
)
type WorkdayConfigEntry = ConfigEntry[HolidayBase]
async def _async_validate_country_and_province(
hass: HomeAssistant,
entry: WorkdayConfigEntry,
country: str | None,
province: str | None,
) -> None:
"""Validate country and province."""
if not country:
return
try:
with async_pause_setup(hass, SetupPhases.WAIT_IMPORT_PACKAGES):
# import executor job is used here because multiple integrations use
# the holidays library and it is not thread safe to import it in parallel
# https://github.com/python/cpython/issues/83065
await hass.async_add_import_executor_job(country_holidays, country)
except NotImplementedError as ex:
async_create_issue(
hass,
DOMAIN,
"bad_country",
is_fixable=True,
is_persistent=False,
severity=IssueSeverity.ERROR,
translation_key="bad_country",
translation_placeholders={"title": entry.title},
data={"entry_id": entry.entry_id, "country": None},
)
raise ConfigEntryError(f"Selected country {country} is not valid") from ex
if not province:
return
try:
with async_pause_setup(hass, SetupPhases.WAIT_IMPORT_PACKAGES):
# import executor job is used here because multiple integrations use
# the holidays library and it is not thread safe to import it in parallel
# https://github.com/python/cpython/issues/83065
await hass.async_add_import_executor_job(
partial(country_holidays, country, subdiv=province)
)
except NotImplementedError as ex:
async_create_issue(
hass,
DOMAIN,
"bad_province",
is_fixable=True,
is_persistent=False,
severity=IssueSeverity.ERROR,
translation_key="bad_province",
translation_placeholders={
CONF_COUNTRY: country,
"title": entry.title,
},
data={"entry_id": entry.entry_id, "country": country},
)
raise ConfigEntryError(
f"Selected province {province} for country {country} is not valid"
) from ex
def _get_obj_holidays(
country: str | None,
province: str | None,
year: int,
language: str | None,
categories: list[str] | None,
) -> HolidayBase:
"""Get the object for the requested country and year."""
if not country:
return HolidayBase()
set_categories = None
if categories:
category_list = [PUBLIC]
category_list.extend(categories)
set_categories = tuple(category_list)
obj_holidays: HolidayBase = country_holidays(
country,
subdiv=province,
years=[year, year + 1],
language=language,
categories=set_categories,
)
supported_languages = obj_holidays.supported_languages
default_language = obj_holidays.default_language
if default_language and not language:
# If no language is set, use the default language
LOGGER.debug("Changing language from None to %s", default_language)
return country_holidays( # Return default if no language
country,
subdiv=province,
years=year,
language=default_language,
categories=set_categories,
)
if (
default_language
and language
and language not in supported_languages
and language.startswith("en")
):
# If language does not match supported languages, use the first English variant
if default_language.startswith("en"):
LOGGER.debug("Changing language from %s to %s", language, default_language)
return country_holidays( # Return default English if default language
country,
subdiv=province,
years=year,
language=default_language,
categories=set_categories,
)
for lang in supported_languages:
if lang.startswith("en"):
LOGGER.debug("Changing language from %s to %s", language, lang)
return country_holidays(
country,
subdiv=province,
years=year,
language=lang,
categories=set_categories,
)
if default_language and language and language not in supported_languages:
# If language does not match supported languages, use the default language
LOGGER.debug("Changing language from %s to %s", language, default_language)
return country_holidays( # Return default English if default language
country,
subdiv=province,
years=year,
language=default_language,
categories=set_categories,
)
return obj_holidays
def add_remove_custom_holidays(
hass: HomeAssistant,
entry: WorkdayConfigEntry,
country: str | None,
calc_add_holidays: list[DateLike],
calc_remove_holidays: list[str],
) -> None:
"""Add or remove custom holidays."""
next_year = dt_util.now().year + 1
# Add custom holidays
try:
entry.runtime_data.append(calc_add_holidays)
except ValueError as error:
LOGGER.error("Could not add custom holidays: %s", error)
# Remove custom holidays
for remove_holiday in calc_remove_holidays:
try:
# is this formatted as a date?
if dt_util.parse_date(remove_holiday):
# remove holiday by date
removed = entry.runtime_data.pop(remove_holiday)
LOGGER.debug("Removed %s", remove_holiday)
else:
# remove holiday by name
LOGGER.debug("Treating '%s' as named holiday", remove_holiday)
removed = entry.runtime_data.pop_named(remove_holiday)
for holiday in removed:
LOGGER.debug("Removed %s by name '%s'", holiday, remove_holiday)
except KeyError as unmatched:
LOGGER.warning("No holiday found matching %s", unmatched)
if _date := dt_util.parse_date(remove_holiday):
if _date.year <= next_year:
# Only check and raise issues for max next year
async_create_issue(
hass,
DOMAIN,
f"bad_date_holiday-{entry.entry_id}-{slugify(remove_holiday)}",
is_fixable=True,
is_persistent=False,
severity=IssueSeverity.WARNING,
translation_key="bad_date_holiday",
translation_placeholders={
CONF_COUNTRY: country if country else "-",
"title": entry.title,
CONF_REMOVE_HOLIDAYS: remove_holiday,
},
data={
"entry_id": entry.entry_id,
"country": country,
"named_holiday": remove_holiday,
},
)
else:
async_create_issue(
hass,
DOMAIN,
f"bad_named_holiday-{entry.entry_id}-{slugify(remove_holiday)}",
is_fixable=True,
is_persistent=False,
severity=IssueSeverity.WARNING,
translation_key="bad_named_holiday",
translation_placeholders={
CONF_COUNTRY: country if country else "-",
"title": entry.title,
CONF_REMOVE_HOLIDAYS: remove_holiday,
},
data={
"entry_id": entry.entry_id,
"country": country,
"named_holiday": remove_holiday,
},
)
async def async_setup_entry(hass: HomeAssistant, entry: WorkdayConfigEntry) -> bool:
"""Set up Workday from a config entry."""
@ -263,10 +47,10 @@ async def async_setup_entry(hass: HomeAssistant, entry: WorkdayConfigEntry) -> b
province: str | None = entry.options.get(CONF_PROVINCE)
year: int = (dt_util.now() + timedelta(days=days_offset)).year
await _async_validate_country_and_province(hass, entry, country, province)
await async_validate_country_and_province(hass, entry, country, province)
entry.runtime_data = await hass.async_add_executor_job(
_get_obj_holidays, country, province, year, language, categories
get_holidays_object, country, province, year, language, categories
)
add_remove_custom_holidays(

View File

@ -3,7 +3,7 @@
from __future__ import annotations
from datetime import date, datetime, timedelta
from typing import TYPE_CHECKING, Final
from typing import Final
from holidays import HolidayBase, __version__ as python_holidays_version
import voluptuous as vol
@ -26,11 +26,9 @@ from homeassistant.helpers.entity_platform import (
from homeassistant.helpers.event import async_track_point_in_utc_time
from homeassistant.util import dt as dt_util
from . import WorkdayConfigEntry
from .const import ALLOWED_DAYS, CONF_EXCLUDES, CONF_OFFSET, CONF_WORKDAYS, DOMAIN
if TYPE_CHECKING:
from . import WorkdayConfigEntry
SERVICE_CHECK_DATE: Final = "check_date"
CHECK_DATE: Final = "check_date"

View File

@ -1,10 +1,81 @@
"""Helpers functions for the Workday component."""
from datetime import date, timedelta
from functools import partial
from typing import TYPE_CHECKING
from homeassistant.util import dt as dt_util
from holidays import PUBLIC, DateLike, HolidayBase, country_holidays
from .const import LOGGER
from homeassistant.const import CONF_COUNTRY
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryError
from homeassistant.helpers.issue_registry import IssueSeverity, async_create_issue
from homeassistant.setup import SetupPhases, async_pause_setup
from homeassistant.util import dt as dt_util, slugify
if TYPE_CHECKING:
from . import WorkdayConfigEntry
from .const import CONF_REMOVE_HOLIDAYS, DOMAIN, LOGGER
async def async_validate_country_and_province(
hass: HomeAssistant,
entry: "WorkdayConfigEntry",
country: str | None,
province: str | None,
) -> None:
"""Validate country and province."""
if not country:
return
try:
with async_pause_setup(hass, SetupPhases.WAIT_IMPORT_PACKAGES):
# import executor job is used here because multiple integrations use
# the holidays library and it is not thread safe to import it in parallel
# https://github.com/python/cpython/issues/83065
await hass.async_add_import_executor_job(country_holidays, country)
except NotImplementedError as ex:
async_create_issue(
hass,
DOMAIN,
"bad_country",
is_fixable=True,
is_persistent=False,
severity=IssueSeverity.ERROR,
translation_key="bad_country",
translation_placeholders={"title": entry.title},
data={"entry_id": entry.entry_id, "country": None},
)
raise ConfigEntryError(f"Selected country {country} is not valid") from ex
if not province:
return
try:
with async_pause_setup(hass, SetupPhases.WAIT_IMPORT_PACKAGES):
# import executor job is used here because multiple integrations use
# the holidays library and it is not thread safe to import it in parallel
# https://github.com/python/cpython/issues/83065
await hass.async_add_import_executor_job(
partial(country_holidays, country, subdiv=province)
)
except NotImplementedError as ex:
async_create_issue(
hass,
DOMAIN,
"bad_province",
is_fixable=True,
is_persistent=False,
severity=IssueSeverity.ERROR,
translation_key="bad_province",
translation_placeholders={
CONF_COUNTRY: country,
"title": entry.title,
},
data={"entry_id": entry.entry_id, "country": country},
)
raise ConfigEntryError(
f"Selected province {province} for country {country} is not valid"
) from ex
def validate_dates(holiday_list: list[str]) -> list[str]:
@ -25,3 +96,159 @@ def validate_dates(holiday_list: list[str]) -> list[str]:
continue
calc_holidays.append(add_date)
return calc_holidays
def get_holidays_object(
country: str | None,
province: str | None,
year: int,
language: str | None,
categories: list[str] | None,
) -> HolidayBase:
"""Get the object for the requested country and year."""
if not country:
return HolidayBase()
set_categories = None
if categories:
category_list = [PUBLIC]
category_list.extend(categories)
set_categories = tuple(category_list)
obj_holidays: HolidayBase = country_holidays(
country,
subdiv=province,
years=[year, year + 1],
language=language,
categories=set_categories,
)
supported_languages = obj_holidays.supported_languages
default_language = obj_holidays.default_language
if default_language and not language:
# If no language is set, use the default language
LOGGER.debug("Changing language from None to %s", default_language)
return country_holidays( # Return default if no language
country,
subdiv=province,
years=year,
language=default_language,
categories=set_categories,
)
if (
default_language
and language
and language not in supported_languages
and language.startswith("en")
):
# If language does not match supported languages, use the first English variant
if default_language.startswith("en"):
LOGGER.debug("Changing language from %s to %s", language, default_language)
return country_holidays( # Return default English if default language
country,
subdiv=province,
years=year,
language=default_language,
categories=set_categories,
)
for lang in supported_languages:
if lang.startswith("en"):
LOGGER.debug("Changing language from %s to %s", language, lang)
return country_holidays(
country,
subdiv=province,
years=year,
language=lang,
categories=set_categories,
)
if default_language and language and language not in supported_languages:
# If language does not match supported languages, use the default language
LOGGER.debug("Changing language from %s to %s", language, default_language)
return country_holidays( # Return default English if default language
country,
subdiv=province,
years=year,
language=default_language,
categories=set_categories,
)
return obj_holidays
def add_remove_custom_holidays(
hass: HomeAssistant,
entry: "WorkdayConfigEntry",
country: str | None,
calc_add_holidays: list[DateLike],
calc_remove_holidays: list[str],
) -> None:
"""Add or remove custom holidays."""
next_year = dt_util.now().year + 1
# Add custom holidays
try:
entry.runtime_data.append(calc_add_holidays)
except ValueError as error:
LOGGER.error("Could not add custom holidays: %s", error)
# Remove custom holidays
for remove_holiday in calc_remove_holidays:
try:
# is this formatted as a date?
if dt_util.parse_date(remove_holiday):
# remove holiday by date
removed = entry.runtime_data.pop(remove_holiday)
LOGGER.debug("Removed %s", remove_holiday)
else:
# remove holiday by name
LOGGER.debug("Treating '%s' as named holiday", remove_holiday)
removed = entry.runtime_data.pop_named(remove_holiday)
for holiday in removed:
LOGGER.debug("Removed %s by name '%s'", holiday, remove_holiday)
except KeyError as unmatched:
LOGGER.warning("No holiday found matching %s", unmatched)
if _date := dt_util.parse_date(remove_holiday):
if _date.year <= next_year:
# Only check and raise issues for max next year
async_create_issue(
hass,
DOMAIN,
f"bad_date_holiday-{entry.entry_id}-{slugify(remove_holiday)}",
is_fixable=True,
is_persistent=False,
severity=IssueSeverity.WARNING,
translation_key="bad_date_holiday",
translation_placeholders={
CONF_COUNTRY: country if country else "-",
"title": entry.title,
CONF_REMOVE_HOLIDAYS: remove_holiday,
},
data={
"entry_id": entry.entry_id,
"country": country,
"named_holiday": remove_holiday,
},
)
else:
async_create_issue(
hass,
DOMAIN,
f"bad_named_holiday-{entry.entry_id}-{slugify(remove_holiday)}",
is_fixable=True,
is_persistent=False,
severity=IssueSeverity.WARNING,
translation_key="bad_named_holiday",
translation_placeholders={
CONF_COUNTRY: country if country else "-",
"title": entry.title,
CONF_REMOVE_HOLIDAYS: remove_holiday,
},
data={
"entry_id": entry.entry_id,
"country": country,
"named_holiday": remove_holiday,
},
)