Files
core/homeassistant/components/xbox/media_source.py

661 lines
24 KiB
Python

"""Xbox Media Source Implementation."""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING
from httpx import HTTPStatusError, RequestError, TimeoutException
from pythonxbox.api.provider.titlehub.models import Image, Title, TitleFields
from homeassistant.components.media_player import BrowseError, MediaClass
from homeassistant.components.media_source import (
BrowseMediaSource,
MediaSource,
MediaSourceItem,
PlayMedia,
Unresolvable,
)
from homeassistant.core import HomeAssistant
from homeassistant.util import dt as dt_util
from .binary_sensor import profile_pic
from .const import DOMAIN
from .coordinator import XboxConfigEntry
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)

# Media category keys. They are used as the ``media_type`` component of a
# media source identifier and as keys of the lookup tables below.
ATTR_GAMECLIPS = "gameclips"
ATTR_SCREENSHOTS = "screenshots"
ATTR_GAME_MEDIA = "game_media"
ATTR_COMMUNITY_GAMECLIPS = "community_gameclips"
ATTR_COMMUNITY_SCREENSHOTS = "community_screenshots"

# Display title of each media category in the browse tree.
MAP_TITLE = {
ATTR_GAMECLIPS: "Gameclips",
ATTR_SCREENSHOTS: "Screenshots",
ATTR_GAME_MEDIA: "Game media",
ATTR_COMMUNITY_GAMECLIPS: "Community gameclips",
ATTR_COMMUNITY_SCREENSHOTS: "Community screenshots",
}

# MIME type reported when a media item of the given category is resolved.
# There is no entry for ATTR_GAME_MEDIA; the resolver uses the screenshot
# MIME type for that category.
MIME_TYPE_MAP = {
ATTR_GAMECLIPS: "video/mp4",
ATTR_COMMUNITY_GAMECLIPS: "video/mp4",
ATTR_SCREENSHOTS: "image/png",
ATTR_COMMUNITY_SCREENSHOTS: "image/png",
}

# Media class of the items contained in each media category.
MEDIA_CLASS_MAP = {
ATTR_GAMECLIPS: MediaClass.VIDEO,
ATTR_COMMUNITY_GAMECLIPS: MediaClass.VIDEO,
ATTR_SCREENSHOTS: MediaClass.IMAGE,
ATTR_COMMUNITY_SCREENSHOTS: MediaClass.IMAGE,
ATTR_GAME_MEDIA: MediaClass.IMAGE,
}

# Delimiter between the components of a media source identifier.
SEPARATOR = "/"
async def async_get_media_source(hass: HomeAssistant) -> XboxSource:
    """Create the Xbox media source for the given Home Assistant instance."""
    source = XboxSource(hass)
    return source
class XboxMediaSourceIdentifier:
    """Parsed form of an Xbox media source identifier.

    An identifier consists of up to four ``SEPARATOR``-delimited components,
    in this order: ``xuid``, ``title_id``, ``media_type`` and ``media_id``.
    Components that are absent from the identifier are empty strings.
    """

    # Class-level defaults, used when the item carries no identifier at all.
    xuid = ""
    title_id = ""
    media_type = ""
    media_id = ""

    def __init__(self, item: MediaSourceItem) -> None:
        """Split the identifier of ``item`` into its components."""
        raw = item.identifier
        if raw is None:
            return
        # At most three splits: anything after the third separator, including
        # further separators, stays part of ``media_id``.
        parts = raw.split(SEPARATOR, 3)
        parts += [""] * (4 - len(parts))
        self.xuid, self.title_id, self.media_type, self.media_id = parts

    def __str__(self) -> str:
        """Join the non-empty components back into an identifier string."""
        components = (self.xuid, self.title_id, self.media_type, self.media_id)
        return SEPARATOR.join(filter(None, components))
