mirror of
https://github.com/home-assistant/core.git
synced 2025-07-25 22:27:07 +00:00
Improve pandora media_player typing (#134447)
This commit is contained in:
parent
1d731875ae
commit
c345f2d548
@ -8,6 +8,7 @@ import os
|
|||||||
import re
|
import re
|
||||||
import shutil
|
import shutil
|
||||||
import signal
|
import signal
|
||||||
|
from typing import cast
|
||||||
|
|
||||||
import pexpect
|
import pexpect
|
||||||
|
|
||||||
@ -26,7 +27,7 @@ from homeassistant.const import (
|
|||||||
SERVICE_VOLUME_DOWN,
|
SERVICE_VOLUME_DOWN,
|
||||||
SERVICE_VOLUME_UP,
|
SERVICE_VOLUME_UP,
|
||||||
)
|
)
|
||||||
from homeassistant.core import HomeAssistant
|
from homeassistant.core import Event, HomeAssistant
|
||||||
from homeassistant.helpers.entity_platform import AddEntitiesCallback
|
from homeassistant.helpers.entity_platform import AddEntitiesCallback
|
||||||
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
|
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
|
||||||
|
|
||||||
@ -58,7 +59,7 @@ def setup_platform(
|
|||||||
|
|
||||||
# Make sure we end the pandora subprocess on exit in case user doesn't
|
# Make sure we end the pandora subprocess on exit in case user doesn't
|
||||||
# power it down.
|
# power it down.
|
||||||
def _stop_pianobar(_event):
|
def _stop_pianobar(_event: Event) -> None:
|
||||||
pandora.turn_off()
|
pandora.turn_off()
|
||||||
|
|
||||||
hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, _stop_pianobar)
|
hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, _stop_pianobar)
|
||||||
@ -80,7 +81,7 @@ class PandoraMediaPlayer(MediaPlayerEntity):
|
|||||||
| MediaPlayerEntityFeature.PLAY
|
| MediaPlayerEntityFeature.PLAY
|
||||||
)
|
)
|
||||||
|
|
||||||
def __init__(self, name):
|
def __init__(self, name: str) -> None:
|
||||||
"""Initialize the Pandora device."""
|
"""Initialize the Pandora device."""
|
||||||
self._attr_name = name
|
self._attr_name = name
|
||||||
self._attr_state = MediaPlayerState.OFF
|
self._attr_state = MediaPlayerState.OFF
|
||||||
@ -91,7 +92,7 @@ class PandoraMediaPlayer(MediaPlayerEntity):
|
|||||||
self._attr_source_list = []
|
self._attr_source_list = []
|
||||||
self._time_remaining = 0
|
self._time_remaining = 0
|
||||||
self._attr_media_duration = 0
|
self._attr_media_duration = 0
|
||||||
self._pianobar = None
|
self._pianobar: pexpect.spawn | None = None
|
||||||
|
|
||||||
def turn_on(self) -> None:
|
def turn_on(self) -> None:
|
||||||
"""Turn the media player on."""
|
"""Turn the media player on."""
|
||||||
@ -173,13 +174,15 @@ class PandoraMediaPlayer(MediaPlayerEntity):
|
|||||||
_LOGGER.warning("Station %s is not in list", source)
|
_LOGGER.warning("Station %s is not in list", source)
|
||||||
return
|
return
|
||||||
_LOGGER.debug("Setting station %s, %d", source, station_index)
|
_LOGGER.debug("Setting station %s, %d", source, station_index)
|
||||||
|
assert self._pianobar is not None
|
||||||
self._send_station_list_command()
|
self._send_station_list_command()
|
||||||
self._pianobar.sendline(f"{station_index}")
|
self._pianobar.sendline(f"{station_index}")
|
||||||
self._pianobar.expect("\r\n")
|
self._pianobar.expect("\r\n")
|
||||||
self._attr_state = MediaPlayerState.PLAYING
|
self._attr_state = MediaPlayerState.PLAYING
|
||||||
|
|
||||||
def _send_station_list_command(self):
|
def _send_station_list_command(self) -> None:
|
||||||
"""Send a station list command."""
|
"""Send a station list command."""
|
||||||
|
assert self._pianobar is not None
|
||||||
self._pianobar.send("s")
|
self._pianobar.send("s")
|
||||||
try:
|
try:
|
||||||
self._pianobar.expect("Select station:", timeout=1)
|
self._pianobar.expect("Select station:", timeout=1)
|
||||||
@ -189,7 +192,7 @@ class PandoraMediaPlayer(MediaPlayerEntity):
|
|||||||
self._pianobar.send("s")
|
self._pianobar.send("s")
|
||||||
self._pianobar.expect("Select station:")
|
self._pianobar.expect("Select station:")
|
||||||
|
|
||||||
def update_playing_status(self):
|
def update_playing_status(self) -> None:
|
||||||
"""Query pianobar for info about current media_title, station."""
|
"""Query pianobar for info about current media_title, station."""
|
||||||
response = self._query_for_playing_status()
|
response = self._query_for_playing_status()
|
||||||
if not response:
|
if not response:
|
||||||
@ -198,8 +201,9 @@ class PandoraMediaPlayer(MediaPlayerEntity):
|
|||||||
self._update_current_song(response)
|
self._update_current_song(response)
|
||||||
self._update_song_position()
|
self._update_song_position()
|
||||||
|
|
||||||
def _query_for_playing_status(self):
|
def _query_for_playing_status(self) -> str | None:
|
||||||
"""Query system for info about current track."""
|
"""Query system for info about current track."""
|
||||||
|
assert self._pianobar is not None
|
||||||
self._clear_buffer()
|
self._clear_buffer()
|
||||||
self._pianobar.send("i")
|
self._pianobar.send("i")
|
||||||
try:
|
try:
|
||||||
@ -224,15 +228,17 @@ class PandoraMediaPlayer(MediaPlayerEntity):
|
|||||||
_LOGGER.warning("On unexpected station list page")
|
_LOGGER.warning("On unexpected station list page")
|
||||||
self._pianobar.sendcontrol("m") # press enter
|
self._pianobar.sendcontrol("m") # press enter
|
||||||
self._pianobar.sendcontrol("m") # do it again b/c an 'i' got in
|
self._pianobar.sendcontrol("m") # do it again b/c an 'i' got in
|
||||||
response = self.update_playing_status()
|
self.update_playing_status()
|
||||||
|
response = None
|
||||||
elif match_idx == 3:
|
elif match_idx == 3:
|
||||||
_LOGGER.debug("Received new playlist list")
|
_LOGGER.debug("Received new playlist list")
|
||||||
response = self.update_playing_status()
|
self.update_playing_status()
|
||||||
|
response = None
|
||||||
else:
|
else:
|
||||||
response = self._pianobar.before.decode("utf-8")
|
response = self._pianobar.before.decode("utf-8")
|
||||||
return response
|
return response
|
||||||
|
|
||||||
def _update_current_station(self, response):
|
def _update_current_station(self, response: str) -> None:
|
||||||
"""Update current station."""
|
"""Update current station."""
|
||||||
if station_match := re.search(STATION_PATTERN, response):
|
if station_match := re.search(STATION_PATTERN, response):
|
||||||
self._attr_source = station_match.group(1)
|
self._attr_source = station_match.group(1)
|
||||||
@ -240,7 +246,7 @@ class PandoraMediaPlayer(MediaPlayerEntity):
|
|||||||
else:
|
else:
|
||||||
_LOGGER.warning("No station match")
|
_LOGGER.warning("No station match")
|
||||||
|
|
||||||
def _update_current_song(self, response):
|
def _update_current_song(self, response: str) -> None:
|
||||||
"""Update info about current song."""
|
"""Update info about current song."""
|
||||||
if song_match := re.search(CURRENT_SONG_PATTERN, response):
|
if song_match := re.search(CURRENT_SONG_PATTERN, response):
|
||||||
(
|
(
|
||||||
@ -253,19 +259,20 @@ class PandoraMediaPlayer(MediaPlayerEntity):
|
|||||||
_LOGGER.warning("No song match")
|
_LOGGER.warning("No song match")
|
||||||
|
|
||||||
@util.Throttle(MIN_TIME_BETWEEN_UPDATES)
|
@util.Throttle(MIN_TIME_BETWEEN_UPDATES)
|
||||||
def _update_song_position(self):
|
def _update_song_position(self) -> None:
|
||||||
"""Get the song position and duration.
|
"""Get the song position and duration.
|
||||||
|
|
||||||
It's hard to predict whether or not the music will start during init
|
It's hard to predict whether or not the music will start during init
|
||||||
so we have to detect state by checking the ticker.
|
so we have to detect state by checking the ticker.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
assert self._pianobar is not None
|
||||||
(
|
(
|
||||||
cur_minutes,
|
cur_minutes,
|
||||||
cur_seconds,
|
cur_seconds,
|
||||||
total_minutes,
|
total_minutes,
|
||||||
total_seconds,
|
total_seconds,
|
||||||
) = self._pianobar.match.groups()
|
) = cast(re.Match[str], self._pianobar.match).groups()
|
||||||
time_remaining = int(cur_minutes) * 60 + int(cur_seconds)
|
time_remaining = int(cur_minutes) * 60 + int(cur_seconds)
|
||||||
self._attr_media_duration = int(total_minutes) * 60 + int(total_seconds)
|
self._attr_media_duration = int(total_minutes) * 60 + int(total_seconds)
|
||||||
|
|
||||||
@ -275,8 +282,9 @@ class PandoraMediaPlayer(MediaPlayerEntity):
|
|||||||
self._attr_state = MediaPlayerState.PAUSED
|
self._attr_state = MediaPlayerState.PAUSED
|
||||||
self._time_remaining = time_remaining
|
self._time_remaining = time_remaining
|
||||||
|
|
||||||
def _log_match(self):
|
def _log_match(self) -> None:
|
||||||
"""Log grabbed values from console."""
|
"""Log grabbed values from console."""
|
||||||
|
assert self._pianobar is not None
|
||||||
_LOGGER.debug(
|
_LOGGER.debug(
|
||||||
"Before: %s\nMatch: %s\nAfter: %s",
|
"Before: %s\nMatch: %s\nAfter: %s",
|
||||||
repr(self._pianobar.before),
|
repr(self._pianobar.before),
|
||||||
@ -284,17 +292,20 @@ class PandoraMediaPlayer(MediaPlayerEntity):
|
|||||||
repr(self._pianobar.after),
|
repr(self._pianobar.after),
|
||||||
)
|
)
|
||||||
|
|
||||||
def _send_pianobar_command(self, service_cmd):
|
def _send_pianobar_command(self, service_cmd: str) -> None:
|
||||||
"""Send a command to Pianobar."""
|
"""Send a command to Pianobar."""
|
||||||
|
assert self._pianobar is not None
|
||||||
command = CMD_MAP.get(service_cmd)
|
command = CMD_MAP.get(service_cmd)
|
||||||
_LOGGER.debug("Sending pinaobar command %s for %s", command, service_cmd)
|
_LOGGER.debug("Sending pinaobar command %s for %s", command, service_cmd)
|
||||||
if command is None:
|
if command is None:
|
||||||
_LOGGER.warning("Command %s not supported yet", service_cmd)
|
_LOGGER.warning("Command %s not supported yet", service_cmd)
|
||||||
|
return
|
||||||
self._clear_buffer()
|
self._clear_buffer()
|
||||||
self._pianobar.sendline(command)
|
self._pianobar.sendline(command)
|
||||||
|
|
||||||
def _update_stations(self):
|
def _update_stations(self) -> None:
|
||||||
"""List defined Pandora stations."""
|
"""List defined Pandora stations."""
|
||||||
|
assert self._pianobar is not None
|
||||||
self._send_station_list_command()
|
self._send_station_list_command()
|
||||||
station_lines = self._pianobar.before.decode("utf-8")
|
station_lines = self._pianobar.before.decode("utf-8")
|
||||||
_LOGGER.debug("Getting stations: %s", station_lines)
|
_LOGGER.debug("Getting stations: %s", station_lines)
|
||||||
@ -309,12 +320,13 @@ class PandoraMediaPlayer(MediaPlayerEntity):
|
|||||||
self._pianobar.sendcontrol("m") # press enter with blank line
|
self._pianobar.sendcontrol("m") # press enter with blank line
|
||||||
self._pianobar.sendcontrol("m") # do it twice in case an 'i' got in
|
self._pianobar.sendcontrol("m") # do it twice in case an 'i' got in
|
||||||
|
|
||||||
def _clear_buffer(self):
|
def _clear_buffer(self) -> None:
|
||||||
"""Clear buffer from pexpect.
|
"""Clear buffer from pexpect.
|
||||||
|
|
||||||
This is necessary because there are a bunch of 00:00 in the buffer
|
This is necessary because there are a bunch of 00:00 in the buffer
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
assert self._pianobar is not None
|
||||||
try:
|
try:
|
||||||
while not self._pianobar.expect(".+", timeout=0.1):
|
while not self._pianobar.expect(".+", timeout=0.1):
|
||||||
pass
|
pass
|
||||||
@ -324,7 +336,7 @@ class PandoraMediaPlayer(MediaPlayerEntity):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
def _pianobar_exists():
|
def _pianobar_exists() -> bool:
|
||||||
"""Verify that Pianobar is properly installed."""
|
"""Verify that Pianobar is properly installed."""
|
||||||
pianobar_exe = shutil.which("pianobar")
|
pianobar_exe = shutil.which("pianobar")
|
||||||
if pianobar_exe:
|
if pianobar_exe:
|
||||||
|
Loading…
x
Reference in New Issue
Block a user