diff --git a/homeassistant/requirements.py b/homeassistant/requirements.py index 7631586d626..9a8cd20983a 100644 --- a/homeassistant/requirements.py +++ b/homeassistant/requirements.py @@ -15,10 +15,7 @@ from .util import package as pkg_util PIP_TIMEOUT = 60 # The default is too low when the internet connection is satellite or high latency MAX_INSTALL_FAILURES = 3 -DATA_PIP_LOCK = "pip_lock" -DATA_PKG_CACHE = "pkg_cache" -DATA_INTEGRATIONS_WITH_REQS = "integrations_with_reqs" -DATA_INSTALL_FAILURE_HISTORY = "install_failure_history" +DATA_REQUIREMENTS_MANAGER = "requirements_manager" CONSTRAINT_FILE = "package_constraints.txt" DISCOVERY_INTEGRATIONS: dict[str, Iterable[str]] = { "dhcp": ("dhcp",), @@ -40,7 +37,7 @@ class RequirementsNotFound(HomeAssistantError): async def async_get_integration_with_requirements( - hass: HomeAssistant, domain: str, done: set[str] | None = None + hass: HomeAssistant, domain: str ) -> Integration: """Get an integration with all requirements installed, including the dependencies. @@ -48,97 +45,8 @@ async def async_get_integration_with_requirements( is invalid, RequirementNotFound if there was some type of failure to install requirements. """ - if done is None: - done = {domain} - else: - done.add(domain) - - integration = await async_get_integration(hass, domain) - - if hass.config.skip_pip: - return integration - - if (cache := hass.data.get(DATA_INTEGRATIONS_WITH_REQS)) is None: - cache = hass.data[DATA_INTEGRATIONS_WITH_REQS] = {} - - int_or_evt: Integration | asyncio.Event | None | UndefinedType = cache.get( - domain, UNDEFINED - ) - - if isinstance(int_or_evt, asyncio.Event): - await int_or_evt.wait() - - # When we have waited and it's UNDEFINED, it doesn't exist - # We don't cache that it doesn't exist, or else people can't fix it - # and then restart, because their config will never be valid. - if (int_or_evt := cache.get(domain, UNDEFINED)) is UNDEFINED: - raise IntegrationNotFound(domain) - - if int_or_evt is not UNDEFINED: - return cast(Integration, int_or_evt) - - event = cache[domain] = asyncio.Event() - - try: - await _async_process_integration(hass, integration, done) - except Exception: - del cache[domain] - event.set() - raise - - cache[domain] = integration - event.set() - return integration - - -async def _async_process_integration( - hass: HomeAssistant, integration: Integration, done: set[str] -) -> None: - """Process an integration and requirements.""" - if integration.requirements: - await async_process_requirements( - hass, integration.domain, integration.requirements - ) - - deps_to_check = [ - dep - for dep in integration.dependencies + integration.after_dependencies - if dep not in done - ] - - for check_domain, to_check in DISCOVERY_INTEGRATIONS.items(): - if ( - check_domain not in done - and check_domain not in deps_to_check - and any(check in integration.manifest for check in to_check) - ): - deps_to_check.append(check_domain) - - if not deps_to_check: - return - - results = await asyncio.gather( - *( - async_get_integration_with_requirements(hass, dep, done) - for dep in deps_to_check - ), - return_exceptions=True, - ) - for result in results: - if not isinstance(result, BaseException): - continue - if not isinstance(result, IntegrationNotFound) or not ( - not integration.is_built_in - and result.domain in integration.after_dependencies - ): - raise result - - -@callback -def async_clear_install_history(hass: HomeAssistant) -> None: - """Forget the install history.""" - if install_failure_history := hass.data.get(DATA_INSTALL_FAILURE_HISTORY): - install_failure_history.clear() + manager = _async_get_manager(hass) + return await manager.async_get_integration_with_requirements(domain) async def async_process_requirements( @@ -149,49 +57,24 @@ async def async_process_requirements( This method is a coroutine. It will raise RequirementsNotFound if an requirement can't be satisfied. """ - if (pip_lock := hass.data.get(DATA_PIP_LOCK)) is None: - pip_lock = hass.data[DATA_PIP_LOCK] = asyncio.Lock() - install_failure_history = hass.data.get(DATA_INSTALL_FAILURE_HISTORY) - if install_failure_history is None: - install_failure_history = hass.data[DATA_INSTALL_FAILURE_HISTORY] = set() - - kwargs = pip_kwargs(hass.config.config_dir) - - async with pip_lock: - for req in requirements: - await _async_process_requirements( - hass, name, req, install_failure_history, kwargs - ) + await _async_get_manager(hass).async_process_requirements(name, requirements) -async def _async_process_requirements( - hass: HomeAssistant, - name: str, - req: str, - install_failure_history: set[str], - kwargs: Any, -) -> None: - """Install a requirement and save failures.""" - if req in install_failure_history: - _LOGGER.info( - "Multiple attempts to install %s failed, install will be retried after next configuration check or restart", - req, - ) - raise RequirementsNotFound(name, [req]) +@callback +def _async_get_manager(hass: HomeAssistant) -> RequirementsManager: + """Get the requirements manager.""" + if DATA_REQUIREMENTS_MANAGER in hass.data: + manager: RequirementsManager = hass.data[DATA_REQUIREMENTS_MANAGER] + return manager - if pkg_util.is_installed(req): - return + manager = hass.data[DATA_REQUIREMENTS_MANAGER] = RequirementsManager(hass) + return manager - def _install(req: str, kwargs: dict[str, Any]) -> bool: - """Install requirement.""" - return pkg_util.install_package(req, **kwargs) - for _ in range(MAX_INSTALL_FAILURES): - if await hass.async_add_executor_job(_install, req, kwargs): - return - - install_failure_history.add(req) - raise RequirementsNotFound(name, [req]) +@callback +def async_clear_install_history(hass: HomeAssistant) -> None: + """Forget the install history.""" + _async_get_manager(hass).install_failure_history.clear() def pip_kwargs(config_dir: str | None) -> dict[str, Any]: @@ -207,3 +90,178 @@ def pip_kwargs(config_dir: str | None) -> dict[str, Any]: if not (config_dir is None or pkg_util.is_virtual_env()) and not is_docker: kwargs["target"] = os.path.join(config_dir, "deps") return kwargs + + +def _install_with_retry(requirement: str, kwargs: dict[str, Any]) -> bool: + """Try to install a package up to MAX_INSTALL_FAILURES times.""" + for _ in range(MAX_INSTALL_FAILURES): + if pkg_util.install_package(requirement, **kwargs): + return True + return False + + +def _install_requirements_if_missing( + requirements: list[str], kwargs: dict[str, Any] +) -> tuple[set[str], set[str]]: + """Install requirements if missing.""" + installed: set[str] = set() + failures: set[str] = set() + for req in requirements: + if pkg_util.is_installed(req) or _install_with_retry(req, kwargs): + installed.add(req) + continue + failures.add(req) + return installed, failures + + +class RequirementsManager: + """Manage requirements.""" + + def __init__(self, hass: HomeAssistant) -> None: + """Init the requirements manager.""" + self.hass = hass + self.pip_lock = asyncio.Lock() + self.integrations_with_reqs: dict[ + str, Integration | asyncio.Event | None | UndefinedType + ] = {} + self.install_failure_history: set[str] = set() + self.is_installed_cache: set[str] = set() + + async def async_get_integration_with_requirements( + self, domain: str, done: set[str] | None = None + ) -> Integration: + """Get an integration with all requirements installed, including the dependencies. + + This can raise IntegrationNotFound if manifest or integration + is invalid, RequirementNotFound if there was some type of + failure to install requirements. + """ + + if done is None: + done = {domain} + else: + done.add(domain) + + integration = await async_get_integration(self.hass, domain) + + if self.hass.config.skip_pip: + return integration + + cache = self.integrations_with_reqs + int_or_evt = cache.get(domain, UNDEFINED) + + if isinstance(int_or_evt, asyncio.Event): + await int_or_evt.wait() + + # When we have waited and it's UNDEFINED, it doesn't exist + # We don't cache that it doesn't exist, or else people can't fix it + # and then restart, because their config will never be valid. + if (int_or_evt := cache.get(domain, UNDEFINED)) is UNDEFINED: + raise IntegrationNotFound(domain) + + if int_or_evt is not UNDEFINED: + return cast(Integration, int_or_evt) + + event = cache[domain] = asyncio.Event() + + try: + await self._async_process_integration(integration, done) + except Exception: + del cache[domain] + event.set() + raise + + cache[domain] = integration + event.set() + return integration + + async def _async_process_integration( + self, integration: Integration, done: set[str] + ) -> None: + """Process an integration and requirements.""" + if integration.requirements: + await self.async_process_requirements( + integration.domain, integration.requirements + ) + + deps_to_check = [ + dep + for dep in integration.dependencies + integration.after_dependencies + if dep not in done + ] + + for check_domain, to_check in DISCOVERY_INTEGRATIONS.items(): + if ( + check_domain not in done + and check_domain not in deps_to_check + and any(check in integration.manifest for check in to_check) + ): + deps_to_check.append(check_domain) + + if not deps_to_check: + return + + results = await asyncio.gather( + *( + self.async_get_integration_with_requirements(dep, done) + for dep in deps_to_check + ), + return_exceptions=True, + ) + for result in results: + if not isinstance(result, BaseException): + continue + if not isinstance(result, IntegrationNotFound) or not ( + not integration.is_built_in + and result.domain in integration.after_dependencies + ): + raise result + + async def async_process_requirements( + self, name: str, requirements: list[str] + ) -> None: + """Install the requirements for a component or platform. + + This method is a coroutine. It will raise RequirementsNotFound + if an requirement can't be satisfied. + """ + if not (missing := self._find_missing_requirements(requirements)): + return + self._raise_for_failed_requirements(name, missing) + + async with self.pip_lock: + # Recaculate missing again now that we have the lock + await self._async_process_requirements( + name, self._find_missing_requirements(requirements) + ) + + def _find_missing_requirements(self, requirements: list[str]) -> list[str]: + """Find requirements that are missing in the cache.""" + return [req for req in requirements if req not in self.is_installed_cache] + + def _raise_for_failed_requirements( + self, integration: str, missing: list[str] + ) -> None: + """Raise RequirementsNotFound so we do not keep trying requirements that have already failed.""" + for req in missing: + if req in self.install_failure_history: + _LOGGER.info( + "Multiple attempts to install %s failed, install will be retried after next configuration check or restart", + req, + ) + raise RequirementsNotFound(integration, [req]) + + async def _async_process_requirements( + self, + name: str, + requirements: list[str], + ) -> None: + """Install a requirement and save failures.""" + kwargs = pip_kwargs(self.hass.config.config_dir) + installed, failures = await self.hass.async_add_executor_job( + _install_requirements_if_missing, requirements, kwargs + ) + self.is_installed_cache |= installed + self.install_failure_history |= failures + if failures: + raise RequirementsNotFound(name, list(failures)) diff --git a/tests/test_requirements.py b/tests/test_requirements.py index 7e4dba42c2f..77499707489 100644 --- a/tests/test_requirements.py +++ b/tests/test_requirements.py @@ -213,16 +213,9 @@ async def test_get_integration_with_requirements_pip_install_fails_two_passes(ha assert integration assert integration.domain == "test_component" - assert len(mock_is_installed.mock_calls) == 1 - assert sorted(mock_call[1][0] for mock_call in mock_is_installed.mock_calls) == [ - "test-comp==1.0.0", - ] - + assert len(mock_is_installed.mock_calls) == 0 # On another attempt we remember failures and don't try again - assert len(mock_inst.mock_calls) == 1 - assert sorted(mock_call[1][0] for mock_call in mock_inst.mock_calls) == [ - "test-comp==1.0.0" - ] + assert len(mock_inst.mock_calls) == 0 # Now clear the history and so we try again async_clear_install_history(hass) @@ -239,14 +232,13 @@ async def test_get_integration_with_requirements_pip_install_fails_two_passes(ha assert integration assert integration.domain == "test_component" - assert len(mock_is_installed.mock_calls) == 3 + assert len(mock_is_installed.mock_calls) == 2 assert sorted(mock_call[1][0] for mock_call in mock_is_installed.mock_calls) == [ "test-comp-after-dep==1.0.0", "test-comp-dep==1.0.0", - "test-comp==1.0.0", ] - assert len(mock_inst.mock_calls) == 7 + assert len(mock_inst.mock_calls) == 6 assert sorted(mock_call[1][0] for mock_call in mock_inst.mock_calls) == [ "test-comp-after-dep==1.0.0", "test-comp-after-dep==1.0.0", @@ -254,7 +246,6 @@ async def test_get_integration_with_requirements_pip_install_fails_two_passes(ha "test-comp-dep==1.0.0", "test-comp-dep==1.0.0", "test-comp-dep==1.0.0", - "test-comp==1.0.0", ] # Now clear the history and mock success @@ -272,18 +263,16 @@ async def test_get_integration_with_requirements_pip_install_fails_two_passes(ha assert integration assert integration.domain == "test_component" - assert len(mock_is_installed.mock_calls) == 3 + assert len(mock_is_installed.mock_calls) == 2 assert sorted(mock_call[1][0] for mock_call in mock_is_installed.mock_calls) == [ "test-comp-after-dep==1.0.0", "test-comp-dep==1.0.0", - "test-comp==1.0.0", ] - assert len(mock_inst.mock_calls) == 3 + assert len(mock_inst.mock_calls) == 2 assert sorted(mock_call[1][0] for mock_call in mock_inst.mock_calls) == [ "test-comp-after-dep==1.0.0", "test-comp-dep==1.0.0", - "test-comp==1.0.0", ] @@ -408,12 +397,12 @@ async def test_discovery_requirements_mqtt(hass): hass, MockModule("mqtt_comp", partial_manifest={"mqtt": ["foo/discovery"]}) ) with patch( - "homeassistant.requirements.async_process_requirements", + "homeassistant.requirements.RequirementsManager.async_process_requirements", ) as mock_process: await async_get_integration_with_requirements(hass, "mqtt_comp") assert len(mock_process.mock_calls) == 2 # mqtt also depends on http - assert mock_process.mock_calls[0][1][2] == mqtt.requirements + assert mock_process.mock_calls[0][1][1] == mqtt.requirements async def test_discovery_requirements_ssdp(hass): @@ -425,17 +414,17 @@ async def test_discovery_requirements_ssdp(hass): hass, MockModule("ssdp_comp", partial_manifest={"ssdp": [{"st": "roku:ecp"}]}) ) with patch( - "homeassistant.requirements.async_process_requirements", + "homeassistant.requirements.RequirementsManager.async_process_requirements", ) as mock_process: await async_get_integration_with_requirements(hass, "ssdp_comp") assert len(mock_process.mock_calls) == 4 - assert mock_process.mock_calls[0][1][2] == ssdp.requirements + assert mock_process.mock_calls[0][1][1] == ssdp.requirements # Ensure zeroconf is a dep for ssdp assert { - mock_process.mock_calls[1][1][1], - mock_process.mock_calls[2][1][1], - mock_process.mock_calls[3][1][1], + mock_process.mock_calls[1][1][0], + mock_process.mock_calls[2][1][0], + mock_process.mock_calls[3][1][0], } == {"network", "zeroconf", "http"} @@ -454,12 +443,12 @@ async def test_discovery_requirements_zeroconf(hass, partial_manifest): ) with patch( - "homeassistant.requirements.async_process_requirements", + "homeassistant.requirements.RequirementsManager.async_process_requirements", ) as mock_process: await async_get_integration_with_requirements(hass, "comp") assert len(mock_process.mock_calls) == 3 # zeroconf also depends on http - assert mock_process.mock_calls[0][1][2] == zeroconf.requirements + assert mock_process.mock_calls[0][1][1] == zeroconf.requirements async def test_discovery_requirements_dhcp(hass): @@ -477,9 +466,9 @@ async def test_discovery_requirements_dhcp(hass): ), ) with patch( - "homeassistant.requirements.async_process_requirements", + "homeassistant.requirements.RequirementsManager.async_process_requirements", ) as mock_process: await async_get_integration_with_requirements(hass, "comp") assert len(mock_process.mock_calls) == 1 # dhcp does not depend on http - assert mock_process.mock_calls[0][1][2] == dhcp.requirements + assert mock_process.mock_calls[0][1][1] == dhcp.requirements