class XboxSource(MediaSource):
    """Provide Xbox screenshots and gameclips as media sources.

    Media is addressed with identifiers of the form
    ``xuid/title_id/media_type/media_id`` (parsed by
    ``XboxMediaSourceIdentifier``). Shorter identifiers address the account,
    game and media category levels of the browse tree.
    """

    name: str = "Xbox Game Media"

    def __init__(self, hass: HomeAssistant) -> None:
        """Initialize Xbox source."""
        super().__init__(DOMAIN)
        self.hass = hass

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _api_error(
        err: Exception, error_cls: type[BrowseError] | type[Unresolvable]
    ) -> BrowseError | Unresolvable:
        """Build the translated error for a failed Xbox API request.

        Timeouts map to the ``timeout_exception`` translation key. Every other
        failure is logged at debug level and maps to ``request_exception``.
        The caller is expected to ``raise`` the returned error ``from err``.
        """
        # TimeoutException must be tested first: it is itself a RequestError.
        if isinstance(err, TimeoutException):
            return error_cls(
                translation_domain=DOMAIN,
                translation_key="timeout_exception",
            )
        _LOGGER.debug("Xbox exception:", exc_info=err)
        return error_cls(
            translation_domain=DOMAIN,
            translation_key="request_exception",
        )

    @staticmethod
    def _find_entry(
        entries: list[XboxConfigEntry],
        xuid: str,
        error_cls: type[BrowseError] | type[Unresolvable],
    ) -> XboxConfigEntry:
        """Return the loaded config entry whose unique ID equals ``xuid``.

        Raises ``error_cls`` (``account_not_configured``) when no entry matches.
        """
        entry = next((e for e in entries if e.unique_id == xuid), None)
        if entry is None:
            raise error_cls(
                translation_domain=DOMAIN,
                translation_key="account_not_configured",
            )
        return entry

    async def _fetch_title(
        self,
        entry: XboxConfigEntry,
        title_id: str,
        error_cls: type[BrowseError] | type[Unresolvable],
    ) -> Title:
        """Fetch the title information for ``title_id``.

        Raises ``error_cls`` when the request fails or when the response does
        not contain any title (``media_not_found``).
        """
        client = entry.runtime_data.status.client
        try:
            response = await client.titlehub.get_title_info(title_id)
        except (TimeoutException, RequestError, HTTPStatusError) as err:
            raise self._api_error(err, error_cls) from err
        if not response.titles:
            raise error_cls(
                translation_domain=DOMAIN,
                translation_key="media_not_found",
            )
        return response.titles[0]

    @staticmethod
    def _captioned(caption: str | None, details: str) -> str:
        """Prefix ``details`` with ``caption`` and a separator, if there is one."""
        return f"{caption} | {details}" if caption else details

    @staticmethod
    def _media_item(
        identifier: XboxMediaSourceIdentifier,
        media_id: str | int,
        media_class: MediaClass,
        title: str,
        thumbnail: str | None,
    ) -> BrowseMediaSource:
        """Build a playable leaf node below the category ``identifier``."""
        return BrowseMediaSource(
            domain=DOMAIN,
            identifier=f"{identifier}/{media_id}",
            media_class=media_class,
            media_content_type=media_class,
            title=title,
            can_play=True,
            can_expand=False,
            thumbnail=thumbnail,
        )

    # ------------------------------------------------------------------
    # Media source API
    # ------------------------------------------------------------------

    async def async_resolve_media(self, item: MediaSourceItem) -> PlayMedia:
        """Resolve a media item identifier to a playable URL.

        Raises:
            Unresolvable: If no Xbox config entry is loaded, the account named
                in the identifier is not configured, the Xbox API request
                fails, or the referenced media cannot be found.
        """
        identifier = XboxMediaSourceIdentifier(item)

        if not (entries := self.hass.config_entries.async_loaded_entries(DOMAIN)):
            raise Unresolvable(
                translation_domain=DOMAIN,
                translation_key="xbox_not_configured",
            )

        entry = self._find_entry(entries, identifier.xuid, Unresolvable)
        client = entry.runtime_data.status.client
        media_type = identifier.media_type

        if media_type in (ATTR_GAMECLIPS, ATTR_COMMUNITY_GAMECLIPS):
            try:
                if media_type == ATTR_GAMECLIPS:
                    gameclips_response = (
                        await client.gameclips.get_recent_clips_by_xuid(
                            identifier.xuid, identifier.title_id, max_items=999
                        )
                    )
                else:
                    gameclips_response = (
                        await client.gameclips.get_recent_community_clips_by_title_id(
                            identifier.title_id
                        )
                    )
            except (TimeoutException, RequestError, HTTPStatusError) as err:
                raise self._api_error(err, Unresolvable) from err

            clip = next(
                (
                    g
                    for g in gameclips_response.game_clips
                    if g.game_clip_id == identifier.media_id
                ),
                None,
            )
            # A clip without any URI cannot be played: report it as not found.
            if clip is not None and clip.game_clip_uris:
                return PlayMedia(clip.game_clip_uris[0].uri, MIME_TYPE_MAP[media_type])

        elif media_type in (ATTR_SCREENSHOTS, ATTR_COMMUNITY_SCREENSHOTS):
            try:
                if media_type == ATTR_SCREENSHOTS:
                    screenshot_response = (
                        await client.screenshots.get_recent_screenshots_by_xuid(
                            identifier.xuid, identifier.title_id, max_items=999
                        )
                    )
                else:
                    screenshot_response = await client.screenshots.get_recent_community_screenshots_by_title_id(
                        identifier.title_id
                    )
            except (TimeoutException, RequestError, HTTPStatusError) as err:
                raise self._api_error(err, Unresolvable) from err

            img = next(
                (
                    s
                    for s in screenshot_response.screenshots
                    if s.screenshot_id == identifier.media_id
                ),
                None,
            )
            if img is not None and img.screenshot_uris:
                return PlayMedia(img.screenshot_uris[0].uri, MIME_TYPE_MAP[media_type])

        elif media_type == ATTR_GAME_MEDIA:
            game = await self._fetch_title(entry, identifier.title_id, Unresolvable)
            images = game.images or []
            # For this category the media ID is the position of the image in
            # the title's image list.
            try:
                index = int(identifier.media_id)
            except ValueError:
                index = -1
            # Negative values are rejected explicitly; they would otherwise
            # index from the end of the list.
            if 0 <= index < len(images):
                return PlayMedia(images[index].url, MIME_TYPE_MAP[ATTR_SCREENSHOTS])

        raise Unresolvable(
            translation_domain=DOMAIN,
            translation_key="media_not_found",
        )

    async def async_browse_media(self, item: MediaSourceItem) -> BrowseMediaSource:
        """Return the browse tree node addressed by ``item``.

        Depending on how many identifier components are present this is the
        account list, the game library of one account, the media categories of
        one game, or the media items of one category.

        Raises:
            BrowseError: If no Xbox config entry is loaded, the account is not
                configured, the media category is unknown, or the Xbox API
                request fails.
        """
        if not (entries := self.hass.config_entries.async_loaded_entries(DOMAIN)):
            raise BrowseError(
                translation_domain=DOMAIN,
                translation_key="xbox_not_configured",
            )

        # Only show the account list when there is more than one account; with
        # a single entry we jump directly into it below.
        if not item.identifier and len(entries) > 1:
            return BrowseMediaSource(
                domain=DOMAIN,
                identifier=None,
                media_class=MediaClass.DIRECTORY,
                media_content_type=MediaClass.IMAGE,
                title="Xbox Game Media",
                can_play=False,
                can_expand=True,
                children=await self._build_accounts(entries),
                children_media_class=MediaClass.DIRECTORY,
            )

        identifier = XboxMediaSourceIdentifier(item)
        if not identifier.xuid and len(entries) == 1:
            if TYPE_CHECKING:
                assert entries[0].unique_id
            identifier.xuid = entries[0].unique_id

        entry = self._find_entry(entries, identifier.xuid, BrowseError)

        if not identifier.title_id:
            return await self._build_game_library(entry)
        if not identifier.media_type:
            return await self._build_game_title(entry, identifier)
        return await self._build_game_media(entry, identifier)

    # ------------------------------------------------------------------
    # Browse tree builders
    # ------------------------------------------------------------------

    async def _build_accounts(
        self, entries: list[XboxConfigEntry]
    ) -> list[BrowseMediaSource]:
        """List Xbox accounts."""
        return [
            BrowseMediaSource(
                domain=DOMAIN,
                identifier=entry.unique_id,
                media_class=MediaClass.DIRECTORY,
                media_content_type=MediaClass.DIRECTORY,
                title=entry.title,
                can_play=False,
                can_expand=True,
                thumbnail=gamerpic(entry),
            )
            for entry in entries
        ]

    async def _build_game_library(self, entry: XboxConfigEntry) -> BrowseMediaSource:
        """Display played games."""
        return BrowseMediaSource(
            domain=DOMAIN,
            identifier=entry.unique_id,
            media_class=MediaClass.DIRECTORY,
            media_content_type=MediaClass.DIRECTORY,
            title=f"Xbox / {entry.title}",
            can_play=False,
            can_expand=True,
            children=await self._build_games(entry),
            children_media_class=MediaClass.GAME,
        )

    async def _build_games(self, entry: XboxConfigEntry) -> list[BrowseMediaSource]:
        """List Xbox games for the selected account.

        Only titles that have achievement data with a non-zero
        ``source_version`` are listed.
        """
        client = entry.runtime_data.status.client
        if TYPE_CHECKING:
            assert entry.unique_id
        fields = [
            TitleFields.ACHIEVEMENT,
            TitleFields.STATS,
            TitleFields.IMAGE,
        ]
        try:
            games = await client.titlehub.get_title_history(
                entry.unique_id, fields, max_items=999
            )
        except (TimeoutException, RequestError, HTTPStatusError) as err:
            raise self._api_error(err, BrowseError) from err

        return [
            BrowseMediaSource(
                domain=DOMAIN,
                identifier=f"{entry.unique_id}/{game.title_id}",
                media_class=MediaClass.GAME,
                media_content_type=MediaClass.GAME,
                title=game.name,
                can_play=False,
                can_expand=True,
                children_media_class=MediaClass.DIRECTORY,
                thumbnail=game_thumbnail(game.images or []),
            )
            for game in games.titles
            if game.achievement and game.achievement.source_version != 0
        ]

    async def _build_game_title(
        self, entry: XboxConfigEntry, identifier: XboxMediaSourceIdentifier
    ) -> BrowseMediaSource:
        """Display game title."""
        game = await self._fetch_title(entry, identifier.title_id, BrowseError)

        return BrowseMediaSource(
            domain=DOMAIN,
            identifier=str(identifier),
            media_class=MediaClass.GAME,
            media_content_type=MediaClass.GAME,
            title=f"Xbox / {entry.title} / {game.name}",
            can_play=False,
            can_expand=True,
            children=self._build_categories(identifier),
            children_media_class=MediaClass.DIRECTORY,
        )

    def _build_categories(
        self, identifier: XboxMediaSourceIdentifier
    ) -> list[BrowseMediaSource]:
        """List media categories."""
        return [
            BrowseMediaSource(
                domain=DOMAIN,
                identifier=f"{identifier}/{media_type}",
                media_class=MediaClass.DIRECTORY,
                media_content_type=MediaClass.DIRECTORY,
                title=MAP_TITLE[media_type],
                can_play=False,
                can_expand=True,
                children_media_class=MEDIA_CLASS_MAP[media_type],
            )
            for media_type in (
                ATTR_GAMECLIPS,
                ATTR_SCREENSHOTS,
                ATTR_COMMUNITY_GAMECLIPS,
                ATTR_COMMUNITY_SCREENSHOTS,
                ATTR_GAME_MEDIA,
            )
        ]

    async def _build_game_media(
        self, entry: XboxConfigEntry, identifier: XboxMediaSourceIdentifier
    ) -> BrowseMediaSource:
        """List game media of the category named in ``identifier``."""
        media_type = identifier.media_type
        # The identifier is supplied by the caller, so the category may be one
        # that the lookup tables do not know.
        if media_type not in MAP_TITLE:
            raise BrowseError(
                translation_domain=DOMAIN,
                translation_key="media_not_found",
            )

        game = await self._fetch_title(entry, identifier.title_id, BrowseError)

        # Query only the source that belongs to the requested category.
        if media_type == ATTR_GAMECLIPS:
            children = await self._build_media_items_gameclips(entry, identifier)
        elif media_type == ATTR_COMMUNITY_GAMECLIPS:
            children = await self._build_media_items_community_gameclips(
                entry, identifier
            )
        elif media_type == ATTR_SCREENSHOTS:
            children = await self._build_media_items_screenshots(entry, identifier)
        elif media_type == ATTR_COMMUNITY_SCREENSHOTS:
            children = await self._build_media_items_community_screenshots(
                entry, identifier
            )
        else:
            children = self._build_media_items_promotional(identifier, game)

        return BrowseMediaSource(
            domain=DOMAIN,
            identifier=str(identifier),
            media_class=MEDIA_CLASS_MAP[media_type],
            media_content_type=MediaClass.DIRECTORY,
            title=f"Xbox / {entry.title} / {game.name} / {MAP_TITLE[media_type]}",
            can_play=False,
            can_expand=True,
            children=children,
            children_media_class=MEDIA_CLASS_MAP[media_type],
        )

    async def _build_media_items_gameclips(
        self, entry: XboxConfigEntry, identifier: XboxMediaSourceIdentifier
    ) -> list[BrowseMediaSource]:
        """List the account's own gameclips (empty for other categories)."""
        if identifier.media_type != ATTR_GAMECLIPS:
            return []
        client = entry.runtime_data.status.client
        try:
            gameclips = (
                await client.gameclips.get_recent_clips_by_xuid(
                    identifier.xuid, identifier.title_id, max_items=999
                )
            ).game_clips
        except (TimeoutException, RequestError, HTTPStatusError) as err:
            raise self._api_error(err, BrowseError) from err

        return [
            self._media_item(
                identifier,
                gameclip.game_clip_id,
                MediaClass.VIDEO,
                self._captioned(
                    gameclip.user_caption, dt_util.get_age(gameclip.date_recorded)
                ),
                gameclip.thumbnails[0].uri if gameclip.thumbnails else None,
            )
            for gameclip in gameclips
        ]

    async def _build_media_items_community_gameclips(
        self, entry: XboxConfigEntry, identifier: XboxMediaSourceIdentifier
    ) -> list[BrowseMediaSource]:
        """List community gameclips of the game (empty for other categories)."""
        if identifier.media_type != ATTR_COMMUNITY_GAMECLIPS:
            return []
        client = entry.runtime_data.status.client
        try:
            gameclips = (
                await client.gameclips.get_recent_community_clips_by_title_id(
                    identifier.title_id
                )
            ).game_clips
        except (TimeoutException, RequestError, HTTPStatusError) as err:
            raise self._api_error(err, BrowseError) from err

        return [
            self._media_item(
                identifier,
                gameclip.game_clip_id,
                MediaClass.VIDEO,
                self._captioned(
                    gameclip.user_caption, dt_util.get_age(gameclip.date_recorded)
                ),
                gameclip.thumbnails[0].uri if gameclip.thumbnails else None,
            )
            for gameclip in gameclips
        ]

    async def _build_media_items_screenshots(
        self, entry: XboxConfigEntry, identifier: XboxMediaSourceIdentifier
    ) -> list[BrowseMediaSource]:
        """List the account's own screenshots (empty for other categories)."""
        if identifier.media_type != ATTR_SCREENSHOTS:
            return []
        client = entry.runtime_data.status.client
        try:
            screenshots = (
                await client.screenshots.get_recent_screenshots_by_xuid(
                    identifier.xuid, identifier.title_id, max_items=999
                )
            ).screenshots
        except (TimeoutException, RequestError, HTTPStatusError) as err:
            raise self._api_error(err, BrowseError) from err

        return [
            self._media_item(
                identifier,
                screenshot.screenshot_id,
                MediaClass.IMAGE,
                self._captioned(
                    screenshot.user_caption,
                    f"{dt_util.get_age(screenshot.date_taken)} | {screenshot.resolution_height}p",
                ),
                screenshot.thumbnails[0].uri if screenshot.thumbnails else None,
            )
            for screenshot in screenshots
        ]

    async def _build_media_items_community_screenshots(
        self, entry: XboxConfigEntry, identifier: XboxMediaSourceIdentifier
    ) -> list[BrowseMediaSource]:
        """List community screenshots of the game (empty for other categories)."""
        if identifier.media_type != ATTR_COMMUNITY_SCREENSHOTS:
            return []
        client = entry.runtime_data.status.client
        try:
            screenshots = (
                await client.screenshots.get_recent_community_screenshots_by_title_id(
                    identifier.title_id
                )
            ).screenshots
        except (TimeoutException, RequestError, HTTPStatusError) as err:
            raise self._api_error(err, BrowseError) from err

        return [
            self._media_item(
                identifier,
                screenshot.screenshot_id,
                MediaClass.IMAGE,
                self._captioned(
                    screenshot.user_caption,
                    f"{dt_util.get_age(screenshot.date_taken)} | {screenshot.resolution_height}p",
                ),
                screenshot.thumbnails[0].uri if screenshot.thumbnails else None,
            )
            for screenshot in screenshots
        ]

    def _build_media_items_promotional(
        self, identifier: XboxMediaSourceIdentifier, game: Title
    ) -> list[BrowseMediaSource]:
        """List promotional game media (empty for other categories).

        The media ID of each item is the position of the image in
        ``game.images``; ``async_resolve_media`` uses it as a list index.
        """
        if identifier.media_type != ATTR_GAME_MEDIA:
            return []
        return [
            self._media_item(
                identifier, index, MediaClass.IMAGE, image.type, image.url
            )
            for index, image in enumerate(game.images or [])
        ]
def gamerpic(config_entry: XboxConfigEntry) -> str | None:
    """Return the gamerpic of the account that owns ``config_entry``.

    The account's own presence record is looked up by the client's XUID in
    the status coordinator data and handed to ``profile_pic``.
    """
    if TYPE_CHECKING:
        assert config_entry.unique_id
    status = config_entry.runtime_data.status
    presence = status.data.presence
    own_record = presence[status.client.xuid]
    return profile_pic(own_record)
def game_thumbnail(images: list[Image]) -> str | None:
    """Return the URL of the preferred title image.

    Image types are tried in the order ``BrandedKeyArt``, ``Poster``,
    ``BoxArt``; within a type the first image in ``images`` is used. ``None``
    is returned when no image of any of these types is present.
    """
    preferred_types = ("BrandedKeyArt", "Poster", "BoxArt")
    # Lazily yields, per preferred type, the first image of that type (or None).
    first_per_type = (
        next((image for image in images if image.type == wanted), None)
        for wanted in preferred_types
    )
    return next((image.url for image in first_per_type if image), None)