Remove calls to distribution and legacy zip support from package util (#107427)

This commit is contained in:
J. Nick Koston 2024-01-07 07:39:48 -10:00 committed by GitHub
parent 901b9365b4
commit 75d591593d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 22 additions and 33 deletions

View File

@ -3,13 +3,12 @@ from __future__ import annotations
import asyncio import asyncio
from functools import cache from functools import cache
from importlib.metadata import PackageNotFoundError, distribution, version from importlib.metadata import PackageNotFoundError, version
import logging import logging
import os import os
from pathlib import Path from pathlib import Path
from subprocess import PIPE, Popen from subprocess import PIPE, Popen
import sys import sys
from urllib.parse import urlparse
from packaging.requirements import InvalidRequirement, Requirement from packaging.requirements import InvalidRequirement, Requirement
@ -30,29 +29,26 @@ def is_docker_env() -> bool:
return Path("/.dockerenv").exists() return Path("/.dockerenv").exists()
def is_installed(package: str) -> bool: def is_installed(requirement_str: str) -> bool:
"""Check if a package is installed and will be loaded when we import it. """Check if a package is installed and will be loaded when we import it.
expected input is a pip compatible package specifier (requirement string)
e.g. "package==1.0.0" or "package>=1.0.0,<2.0.0"
Returns True when the requirement is met. Returns True when the requirement is met.
Returns False when the package is not installed or doesn't meet req. Returns False when the package is not installed or doesn't meet req.
""" """
try: try:
distribution(package) req = Requirement(requirement_str)
return True except InvalidRequirement:
except (IndexError, PackageNotFoundError): _LOGGER.error("Invalid requirement '%s'", requirement_str)
try: return False
req = Requirement(package)
except InvalidRequirement:
# This is a zip file. We no longer use this in Home Assistant,
# leaving it in for custom components.
req = Requirement(urlparse(package).fragment)
try: try:
installed_version = version(req.name) if (installed_version := version(req.name)) is None:
# This will happen when an install failed or # This can happen when an install failed or
# was aborted while in progress see # was aborted while in progress see
# https://github.com/home-assistant/core/issues/47699 # https://github.com/home-assistant/core/issues/47699
if installed_version is None:
_LOGGER.error( # type: ignore[unreachable] _LOGGER.error( # type: ignore[unreachable]
"Installed version for %s resolved to None", req.name "Installed version for %s resolved to None", req.name
) )

View File

@ -1,6 +1,6 @@
"""Test Home Assistant package util methods.""" """Test Home Assistant package util methods."""
import asyncio import asyncio
from importlib.metadata import PackageNotFoundError, metadata from importlib.metadata import metadata
import logging import logging
import os import os
from subprocess import PIPE from subprocess import PIPE
@ -235,21 +235,17 @@ def test_check_package_zip() -> None:
assert not package.is_installed(TEST_ZIP_REQ) assert not package.is_installed(TEST_ZIP_REQ)
def test_get_distribution_falls_back_to_version() -> None: def test_get_is_installed() -> None:
"""Test for get_distribution failing and fallback to version.""" """Test is_installed can parse complex requirements."""
pkg = metadata("homeassistant") pkg = metadata("homeassistant")
installed_package = pkg["name"] installed_package = pkg["name"]
installed_version = pkg["version"] installed_version = pkg["version"]
with patch( assert package.is_installed(installed_package)
"homeassistant.util.package.distribution", assert package.is_installed(f"{installed_package}=={installed_version}")
side_effect=PackageNotFoundError, assert package.is_installed(f"{installed_package}>={installed_version}")
): assert package.is_installed(f"{installed_package}<={installed_version}")
assert package.is_installed(installed_package) assert not package.is_installed(f"{installed_package}<{installed_version}")
assert package.is_installed(f"{installed_package}=={installed_version}")
assert package.is_installed(f"{installed_package}>={installed_version}")
assert package.is_installed(f"{installed_package}<={installed_version}")
assert not package.is_installed(f"{installed_package}<{installed_version}")
def test_check_package_previous_failed_install() -> None: def test_check_package_previous_failed_install() -> None:
@ -258,9 +254,6 @@ def test_check_package_previous_failed_install() -> None:
installed_package = pkg["name"] installed_package = pkg["name"]
installed_version = pkg["version"] installed_version = pkg["version"]
with patch( with patch("homeassistant.util.package.version", return_value=None):
"homeassistant.util.package.distribution",
side_effect=PackageNotFoundError,
), patch("homeassistant.util.package.version", return_value=None):
assert not package.is_installed(installed_package) assert not package.is_installed(installed_package)
assert not package.is_installed(f"{installed_package}=={installed_version}") assert not package.is_installed(f"{installed_package}=={installed_version}")