Add types package for pexpect (#134461)

This commit is contained in:
Marc Mueller 2025-01-03 02:53:08 +01:00 committed by GitHub
parent cb389d29ea
commit 6f3544fa47
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 24 additions and 27 deletions

View File

@ -90,7 +90,7 @@ class ArubaDeviceScanner(DeviceScanner):
"""Retrieve data from Aruba Access Point and return parsed result.""" """Retrieve data from Aruba Access Point and return parsed result."""
connect = f"ssh {self.username}@{self.host} -o HostKeyAlgorithms=ssh-rsa" connect = f"ssh {self.username}@{self.host} -o HostKeyAlgorithms=ssh-rsa"
ssh = pexpect.spawn(connect) ssh: pexpect.spawn[str] = pexpect.spawn(connect, encoding="utf-8")
query = ssh.expect( query = ssh.expect(
[ [
"password:", "password:",
@ -125,12 +125,12 @@ class ArubaDeviceScanner(DeviceScanner):
ssh.expect("#") ssh.expect("#")
ssh.sendline("show clients") ssh.sendline("show clients")
ssh.expect("#") ssh.expect("#")
devices_result = ssh.before.split(b"\r\n") devices_result = (ssh.before or "").splitlines()
ssh.sendline("exit") ssh.sendline("exit")
devices: dict[str, dict[str, str]] = {} devices: dict[str, dict[str, str]] = {}
for device in devices_result: for device in devices_result:
if match := _DEVICES_REGEX.search(device.decode("utf-8")): if match := _DEVICES_REGEX.search(device):
devices[match.group("ip")] = { devices[match.group("ip")] = {
"ip": match.group("ip"), "ip": match.group("ip"),
"mac": match.group("mac").upper(), "mac": match.group("mac").upper(),

View File

@ -3,7 +3,6 @@
from __future__ import annotations from __future__ import annotations
import logging import logging
import re
from pexpect import pxssh from pexpect import pxssh
import voluptuous as vol import voluptuous as vol
@ -101,11 +100,11 @@ class CiscoDeviceScanner(DeviceScanner):
return False return False
def _get_arp_data(self): def _get_arp_data(self) -> str | None:
"""Open connection to the router and get arp entries.""" """Open connection to the router and get arp entries."""
try: try:
cisco_ssh = pxssh.pxssh() cisco_ssh: pxssh.pxssh[str] = pxssh.pxssh(encoding="uft-8")
cisco_ssh.login( cisco_ssh.login(
self.host, self.host,
self.username, self.username,
@ -115,12 +114,11 @@ class CiscoDeviceScanner(DeviceScanner):
) )
# Find the hostname # Find the hostname
initial_line = cisco_ssh.before.decode("utf-8").splitlines() initial_line = (cisco_ssh.before or "").splitlines()
router_hostname = initial_line[len(initial_line) - 1] router_hostname = initial_line[len(initial_line) - 1]
router_hostname += "#" router_hostname += "#"
# Set the discovered hostname as prompt # Set the discovered hostname as prompt
regex_expression = f"(?i)^{router_hostname}".encode() cisco_ssh.PROMPT = f"(?i)^{router_hostname}"
cisco_ssh.PROMPT = re.compile(regex_expression, re.MULTILINE)
# Allow full arp table to print at once # Allow full arp table to print at once
cisco_ssh.sendline("terminal length 0") cisco_ssh.sendline("terminal length 0")
cisco_ssh.prompt(1) cisco_ssh.prompt(1)
@ -128,13 +126,11 @@ class CiscoDeviceScanner(DeviceScanner):
cisco_ssh.sendline("show ip arp") cisco_ssh.sendline("show ip arp")
cisco_ssh.prompt(1) cisco_ssh.prompt(1)
devices_result = cisco_ssh.before
return devices_result.decode("utf-8")
except pxssh.ExceptionPxssh as px_e: except pxssh.ExceptionPxssh as px_e:
_LOGGER.error("Failed to login via pxssh: %s", px_e) _LOGGER.error("Failed to login via pxssh: %s", px_e)
return None
return None return cisco_ssh.before
def _parse_cisco_mac_address(cisco_hardware_addr): def _parse_cisco_mac_address(cisco_hardware_addr):

View File

@ -92,13 +92,13 @@ class PandoraMediaPlayer(MediaPlayerEntity):
self._attr_source_list = [] self._attr_source_list = []
self._time_remaining = 0 self._time_remaining = 0
self._attr_media_duration = 0 self._attr_media_duration = 0
self._pianobar: pexpect.spawn | None = None self._pianobar: pexpect.spawn[str] | None = None
def turn_on(self) -> None: def turn_on(self) -> None:
"""Turn the media player on.""" """Turn the media player on."""
if self.state != MediaPlayerState.OFF: if self.state != MediaPlayerState.OFF:
return return
self._pianobar = pexpect.spawn("pianobar") self._pianobar = pexpect.spawn("pianobar", encoding="utf-8")
_LOGGER.debug("Started pianobar subprocess") _LOGGER.debug("Started pianobar subprocess")
mode = self._pianobar.expect( mode = self._pianobar.expect(
["Receiving new playlist", "Select station:", "Email:"] ["Receiving new playlist", "Select station:", "Email:"]
@ -135,8 +135,9 @@ class PandoraMediaPlayer(MediaPlayerEntity):
self._pianobar.terminate() self._pianobar.terminate()
except pexpect.exceptions.TIMEOUT: except pexpect.exceptions.TIMEOUT:
# kill the process group # kill the process group
os.killpg(os.getpgid(self._pianobar.pid), signal.SIGTERM) if (pid := self._pianobar.pid) is not None:
_LOGGER.debug("Killed Pianobar subprocess") os.killpg(os.getpgid(pid), signal.SIGTERM)
_LOGGER.debug("Killed Pianobar subprocess")
self._pianobar = None self._pianobar = None
self._attr_state = MediaPlayerState.OFF self._attr_state = MediaPlayerState.OFF
self.schedule_update_ha_state() self.schedule_update_ha_state()
@ -209,7 +210,7 @@ class PandoraMediaPlayer(MediaPlayerEntity):
try: try:
match_idx = self._pianobar.expect( match_idx = self._pianobar.expect(
[ [
rb"(\d\d):(\d\d)/(\d\d):(\d\d)", r"(\d\d):(\d\d)/(\d\d):(\d\d)",
"No song playing", "No song playing",
"Select station", "Select station",
"Receiving new playlist", "Receiving new playlist",
@ -222,21 +223,20 @@ class PandoraMediaPlayer(MediaPlayerEntity):
self._log_match() self._log_match()
if match_idx == 1: if match_idx == 1:
# idle. # idle.
response = None return None
elif match_idx == 2: if match_idx == 2:
# stuck on a station selection dialog. Clear it. # stuck on a station selection dialog. Clear it.
_LOGGER.warning("On unexpected station list page") _LOGGER.warning("On unexpected station list page")
self._pianobar.sendcontrol("m") # press enter self._pianobar.sendcontrol("m") # press enter
self._pianobar.sendcontrol("m") # do it again b/c an 'i' got in self._pianobar.sendcontrol("m") # do it again b/c an 'i' got in
self.update_playing_status() self.update_playing_status()
response = None return None
elif match_idx == 3: if match_idx == 3:
_LOGGER.debug("Received new playlist list") _LOGGER.debug("Received new playlist list")
self.update_playing_status() self.update_playing_status()
response = None return None
else:
response = self._pianobar.before.decode("utf-8") return self._pianobar.before
return response
def _update_current_station(self, response: str) -> None: def _update_current_station(self, response: str) -> None:
"""Update current station.""" """Update current station."""
@ -307,7 +307,7 @@ class PandoraMediaPlayer(MediaPlayerEntity):
"""List defined Pandora stations.""" """List defined Pandora stations."""
assert self._pianobar is not None assert self._pianobar is not None
self._send_station_list_command() self._send_station_list_command()
station_lines = self._pianobar.before.decode("utf-8") station_lines = self._pianobar.before or ""
_LOGGER.debug("Getting stations: %s", station_lines) _LOGGER.debug("Getting stations: %s", station_lines)
self._attr_source_list = [] self._attr_source_list = []
for line in station_lines.split("\r\n"): for line in station_lines.split("\r\n"):

View File

@ -42,6 +42,7 @@ types-caldav==1.3.0.20241107
types-chardet==0.1.5 types-chardet==0.1.5
types-decorator==5.1.8.20240310 types-decorator==5.1.8.20240310
types-paho-mqtt==1.6.0.20240321 types-paho-mqtt==1.6.0.20240321
types-pexpect==4.9.0.20241208
types-pillow==10.2.0.20240822 types-pillow==10.2.0.20240822
types-protobuf==5.29.1.20241207 types-protobuf==5.29.1.20241207
types-psutil==6.1.0.20241221 types-psutil==6.1.0.20241221