diff --git a/homeassistant/components/habitica/services.py b/homeassistant/components/habitica/services.py index c5207ae4ec0..38833f26932 100644 --- a/homeassistant/components/habitica/services.py +++ b/homeassistant/components/habitica/services.py @@ -250,56 +250,203 @@ def get_config_entry(hass: HomeAssistant, entry_id: str) -> HabiticaConfigEntry: return entry -@callback -def async_setup_services(hass: HomeAssistant) -> None: # noqa: C901 - """Set up services for Habitica integration.""" +async def _cast_skill(call: ServiceCall) -> ServiceResponse: + """Skill action.""" + entry = get_config_entry(call.hass, call.data[ATTR_CONFIG_ENTRY]) + coordinator = entry.runtime_data - async def cast_skill(call: ServiceCall) -> ServiceResponse: - """Skill action.""" - entry = get_config_entry(hass, call.data[ATTR_CONFIG_ENTRY]) - coordinator = entry.runtime_data + skill = SKILL_MAP[call.data[ATTR_SKILL]] + cost = COST_MAP[call.data[ATTR_SKILL]] - skill = SKILL_MAP[call.data[ATTR_SKILL]] - cost = COST_MAP[call.data[ATTR_SKILL]] + try: + task_id = next( + task.id + for task in coordinator.data.tasks + if call.data[ATTR_TASK] in (str(task.id), task.alias, task.text) + ) + except StopIteration as e: + raise ServiceValidationError( + translation_domain=DOMAIN, + translation_key="task_not_found", + translation_placeholders={"task": f"'{call.data[ATTR_TASK]}'"}, + ) from e - try: - task_id = next( - task.id - for task in coordinator.data.tasks - if call.data[ATTR_TASK] in (str(task.id), task.alias, task.text) - ) - except StopIteration as e: + try: + response = await coordinator.habitica.cast_skill(skill, task_id) + except TooManyRequestsError as e: + raise HomeAssistantError( + translation_domain=DOMAIN, + translation_key="setup_rate_limit_exception", + translation_placeholders={"retry_after": str(e.retry_after)}, + ) from e + except NotAuthorizedError as e: + raise ServiceValidationError( + translation_domain=DOMAIN, + translation_key="not_enough_mana", + translation_placeholders={ + "cost": cost, + "mana": f"{int(coordinator.data.user.stats.mp or 0)} MP", + }, + ) from e + except NotFoundError as e: + # could also be task not found, but the task is looked up + # before the request, so most likely wrong skill selected + # or the skill hasn't been unlocked yet. + raise ServiceValidationError( + translation_domain=DOMAIN, + translation_key="skill_not_found", + translation_placeholders={"skill": call.data[ATTR_SKILL]}, + ) from e + except HabiticaException as e: + raise HomeAssistantError( + translation_domain=DOMAIN, + translation_key="service_call_exception", + translation_placeholders={"reason": str(e.error.message)}, + ) from e + except ClientError as e: + raise HomeAssistantError( + translation_domain=DOMAIN, + translation_key="service_call_exception", + translation_placeholders={"reason": str(e)}, + ) from e + else: + await coordinator.async_request_refresh() + return asdict(response.data) + + +async def _manage_quests(call: ServiceCall) -> ServiceResponse: + """Accept, reject, start, leave or cancel quests.""" + entry = get_config_entry(call.hass, call.data[ATTR_CONFIG_ENTRY]) + coordinator = entry.runtime_data + + FUNC_MAP = { + SERVICE_ABORT_QUEST: coordinator.habitica.abort_quest, + SERVICE_ACCEPT_QUEST: coordinator.habitica.accept_quest, + SERVICE_CANCEL_QUEST: coordinator.habitica.cancel_quest, + SERVICE_LEAVE_QUEST: coordinator.habitica.leave_quest, + SERVICE_REJECT_QUEST: coordinator.habitica.reject_quest, + SERVICE_START_QUEST: coordinator.habitica.start_quest, + } + + func = FUNC_MAP[call.service] + + try: + response = await func() + except TooManyRequestsError as e: + raise HomeAssistantError( + translation_domain=DOMAIN, + translation_key="setup_rate_limit_exception", + translation_placeholders={"retry_after": str(e.retry_after)}, + ) from e + except NotAuthorizedError as e: + raise ServiceValidationError( + translation_domain=DOMAIN, translation_key="quest_action_unallowed" + ) from e + except NotFoundError as e: + raise ServiceValidationError( + translation_domain=DOMAIN, translation_key="quest_not_found" + ) from e + except HabiticaException as e: + raise HomeAssistantError( + translation_domain=DOMAIN, + translation_key="service_call_exception", + translation_placeholders={"reason": str(e.error.message)}, + ) from e + except ClientError as e: + raise HomeAssistantError( + translation_domain=DOMAIN, + translation_key="service_call_exception", + translation_placeholders={"reason": str(e)}, + ) from e + else: + return asdict(response.data) + + +async def _score_task(call: ServiceCall) -> ServiceResponse: + """Score a task action.""" + entry = get_config_entry(call.hass, call.data[ATTR_CONFIG_ENTRY]) + coordinator = entry.runtime_data + + direction = ( + Direction.DOWN if call.data.get(ATTR_DIRECTION) == "down" else Direction.UP + ) + try: + task_id, task_value = next( + (task.id, task.value) + for task in coordinator.data.tasks + if call.data[ATTR_TASK] in (str(task.id), task.alias, task.text) + ) + except StopIteration as e: + raise ServiceValidationError( + translation_domain=DOMAIN, + translation_key="task_not_found", + translation_placeholders={"task": f"'{call.data[ATTR_TASK]}'"}, + ) from e + + if TYPE_CHECKING: + assert task_id + try: + response = await coordinator.habitica.update_score(task_id, direction) + except TooManyRequestsError as e: + raise HomeAssistantError( + translation_domain=DOMAIN, + translation_key="setup_rate_limit_exception", + translation_placeholders={"retry_after": str(e.retry_after)}, + ) from e + except NotAuthorizedError as e: + if task_value is not None: raise ServiceValidationError( translation_domain=DOMAIN, - translation_key="task_not_found", - translation_placeholders={"task": f"'{call.data[ATTR_TASK]}'"}, - ) from e - - try: - response = await coordinator.habitica.cast_skill(skill, task_id) - except TooManyRequestsError as e: - raise HomeAssistantError( - translation_domain=DOMAIN, - translation_key="setup_rate_limit_exception", - translation_placeholders={"retry_after": str(e.retry_after)}, - ) from e - except NotAuthorizedError as e: - raise ServiceValidationError( - translation_domain=DOMAIN, - translation_key="not_enough_mana", + translation_key="not_enough_gold", translation_placeholders={ - "cost": cost, - "mana": f"{int(coordinator.data.user.stats.mp or 0)} MP", + "gold": f"{(coordinator.data.user.stats.gp or 0):.2f} GP", + "cost": f"{task_value:.2f} GP", }, ) from e + raise HomeAssistantError( + translation_domain=DOMAIN, + translation_key="service_call_exception", + translation_placeholders={"reason": e.error.message}, + ) from e + except HabiticaException as e: + raise HomeAssistantError( + translation_domain=DOMAIN, + translation_key="service_call_exception", + translation_placeholders={"reason": str(e.error.message)}, + ) from e + except ClientError as e: + raise HomeAssistantError( + translation_domain=DOMAIN, + translation_key="service_call_exception", + translation_placeholders={"reason": str(e)}, + ) from e + else: + await coordinator.async_request_refresh() + return asdict(response.data) + + +async def _transformation(call: ServiceCall) -> ServiceResponse: + """User a transformation item on a player character.""" + + entry = get_config_entry(call.hass, call.data[ATTR_CONFIG_ENTRY]) + coordinator = entry.runtime_data + + item = ITEMID_MAP[call.data[ATTR_ITEM]] + # check if target is self + if call.data[ATTR_TARGET] in ( + str(coordinator.data.user.id), + coordinator.data.user.profile.name, + coordinator.data.user.auth.local.username, + ): + target_id = coordinator.data.user.id + else: + # check if target is a party member + try: + party = await coordinator.habitica.get_group_members(public_fields=True) except NotFoundError as e: - # could also be task not found, but the task is looked up - # before the request, so most likely wrong skill selected - # or the skill hasn't been unlocked yet. raise ServiceValidationError( translation_domain=DOMAIN, - translation_key="skill_not_found", - translation_placeholders={"skill": call.data[ATTR_SKILL]}, + translation_key="party_not_found", ) from e except HabiticaException as e: raise HomeAssistantError( @@ -313,86 +460,125 @@ def async_setup_services(hass: HomeAssistant) -> None: # noqa: C901 translation_key="service_call_exception", translation_placeholders={"reason": str(e)}, ) from e - else: - await coordinator.async_request_refresh() - return asdict(response.data) + try: + target_id = next( + member.id + for member in party.data + if member.id + and call.data[ATTR_TARGET].lower() + in ( + str(member.id), + str(member.auth.local.username).lower(), + str(member.profile.name).lower(), + ) + ) + except StopIteration as e: + raise ServiceValidationError( + translation_domain=DOMAIN, + translation_key="target_not_found", + translation_placeholders={"target": f"'{call.data[ATTR_TARGET]}'"}, + ) from e + try: + response = await coordinator.habitica.cast_skill(item, target_id) + except TooManyRequestsError as e: + raise HomeAssistantError( + translation_domain=DOMAIN, + translation_key="setup_rate_limit_exception", + translation_placeholders={"retry_after": str(e.retry_after)}, + ) from e + except NotAuthorizedError as e: + raise ServiceValidationError( + translation_domain=DOMAIN, + translation_key="item_not_found", + translation_placeholders={"item": call.data[ATTR_ITEM]}, + ) from e + except HabiticaException as e: + raise HomeAssistantError( + translation_domain=DOMAIN, + translation_key="service_call_exception", + translation_placeholders={"reason": str(e.error.message)}, + ) from e + except ClientError as e: + raise HomeAssistantError( + translation_domain=DOMAIN, + translation_key="service_call_exception", + translation_placeholders={"reason": str(e)}, + ) from e + else: + return asdict(response.data) - async def manage_quests(call: ServiceCall) -> ServiceResponse: - """Accept, reject, start, leave or cancel quests.""" - entry = get_config_entry(hass, call.data[ATTR_CONFIG_ENTRY]) - coordinator = entry.runtime_data - FUNC_MAP = { - SERVICE_ABORT_QUEST: coordinator.habitica.abort_quest, - SERVICE_ACCEPT_QUEST: coordinator.habitica.accept_quest, - SERVICE_CANCEL_QUEST: coordinator.habitica.cancel_quest, - SERVICE_LEAVE_QUEST: coordinator.habitica.leave_quest, - SERVICE_REJECT_QUEST: coordinator.habitica.reject_quest, - SERVICE_START_QUEST: coordinator.habitica.start_quest, +async def _get_tasks(call: ServiceCall) -> ServiceResponse: + """Get tasks action.""" + + entry = get_config_entry(call.hass, call.data[ATTR_CONFIG_ENTRY]) + coordinator = entry.runtime_data + response: list[TaskData] = coordinator.data.tasks + + if types := {TaskType[x] for x in call.data.get(ATTR_TYPE, [])}: + response = [task for task in response if task.Type in types] + + if priority := {TaskPriority[x] for x in call.data.get(ATTR_PRIORITY, [])}: + response = [task for task in response if task.priority in priority] + + if tasks := call.data.get(ATTR_TASK): + response = [ + task + for task in response + if str(task.id) in tasks or task.alias in tasks or task.text in tasks + ] + + if tags := call.data.get(ATTR_TAG): + tag_ids = { + tag.id + for tag in coordinator.data.user.tags + if (tag.name and tag.name.lower()) + in (tag.lower() for tag in tags) # Case-insensitive matching + and tag.id } - func = FUNC_MAP[call.service] + response = [ + task + for task in response + if any(tag_id in task.tags for tag_id in tag_ids if task.tags) + ] + if keyword := call.data.get(ATTR_KEYWORD): + keyword = keyword.lower() + response = [ + task + for task in response + if (task.text and keyword in task.text.lower()) + or (task.notes and keyword in task.notes.lower()) + or any(keyword in item.text.lower() for item in task.checklist) + ] + result: dict[str, Any] = { + "tasks": [task.to_dict(omit_none=False) for task in response] + } + return result + + +async def _create_or_update_task(call: ServiceCall) -> ServiceResponse: # noqa: C901 + """Create or update task action.""" + entry = get_config_entry(call.hass, call.data[ATTR_CONFIG_ENTRY]) + coordinator = entry.runtime_data + await coordinator.async_refresh() + is_update = call.service in ( + SERVICE_UPDATE_HABIT, + SERVICE_UPDATE_REWARD, + SERVICE_UPDATE_TODO, + SERVICE_UPDATE_DAILY, + ) + task_type = SERVICE_TASK_TYPE_MAP[call.service] + current_task = None + + if is_update: try: - response = await func() - except TooManyRequestsError as e: - raise HomeAssistantError( - translation_domain=DOMAIN, - translation_key="setup_rate_limit_exception", - translation_placeholders={"retry_after": str(e.retry_after)}, - ) from e - except NotAuthorizedError as e: - raise ServiceValidationError( - translation_domain=DOMAIN, translation_key="quest_action_unallowed" - ) from e - except NotFoundError as e: - raise ServiceValidationError( - translation_domain=DOMAIN, translation_key="quest_not_found" - ) from e - except HabiticaException as e: - raise HomeAssistantError( - translation_domain=DOMAIN, - translation_key="service_call_exception", - translation_placeholders={"reason": str(e.error.message)}, - ) from e - except ClientError as e: - raise HomeAssistantError( - translation_domain=DOMAIN, - translation_key="service_call_exception", - translation_placeholders={"reason": str(e)}, - ) from e - else: - return asdict(response.data) - - for service in ( - SERVICE_ABORT_QUEST, - SERVICE_ACCEPT_QUEST, - SERVICE_CANCEL_QUEST, - SERVICE_LEAVE_QUEST, - SERVICE_REJECT_QUEST, - SERVICE_START_QUEST, - ): - hass.services.async_register( - DOMAIN, - service, - manage_quests, - schema=SERVICE_MANAGE_QUEST_SCHEMA, - supports_response=SupportsResponse.ONLY, - ) - - async def score_task(call: ServiceCall) -> ServiceResponse: - """Score a task action.""" - entry = get_config_entry(hass, call.data[ATTR_CONFIG_ENTRY]) - coordinator = entry.runtime_data - - direction = ( - Direction.DOWN if call.data.get(ATTR_DIRECTION) == "down" else Direction.UP - ) - try: - task_id, task_value = next( - (task.id, task.value) + current_task = next( + task for task in coordinator.data.tasks if call.data[ATTR_TASK] in (str(task.id), task.alias, task.text) + and task.Type is task_type ) except StopIteration as e: raise ServiceValidationError( @@ -401,69 +587,48 @@ def async_setup_services(hass: HomeAssistant) -> None: # noqa: C901 translation_placeholders={"task": f"'{call.data[ATTR_TASK]}'"}, ) from e - if TYPE_CHECKING: - assert task_id - try: - response = await coordinator.habitica.update_score(task_id, direction) - except TooManyRequestsError as e: - raise HomeAssistantError( - translation_domain=DOMAIN, - translation_key="setup_rate_limit_exception", - translation_placeholders={"retry_after": str(e.retry_after)}, - ) from e - except NotAuthorizedError as e: - if task_value is not None: - raise ServiceValidationError( - translation_domain=DOMAIN, - translation_key="not_enough_gold", - translation_placeholders={ - "gold": f"{(coordinator.data.user.stats.gp or 0):.2f} GP", - "cost": f"{task_value:.2f} GP", - }, - ) from e - raise HomeAssistantError( - translation_domain=DOMAIN, - translation_key="service_call_exception", - translation_placeholders={"reason": e.error.message}, - ) from e - except HabiticaException as e: - raise HomeAssistantError( - translation_domain=DOMAIN, - translation_key="service_call_exception", - translation_placeholders={"reason": str(e.error.message)}, - ) from e - except ClientError as e: - raise HomeAssistantError( - translation_domain=DOMAIN, - translation_key="service_call_exception", - translation_placeholders={"reason": str(e)}, - ) from e - else: - await coordinator.async_request_refresh() - return asdict(response.data) + data = Task() - async def transformation(call: ServiceCall) -> ServiceResponse: - """User a transformation item on a player character.""" + if not is_update: + data["type"] = task_type - entry = get_config_entry(hass, call.data[ATTR_CONFIG_ENTRY]) - coordinator = entry.runtime_data + if (text := call.data.get(ATTR_RENAME)) or (text := call.data.get(ATTR_NAME)): + data["text"] = text + + if (notes := call.data.get(ATTR_NOTES)) is not None: + data["notes"] = notes + + tags = cast(list[str], call.data.get(ATTR_TAG)) + remove_tags = cast(list[str], call.data.get(ATTR_REMOVE_TAG)) + + if tags or remove_tags: + update_tags = set(current_task.tags) if current_task else set() + user_tags = { + tag.name.lower(): tag.id + for tag in coordinator.data.user.tags + if tag.id and tag.name + } + + if tags: + # Creates new tag if it doesn't exist + async def create_tag(tag_name: str) -> UUID: + tag_id = (await coordinator.habitica.create_tag(tag_name)).data.id + if TYPE_CHECKING: + assert tag_id + return tag_id - item = ITEMID_MAP[call.data[ATTR_ITEM]] - # check if target is self - if call.data[ATTR_TARGET] in ( - str(coordinator.data.user.id), - coordinator.data.user.profile.name, - coordinator.data.user.auth.local.username, - ): - target_id = coordinator.data.user.id - else: - # check if target is a party member try: - party = await coordinator.habitica.get_group_members(public_fields=True) - except NotFoundError as e: - raise ServiceValidationError( + update_tags.update( + { + user_tags.get(tag_name.lower()) or (await create_tag(tag_name)) + for tag_name in tags + } + ) + except TooManyRequestsError as e: + raise HomeAssistantError( translation_domain=DOMAIN, - translation_key="party_not_found", + translation_key="setup_rate_limit_exception", + translation_placeholders={"retry_after": str(e.retry_after)}, ) from e except HabiticaException as e: raise HomeAssistantError( @@ -477,378 +642,218 @@ def async_setup_services(hass: HomeAssistant) -> None: # noqa: C901 translation_key="service_call_exception", translation_placeholders={"reason": str(e)}, ) from e - try: - target_id = next( - member.id - for member in party.data - if member.id - and call.data[ATTR_TARGET].lower() - in ( - str(member.id), - str(member.auth.local.username).lower(), - str(member.profile.name).lower(), - ) + + if remove_tags: + update_tags.difference_update( + { + user_tags[tag_name.lower()] + for tag_name in remove_tags + if tag_name.lower() in user_tags + } + ) + + data["tags"] = list(update_tags) + + if (alias := call.data.get(ATTR_ALIAS)) is not None: + data["alias"] = alias + + if (cost := call.data.get(ATTR_COST)) is not None: + data["value"] = cost + + if priority := call.data.get(ATTR_PRIORITY): + data["priority"] = TaskPriority[priority] + + if frequency := call.data.get(ATTR_FREQUENCY): + data["frequency"] = frequency + else: + frequency = current_task.frequency if current_task else Frequency.WEEKLY + + if up_down := call.data.get(ATTR_UP_DOWN): + data["up"] = "up" in up_down + data["down"] = "down" in up_down + + if counter_up := call.data.get(ATTR_COUNTER_UP): + data["counterUp"] = counter_up + + if counter_down := call.data.get(ATTR_COUNTER_DOWN): + data["counterDown"] = counter_down + + if due_date := call.data.get(ATTR_DATE): + data["date"] = datetime.combine(due_date, time()) + + if call.data.get(ATTR_CLEAR_DATE): + data["date"] = None + + checklist = current_task.checklist if current_task else [] + + if add_checklist_item := call.data.get(ATTR_ADD_CHECKLIST_ITEM): + checklist.extend( + Checklist(completed=False, id=uuid4(), text=item) + for item in add_checklist_item + if not any(i.text == item for i in checklist) + ) + if remove_checklist_item := call.data.get(ATTR_REMOVE_CHECKLIST_ITEM): + checklist = [ + item for item in checklist if item.text not in remove_checklist_item + ] + + if score_checklist_item := call.data.get(ATTR_SCORE_CHECKLIST_ITEM): + for item in checklist: + if item.text in score_checklist_item: + item.completed = True + + if unscore_checklist_item := call.data.get(ATTR_UNSCORE_CHECKLIST_ITEM): + for item in checklist: + if item.text in unscore_checklist_item: + item.completed = False + if ( + add_checklist_item + or remove_checklist_item + or score_checklist_item + or unscore_checklist_item + ): + data["checklist"] = checklist + + reminders = current_task.reminders if current_task else [] + + if add_reminders := call.data.get(ATTR_REMINDER): + if task_type is TaskType.TODO: + existing_reminder_datetimes = { + r.time.replace(tzinfo=None) for r in reminders + } + + reminders.extend( + Reminders(id=uuid4(), time=r) + for r in add_reminders + if r not in existing_reminder_datetimes + ) + if task_type is TaskType.DAILY: + existing_reminder_times = { + r.time.time().replace(microsecond=0, second=0) for r in reminders + } + + reminders.extend( + Reminders( + id=uuid4(), + time=datetime.combine(date.today(), r, tzinfo=UTC), ) - except StopIteration as e: - raise ServiceValidationError( - translation_domain=DOMAIN, - translation_key="target_not_found", - translation_placeholders={"target": f"'{call.data[ATTR_TARGET]}'"}, - ) from e - try: - response = await coordinator.habitica.cast_skill(item, target_id) - except TooManyRequestsError as e: - raise HomeAssistantError( - translation_domain=DOMAIN, - translation_key="setup_rate_limit_exception", - translation_placeholders={"retry_after": str(e.retry_after)}, - ) from e - except NotAuthorizedError as e: + for r in add_reminders + if r not in existing_reminder_times + ) + + if remove_reminder := call.data.get(ATTR_REMOVE_REMINDER): + if task_type is TaskType.TODO: + reminders = list( + filter( + lambda r: r.time.replace(tzinfo=None) not in remove_reminder, + reminders, + ) + ) + if task_type is TaskType.DAILY: + reminders = list( + filter( + lambda r: r.time.time().replace(second=0, microsecond=0) + not in remove_reminder, + reminders, + ) + ) + + if clear_reminders := call.data.get(ATTR_CLEAR_REMINDER): + reminders = [] + + if add_reminders or remove_reminder or clear_reminders: + data["reminders"] = reminders + + if start_date := call.data.get(ATTR_START_DATE): + data["startDate"] = datetime.combine(start_date, time()) + else: + start_date = ( + current_task.startDate + if current_task and current_task.startDate + else dt_util.start_of_local_day() + ) + if repeat := call.data.get(ATTR_REPEAT): + if frequency is Frequency.WEEKLY: + data["repeat"] = Repeat(**{d: d in repeat for d in WEEK_DAYS}) + else: raise ServiceValidationError( translation_domain=DOMAIN, - translation_key="item_not_found", - translation_placeholders={"item": call.data[ATTR_ITEM]}, - ) from e - except HabiticaException as e: - raise HomeAssistantError( + translation_key="frequency_not_weekly", + ) + if repeat_monthly := call.data.get(ATTR_REPEAT_MONTHLY): + if frequency is not Frequency.MONTHLY: + raise ServiceValidationError( translation_domain=DOMAIN, - translation_key="service_call_exception", - translation_placeholders={"reason": str(e.error.message)}, - ) from e - except ClientError as e: - raise HomeAssistantError( - translation_domain=DOMAIN, - translation_key="service_call_exception", - translation_placeholders={"reason": str(e)}, - ) from e + translation_key="frequency_not_monthly", + ) + + if repeat_monthly == "day_of_week": + weekday = start_date.weekday() + data["weeksOfMonth"] = [(start_date.day - 1) // 7] + data["repeat"] = Repeat( + **{day: i == weekday for i, day in enumerate(WEEK_DAYS)} + ) + data["daysOfMonth"] = [] + else: - return asdict(response.data) + data["daysOfMonth"] = [start_date.day] + data["weeksOfMonth"] = [] - async def get_tasks(call: ServiceCall) -> ServiceResponse: - """Get tasks action.""" + if interval := call.data.get(ATTR_INTERVAL): + data["everyX"] = interval - entry = get_config_entry(hass, call.data[ATTR_CONFIG_ENTRY]) - coordinator = entry.runtime_data - response: list[TaskData] = coordinator.data.tasks - - if types := {TaskType[x] for x in call.data.get(ATTR_TYPE, [])}: - response = [task for task in response if task.Type in types] - - if priority := {TaskPriority[x] for x in call.data.get(ATTR_PRIORITY, [])}: - response = [task for task in response if task.priority in priority] - - if tasks := call.data.get(ATTR_TASK): - response = [ - task - for task in response - if str(task.id) in tasks or task.alias in tasks or task.text in tasks - ] - - if tags := call.data.get(ATTR_TAG): - tag_ids = { - tag.id - for tag in coordinator.data.user.tags - if (tag.name and tag.name.lower()) - in (tag.lower() for tag in tags) # Case-insensitive matching - and tag.id - } - - response = [ - task - for task in response - if any(tag_id in task.tags for tag_id in tag_ids if task.tags) - ] - if keyword := call.data.get(ATTR_KEYWORD): - keyword = keyword.lower() - response = [ - task - for task in response - if (task.text and keyword in task.text.lower()) - or (task.notes and keyword in task.notes.lower()) - or any(keyword in item.text.lower() for item in task.checklist) - ] - result: dict[str, Any] = { - "tasks": [task.to_dict(omit_none=False) for task in response] - } - - return result - - async def create_or_update_task(call: ServiceCall) -> ServiceResponse: # noqa: C901 - """Create or update task action.""" - entry = get_config_entry(hass, call.data[ATTR_CONFIG_ENTRY]) - coordinator = entry.runtime_data - await coordinator.async_refresh() - is_update = call.service in ( - SERVICE_UPDATE_HABIT, - SERVICE_UPDATE_REWARD, - SERVICE_UPDATE_TODO, - SERVICE_UPDATE_DAILY, - ) - task_type = SERVICE_TASK_TYPE_MAP[call.service] - current_task = None + if streak := call.data.get(ATTR_STREAK): + data["streak"] = streak + try: if is_update: - try: - current_task = next( - task - for task in coordinator.data.tasks - if call.data[ATTR_TASK] in (str(task.id), task.alias, task.text) - and task.Type is task_type - ) - except StopIteration as e: - raise ServiceValidationError( - translation_domain=DOMAIN, - translation_key="task_not_found", - translation_placeholders={"task": f"'{call.data[ATTR_TASK]}'"}, - ) from e - - data = Task() - - if not is_update: - data["type"] = task_type - - if (text := call.data.get(ATTR_RENAME)) or (text := call.data.get(ATTR_NAME)): - data["text"] = text - - if (notes := call.data.get(ATTR_NOTES)) is not None: - data["notes"] = notes - - tags = cast(list[str], call.data.get(ATTR_TAG)) - remove_tags = cast(list[str], call.data.get(ATTR_REMOVE_TAG)) - - if tags or remove_tags: - update_tags = set(current_task.tags) if current_task else set() - user_tags = { - tag.name.lower(): tag.id - for tag in coordinator.data.user.tags - if tag.id and tag.name - } - - if tags: - # Creates new tag if it doesn't exist - async def create_tag(tag_name: str) -> UUID: - tag_id = (await coordinator.habitica.create_tag(tag_name)).data.id - if TYPE_CHECKING: - assert tag_id - return tag_id - - try: - update_tags.update( - { - user_tags.get(tag_name.lower()) - or (await create_tag(tag_name)) - for tag_name in tags - } - ) - except TooManyRequestsError as e: - raise HomeAssistantError( - translation_domain=DOMAIN, - translation_key="setup_rate_limit_exception", - translation_placeholders={"retry_after": str(e.retry_after)}, - ) from e - except HabiticaException as e: - raise HomeAssistantError( - translation_domain=DOMAIN, - translation_key="service_call_exception", - translation_placeholders={"reason": str(e.error.message)}, - ) from e - except ClientError as e: - raise HomeAssistantError( - translation_domain=DOMAIN, - translation_key="service_call_exception", - translation_placeholders={"reason": str(e)}, - ) from e - - if remove_tags: - update_tags.difference_update( - { - user_tags[tag_name.lower()] - for tag_name in remove_tags - if tag_name.lower() in user_tags - } - ) - - data["tags"] = list(update_tags) - - if (alias := call.data.get(ATTR_ALIAS)) is not None: - data["alias"] = alias - - if (cost := call.data.get(ATTR_COST)) is not None: - data["value"] = cost - - if priority := call.data.get(ATTR_PRIORITY): - data["priority"] = TaskPriority[priority] - - if frequency := call.data.get(ATTR_FREQUENCY): - data["frequency"] = frequency + if TYPE_CHECKING: + assert current_task + assert current_task.id + response = await coordinator.habitica.update_task(current_task.id, data) else: - frequency = current_task.frequency if current_task else Frequency.WEEKLY + response = await coordinator.habitica.create_task(data) + except TooManyRequestsError as e: + raise HomeAssistantError( + translation_domain=DOMAIN, + translation_key="setup_rate_limit_exception", + translation_placeholders={"retry_after": str(e.retry_after)}, + ) from e + except HabiticaException as e: + raise HomeAssistantError( + translation_domain=DOMAIN, + translation_key="service_call_exception", + translation_placeholders={"reason": str(e.error.message)}, + ) from e + except ClientError as e: + raise HomeAssistantError( + translation_domain=DOMAIN, + translation_key="service_call_exception", + translation_placeholders={"reason": str(e)}, + ) from e + else: + return response.data.to_dict(omit_none=True) - if up_down := call.data.get(ATTR_UP_DOWN): - data["up"] = "up" in up_down - data["down"] = "down" in up_down - if counter_up := call.data.get(ATTR_COUNTER_UP): - data["counterUp"] = counter_up +@callback +def async_setup_services(hass: HomeAssistant) -> None: + """Set up services for Habitica integration.""" - if counter_down := call.data.get(ATTR_COUNTER_DOWN): - data["counterDown"] = counter_down - - if due_date := call.data.get(ATTR_DATE): - data["date"] = datetime.combine(due_date, time()) - - if call.data.get(ATTR_CLEAR_DATE): - data["date"] = None - - checklist = current_task.checklist if current_task else [] - - if add_checklist_item := call.data.get(ATTR_ADD_CHECKLIST_ITEM): - checklist.extend( - Checklist(completed=False, id=uuid4(), text=item) - for item in add_checklist_item - if not any(i.text == item for i in checklist) - ) - if remove_checklist_item := call.data.get(ATTR_REMOVE_CHECKLIST_ITEM): - checklist = [ - item for item in checklist if item.text not in remove_checklist_item - ] - - if score_checklist_item := call.data.get(ATTR_SCORE_CHECKLIST_ITEM): - for item in checklist: - if item.text in score_checklist_item: - item.completed = True - - if unscore_checklist_item := call.data.get(ATTR_UNSCORE_CHECKLIST_ITEM): - for item in checklist: - if item.text in unscore_checklist_item: - item.completed = False - if ( - add_checklist_item - or remove_checklist_item - or score_checklist_item - or unscore_checklist_item - ): - data["checklist"] = checklist - - reminders = current_task.reminders if current_task else [] - - if add_reminders := call.data.get(ATTR_REMINDER): - if task_type is TaskType.TODO: - existing_reminder_datetimes = { - r.time.replace(tzinfo=None) for r in reminders - } - - reminders.extend( - Reminders(id=uuid4(), time=r) - for r in add_reminders - if r not in existing_reminder_datetimes - ) - if task_type is TaskType.DAILY: - existing_reminder_times = { - r.time.time().replace(microsecond=0, second=0) for r in reminders - } - - reminders.extend( - Reminders( - id=uuid4(), - time=datetime.combine(date.today(), r, tzinfo=UTC), - ) - for r in add_reminders - if r not in existing_reminder_times - ) - - if remove_reminder := call.data.get(ATTR_REMOVE_REMINDER): - if task_type is TaskType.TODO: - reminders = list( - filter( - lambda r: r.time.replace(tzinfo=None) not in remove_reminder, - reminders, - ) - ) - if task_type is TaskType.DAILY: - reminders = list( - filter( - lambda r: r.time.time().replace(second=0, microsecond=0) - not in remove_reminder, - reminders, - ) - ) - - if clear_reminders := call.data.get(ATTR_CLEAR_REMINDER): - reminders = [] - - if add_reminders or remove_reminder or clear_reminders: - data["reminders"] = reminders - - if start_date := call.data.get(ATTR_START_DATE): - data["startDate"] = datetime.combine(start_date, time()) - else: - start_date = ( - current_task.startDate - if current_task and current_task.startDate - else dt_util.start_of_local_day() - ) - if repeat := call.data.get(ATTR_REPEAT): - if frequency is Frequency.WEEKLY: - data["repeat"] = Repeat(**{d: d in repeat for d in WEEK_DAYS}) - else: - raise ServiceValidationError( - translation_domain=DOMAIN, - translation_key="frequency_not_weekly", - ) - if repeat_monthly := call.data.get(ATTR_REPEAT_MONTHLY): - if frequency is not Frequency.MONTHLY: - raise ServiceValidationError( - translation_domain=DOMAIN, - translation_key="frequency_not_monthly", - ) - - if repeat_monthly == "day_of_week": - weekday = start_date.weekday() - data["weeksOfMonth"] = [(start_date.day - 1) // 7] - data["repeat"] = Repeat( - **{day: i == weekday for i, day in enumerate(WEEK_DAYS)} - ) - data["daysOfMonth"] = [] - - else: - data["daysOfMonth"] = [start_date.day] - data["weeksOfMonth"] = [] - - if interval := call.data.get(ATTR_INTERVAL): - data["everyX"] = interval - - if streak := call.data.get(ATTR_STREAK): - data["streak"] = streak - - try: - if is_update: - if TYPE_CHECKING: - assert current_task - assert current_task.id - response = await coordinator.habitica.update_task(current_task.id, data) - else: - response = await coordinator.habitica.create_task(data) - except TooManyRequestsError as e: - raise HomeAssistantError( - translation_domain=DOMAIN, - translation_key="setup_rate_limit_exception", - translation_placeholders={"retry_after": str(e.retry_after)}, - ) from e - except HabiticaException as e: - raise HomeAssistantError( - translation_domain=DOMAIN, - translation_key="service_call_exception", - translation_placeholders={"reason": str(e.error.message)}, - ) from e - except ClientError as e: - raise HomeAssistantError( - translation_domain=DOMAIN, - translation_key="service_call_exception", - translation_placeholders={"reason": str(e)}, - ) from e - else: - return response.data.to_dict(omit_none=True) + for service in ( + SERVICE_ABORT_QUEST, + SERVICE_ACCEPT_QUEST, + SERVICE_CANCEL_QUEST, + SERVICE_LEAVE_QUEST, + SERVICE_REJECT_QUEST, + SERVICE_START_QUEST, + ): + hass.services.async_register( + DOMAIN, + service, + _manage_quests, + schema=SERVICE_MANAGE_QUEST_SCHEMA, + supports_response=SupportsResponse.ONLY, + ) for service in ( SERVICE_UPDATE_DAILY, @@ -859,7 +864,7 @@ def async_setup_services(hass: HomeAssistant) -> None: # noqa: C901 hass.services.async_register( DOMAIN, service, - create_or_update_task, + _create_or_update_task, schema=SERVICE_UPDATE_TASK_SCHEMA, supports_response=SupportsResponse.ONLY, ) @@ -872,7 +877,7 @@ def async_setup_services(hass: HomeAssistant) -> None: # noqa: C901 hass.services.async_register( DOMAIN, service, - create_or_update_task, + _create_or_update_task, schema=SERVICE_CREATE_TASK_SCHEMA, supports_response=SupportsResponse.ONLY, ) @@ -880,7 +885,7 @@ def async_setup_services(hass: HomeAssistant) -> None: # noqa: C901 hass.services.async_register( DOMAIN, SERVICE_CAST_SKILL, - cast_skill, + _cast_skill, schema=SERVICE_CAST_SKILL_SCHEMA, supports_response=SupportsResponse.ONLY, ) @@ -888,14 +893,14 @@ def async_setup_services(hass: HomeAssistant) -> None: # noqa: C901 hass.services.async_register( DOMAIN, SERVICE_SCORE_HABIT, - score_task, + _score_task, schema=SERVICE_SCORE_TASK_SCHEMA, supports_response=SupportsResponse.ONLY, ) hass.services.async_register( DOMAIN, SERVICE_SCORE_REWARD, - score_task, + _score_task, schema=SERVICE_SCORE_TASK_SCHEMA, supports_response=SupportsResponse.ONLY, ) @@ -903,14 +908,14 @@ def async_setup_services(hass: HomeAssistant) -> None: # noqa: C901 hass.services.async_register( DOMAIN, SERVICE_TRANSFORMATION, - transformation, + _transformation, schema=SERVICE_TRANSFORMATION_SCHEMA, supports_response=SupportsResponse.ONLY, ) hass.services.async_register( DOMAIN, SERVICE_GET_TASKS, - get_tasks, + _get_tasks, schema=SERVICE_GET_TASKS_SCHEMA, supports_response=SupportsResponse.ONLY, )