diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index 4ae6dfd6e..fb484a8e2 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -68,7 +68,7 @@ jobs: with: name: logs path: | - tests/lg_logs/* + tests/lg_logs/** - name: Archive JUnit reports uses: actions/upload-artifact@v3 diff --git a/tests/conftest.py b/tests/conftest.py index ff7e1d8ec..eb3b89e97 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,8 +1,23 @@ +import json +import logging import os +from labgrid.driver import ShellDriver import pytest +logger = logging.getLogger(__name__) + + +@pytest.fixture(autouse=True, scope="module") +def restart_qemu(strategy): + """Use fresh QEMU instance for each module.""" + if strategy.status.name == "shell": + logger.info("Restarting QEMU before %s module tests.", strategy.target.name) + strategy.transition("off") + strategy.transition("shell") + + @pytest.hookimpl def pytest_runtest_setup(item): log_dir = item.config.option.lg_log @@ -11,11 +26,25 @@ def pytest_runtest_setup(item): return logging_plugin = item.config.pluginmanager.get_plugin("logging-plugin") - logging_plugin.set_log_path(os.path.join(log_dir, f"{item.name}.log")) + log_name = item.nodeid.replace(".py::", "/") + logging_plugin.set_log_path(os.path.join(log_dir, f"{log_name}.log")) @pytest.fixture -def shell_command(target, strategy): +def shell(target, strategy) -> ShellDriver: + """Fixture for accessing shell.""" strategy.transition("shell") shell = target.get_driver("ShellDriver") return shell + + +@pytest.fixture +def shell_json(target, strategy) -> callable: + """Fixture for running CLI commands returning JSON string as output.""" + strategy.transition("shell") + shell = target.get_driver("ShellDriver") + + def get_json_response(command, *, timeout=60) -> dict: + return json.loads("\n".join(shell.run_check(command, timeout=timeout))) + + return get_json_response diff --git a/tests/qemu_shell_strategy.py b/tests/qemu_shell_strategy.py index d0055a246..4adfbf72f 100644 --- a/tests/qemu_shell_strategy.py +++ b/tests/qemu_shell_strategy.py @@ -44,6 +44,7 @@ class QEMUShellStrategy(Strategy): elif status == Status.off: self.target.activate(self.qemu) self.qemu.off() + self.target.deactivate(self.shell) elif status == Status.shell: self.target.activate(self.qemu) self.qemu.on() diff --git a/tests/requirements.txt b/tests/requirements.txt index bd1a303a0..88b093f3b 100644 --- a/tests/requirements.txt +++ b/tests/requirements.txt @@ -1 +1,3 @@ labgrid==23.0.3 +pytest==7.2.2 +pytest-dependency==0.5.1 diff --git a/tests/run_tests.sh b/tests/run_tests.sh index 838b38e77..19757948c 100755 --- a/tests/run_tests.sh +++ b/tests/run_tests.sh @@ -13,4 +13,4 @@ if [ -z "$GITHUB_ACTIONS" ] && [ -z "$VIRTUAL_ENV" ]; then pip3 install -r requirements.txt fi -pytest --lg-env qemu-strategy.yaml --lg-log=lg_logs --junitxml=junit_reports/smoke_test.xml smoke_test +pytest --lg-env qemu-strategy.yaml --lg-log=lg_logs --junitxml=junit_reports/tests.xml "$@" diff --git a/tests/smoke_test/test_basic.py b/tests/smoke_test/test_basic.py index 98850e9cd..6d19b9a49 100644 --- a/tests/smoke_test/test_basic.py +++ b/tests/smoke_test/test_basic.py @@ -5,9 +5,9 @@ from time import sleep _LOGGER = logging.getLogger(__name__) -def test_init(shell_command): +def test_init(shell): def check_container_running(container_name): - out = shell_command.run_check( + out = shell.run_check( f"docker container inspect -f '{{{{.State.Status}}}}' {container_name} || true" ) return "running" in out @@ -21,28 +21,27 @@ def test_init(shell_command): # wait for system ready for _ in range(20): - output = "\n".join(shell_command.run_check("ha os info || true")) + output = "\n".join(shell.run_check("ha os info || true")) if "System is not ready" not in output: break sleep(5) - output = shell_command.run_check("ha os info") + output = shell.run_check("ha os info") + _LOGGER.info("%s", "\n".join(output)) + +def test_dmesg(shell): + output = shell.run_check("dmesg") _LOGGER.info("%s", "\n".join(output)) -def test_dmesg(shell_command): - output = shell_command.run_check("dmesg") +def test_supervisor_logs(shell): + output = shell.run_check("ha su logs") _LOGGER.info("%s", "\n".join(output)) -def test_supervisor_logs(shell_command): - output = shell_command.run_check("ha su logs") - _LOGGER.info("%s", "\n".join(output)) - - -def test_systemctl_status(shell_command): - output = shell_command.run_check( +def test_systemctl_status(shell): + output = shell.run_check( "systemctl --no-pager -l status -a || true", timeout=90 ) _LOGGER.info("%s", "\n".join(output)) diff --git a/tests/supervisor_test/test_supervisor.py b/tests/supervisor_test/test_supervisor.py new file mode 100644 index 000000000..5ee3ff959 --- /dev/null +++ b/tests/supervisor_test/test_supervisor.py @@ -0,0 +1,195 @@ +import logging +from time import sleep + +import pytest +from labgrid.driver import ExecutionError + + +logger = logging.getLogger(__name__) + + +@pytest.fixture(scope="module") +def stash() -> dict: + """Simple stash for sharing data between tests in this module.""" + stash = {} + return stash + + +@pytest.mark.dependency() +def test_start_supervisor(shell, shell_json): + def check_container_running(container_name): + out = shell.run_check(f"docker container inspect -f '{{{{.State.Status}}}}' {container_name} || true") + return "running" in out + + for _ in range(20): + if check_container_running("homeassistant") and check_container_running("hassio_supervisor"): + break + + sleep(5) + + supervisor_ip = "\n".join( + shell.run_check("docker inspect --format='{{.NetworkSettings.IPAddress}}' hassio_supervisor") + ) + + for _ in range(20): + try: + if shell_json(f"curl -sSL http://{supervisor_ip}/supervisor/ping").get("result") == "ok": + break + except ExecutionError: + pass # avoid failure when the container is restarting + sleep(5) + else: + raise AssertionError("Supervisor did not start in time") + + +@pytest.mark.dependency(depends=["test_start_supervisor"]) +def test_check_supervisor(shell_json): + # check supervisor info + supervisor_info = shell_json("ha supervisor info --no-progress --raw-json") + assert supervisor_info.get("result") == "ok", "supervisor info failed" + logger.info("Supervisor info: %s", supervisor_info) + # check network info + network_info = shell_json("ha network info --no-progress --raw-json") + assert network_info.get("result") == "ok", "network info failed" + logger.info("Network info: %s", network_info) + + +@pytest.mark.dependency(depends=["test_check_supervisor"]) +def test_update_supervisor(shell_json): + supervisor_info = shell_json("ha supervisor info --no-progress --raw-json") + supervisor_version = supervisor_info.get("data").get("version") + if supervisor_version == supervisor_info.get("data").get("version_latest"): + logger.info("Supervisor is already up to date") + pytest.skip("Supervisor is already up to date") + else: + result = shell_json("ha supervisor update --no-progress --raw-json") + if result.get("result") == "error" and "Another job is running" in result.get("message"): + pass + else: + assert result.get("result") == "ok", f"Supervisor update failed: {result}" + + for _ in range(40): + try: + supervisor_info = shell_json("ha supervisor info --no-progress --raw-json", timeout=90) + data = supervisor_info.get("data") + if data and data.get("version") == data.get("version_latest"): + logger.info( + "Supervisor updated from %s to %s: %s", + supervisor_version, + data.get("version"), + supervisor_info, + ) + break + except ExecutionError: + pass # avoid failure when the container is restarting + sleep(5) + else: + raise AssertionError("Supervisor did not update in time") + + +@pytest.mark.dependency(depends=["test_update_supervisor"]) +def test_supervisor_is_updated(shell_json): + supervisor_info = shell_json("ha supervisor info --no-progress --raw-json", timeout=90) + data = supervisor_info.get("data") + assert data and data.get("version") == data.get("version_latest") + + +@pytest.mark.dependency(depends=["test_supervisor_is_updated"]) +def test_addon_install(shell_json): + # install Core SSH add-on + assert ( + shell_json("ha addons install core_ssh --no-progress --raw-json", timeout=300).get("result") == "ok" + ), "Core SSH add-on install failed" + # check Core SSH add-on is installed + assert ( + shell_json("ha addons info core_ssh --no-progress --raw-json").get("data", {}).get("version") is not None + ), "Core SSH add-on not installed" + # start Core SSH add-on + assert ( + shell_json("ha addons start core_ssh --no-progress --raw-json").get("result") == "ok" + ), "Core SSH add-on start failed" + # check Core SSH add-on is running + ssh_info = shell_json("ha addons info core_ssh --no-progress --raw-json") + assert ssh_info.get("data", {}).get("state") == "started", "Core SSH add-on not running" + logger.info("Core SSH add-on info: %s", ssh_info) + + +@pytest.mark.dependency(depends=["test_supervisor_is_updated"]) +def test_code_sign(shell_json): + # enable Content-Trust + assert ( + shell_json("ha security options --content-trust=true --no-progress --raw-json").get("result") == "ok" + ), "Content-Trust enable failed" + # run Supervisor health check + health_check = shell_json("ha resolution healthcheck --no-progress --raw-json") + assert health_check.get("result") == "ok", "Supervisor health check failed" + logger.info("Supervisor health check result: %s", health_check) + # get resolution center info + resolution_info = shell_json("ha resolution info --no-progress --raw-json") + logger.info("Resolution center info: %s", resolution_info) + # check supervisor is healthy + unhealthy = resolution_info.get("data").get("unhealthy") + assert len(unhealthy) == 0, "Supervisor is unhealthy" + # check for unsupported entries + unsupported = resolution_info.get("data").get("unsupported") + assert len(unsupported) == 0, "Unsupported entries found" + + +@pytest.mark.dependency(depends=["test_supervisor_is_updated"]) +def test_create_backup(shell_json, stash): + result = shell_json("ha backups new --no-progress --raw-json") + assert result.get("result") == "ok", f"Backup creation failed: {result}" + slug = result.get("data", {}).get("slug") + assert slug is not None + stash.update(slug=slug) + logger.info("Backup creation result: %s", result) + + +@pytest.mark.dependency(depends=["test_addon_install"]) +def test_addon_uninstall(shell_json): + result = shell_json("ha addons uninstall core_ssh --no-progress --raw-json") + assert result.get("result") == "ok", f"Core SSH add-on uninstall failed: {result}" + logger.info("Core SSH add-on uninstall result: %s", result) + + +@pytest.mark.dependency(depends=["test_supervisor_is_updated"]) +def test_restart_supervisor(shell, shell_json): + result = shell_json("ha supervisor restart --no-progress --raw-json") + assert result.get("result") == "ok", f"Supervisor restart failed: {result}" + + supervisor_ip = "\n".join( + shell.run_check("docker inspect --format='{{.NetworkSettings.IPAddress}}' hassio_supervisor") + ) + + for _ in range(100): + try: + if shell_json(f"curl -sSL http://{supervisor_ip}/supervisor/ping").get("result") == "ok": + if shell_json("ha os info --no-progress --raw-json").get("result") == "ok": + break + except ExecutionError: + pass # avoid failure when the container is restarting + sleep(5) + else: + raise AssertionError("Supervisor did not start in time") + + +@pytest.mark.dependency(depends=["test_create_backup"]) +def test_restore_backup(shell_json, stash): + result = shell_json( + f"ha backups restore {stash.get('slug')} --addons core_ssh --no-progress --raw-json", + timeout=300, + ) + assert result.get("result") == "ok", f"Backup restore failed: {result}" + logger.info("Backup restore result: %s", result) + + addon_info = shell_json("ha addons info core_ssh --no-progress --raw-json") + assert addon_info.get("data", {}).get("version") is not None, "Core SSH add-on not installed" + assert addon_info.get("data", {}).get("state") == "started", "Core SSH add-on not running" + logger.info("Core SSH add-on info: %s", addon_info) + + +@pytest.mark.dependency(depends=["test_create_backup"]) +def test_restore_ssl_directory(shell_json, stash): + result = shell_json(f"ha backups restore {stash.get('slug')} --folders ssl --raw-json") + assert result.get("result") == "ok", f"Backup restore failed: {result}" + logger.info("Backup restore result: %s", result)