From 825672a450f527d40adb9baf890fb814bf90e9d9 Mon Sep 17 00:00:00 2001 From: Richie Cahill Date: Tue, 28 Oct 2025 22:00:31 -0400 Subject: [PATCH] added tests --- .github/workflows/pytest.yml | 19 ++ tests/__init__.py | 1 + tests/test_common.py | 61 +++++ tests/test_components.py | 95 ++++++++ tests/test_parallelize.py | 124 +++++++++++ tests/test_server_validate_scripts.py | 60 +++++ tests/test_snapshot_manager.py | 167 ++++++++++++++ tests/test_zfs.py | 309 ++++++++++++++++++++++++++ 8 files changed, 836 insertions(+) create mode 100644 .github/workflows/pytest.yml create mode 100644 tests/__init__.py create mode 100644 tests/test_common.py create mode 100644 tests/test_components.py create mode 100644 tests/test_parallelize.py create mode 100644 tests/test_server_validate_scripts.py create mode 100644 tests/test_snapshot_manager.py create mode 100644 tests/test_zfs.py diff --git a/.github/workflows/pytest.yml b/.github/workflows/pytest.yml new file mode 100644 index 0000000..1151780 --- /dev/null +++ b/.github/workflows/pytest.yml @@ -0,0 +1,19 @@ +name: pytest + +on: + push: + branches: + - main + pull_request: + branches: + - main + merge_group: + +jobs: + pytest: + runs-on: self-hosted + + steps: + - uses: actions/checkout@v4 + - name: Run tests + run: pytest tests diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..d420712 --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1 @@ +"""Tests.""" diff --git a/tests/test_common.py b/tests/test_common.py new file mode 100644 index 0000000..f8689f2 --- /dev/null +++ b/tests/test_common.py @@ -0,0 +1,61 @@ +"""test_common.""" + +from __future__ import annotations + +from os import environ +from typing import TYPE_CHECKING + +from apprise import Apprise + +from python.common import bash_wrapper, signal_alert, utcnow + +if TYPE_CHECKING: + from pytest_mock import MockerFixture + + +def test_utcnow() -> None: + """test_utcnow.""" + utcnow() + + +def test_signal_alert(mocker: MockerFixture) -> None: + """test_signal_alert.""" + environ["SIGNAL_ALERT_FROM_PHONE"] = "1234567890" + environ["SIGNAL_ALERT_TO_PHONE"] = "0987654321" + + mock_logger = mocker.patch("python.common.logger") + mock_apprise_client = mocker.MagicMock(spec=Apprise) + mocker.patch("python.common.Apprise", return_value=mock_apprise_client) + + signal_alert("test") + + mock_logger.info.assert_not_called() + mock_apprise_client.add.assert_called_once_with("signal://localhost:8989/1234567890/0987654321") + mock_apprise_client.notify.assert_called_once_with(title="", body="test") + + +def test_signal_alert_no_phones(mocker: MockerFixture) -> None: + """test_signal_alert_no_phones.""" + if "SIGNAL_ALERT_FROM_PHONE" in environ: + del environ["SIGNAL_ALERT_FROM_PHONE"] + if "SIGNAL_ALERT_TO_PHONE" in environ: + del environ["SIGNAL_ALERT_TO_PHONE"] + mock_logger = mocker.patch("python.common.logger") + signal_alert("test") + + mock_logger.info.assert_called_once_with("SIGNAL_ALERT_FROM_PHONE or SIGNAL_ALERT_TO_PHONE not set") + + +def test_test_bash_wrapper() -> None: + """test_test_bash_wrapper.""" + stdout, returncode = bash_wrapper("echo test") + assert stdout == "test\n" + assert returncode == 0 + + +def test_test_bash_wrapper_error() -> None: + """test_test_bash_wrapper_error.""" + expected_error = 2 + stdout, returncode = bash_wrapper("ls /this/path/does/not/exist") + assert stdout == "ls: cannot access '/this/path/does/not/exist': No such file or directory\n" + assert returncode == expected_error diff --git a/tests/test_components.py b/tests/test_components.py new file mode 100644 index 0000000..0d27270 --- /dev/null +++ b/tests/test_components.py @@ -0,0 +1,95 @@ +"""test_components.""" + +from pytest_mock import MockerFixture + +from python.system_tests.components import systemd_tests, zpool_tests +from python.zfs import Zpool + +temp = "Every feature flags pool has all supported and requested features enabled.\n" + +SYSTEM_TESTS_COMPONENTS = "python.system_tests.components" + + +def test_zpool_tests(mocker: MockerFixture) -> None: + """test_zpool_tests.""" + mock_zpool = mocker.MagicMock(spec=Zpool) + mock_zpool.health = "ONLINE" + mock_zpool.capacity = 70 + mock_zpool.name = "Main" + mocker.patch(f"{SYSTEM_TESTS_COMPONENTS}.Zpool", return_value=mock_zpool) + mocker.patch(f"{SYSTEM_TESTS_COMPONENTS}.bash_wrapper", return_value=(temp, "")) + errors = zpool_tests(("Main",)) + assert errors == [] + + +def test_zpool_tests_out_of_date(mocker: MockerFixture) -> None: + """test_zpool_tests_out_of_date.""" + mock_zpool = mocker.MagicMock(spec=Zpool) + mock_zpool.health = "ONLINE" + mock_zpool.capacity = 70 + mock_zpool.name = "Main" + mocker.patch(f"{SYSTEM_TESTS_COMPONENTS}.Zpool", return_value=mock_zpool) + mocker.patch(f"{SYSTEM_TESTS_COMPONENTS}.bash_wrapper", return_value=("", "")) + errors = zpool_tests(("Main",)) + assert errors == ["ZPool out of date run `sudo zpool upgrade -a`"] + + +def test_zpool_tests_out_of_space(mocker: MockerFixture) -> None: + """test_zpool_tests_out_of_space.""" + mock_zpool = mocker.MagicMock(spec=Zpool) + mock_zpool.health = "ONLINE" + mock_zpool.capacity = 100 + mock_zpool.name = "Main" + mocker.patch(f"{SYSTEM_TESTS_COMPONENTS}.Zpool", return_value=mock_zpool) + mocker.patch(f"{SYSTEM_TESTS_COMPONENTS}.bash_wrapper", return_value=(temp, "")) + errors = zpool_tests(("Main",)) + assert errors == ["Main is low on space"] + + +def test_zpool_tests_offline(mocker: MockerFixture) -> None: + """test_zpool_tests_offline.""" + mock_zpool = mocker.MagicMock(spec=Zpool) + mock_zpool.health = "OFFLINE" + mock_zpool.capacity = 70 + mock_zpool.name = "Main" + mocker.patch(f"{SYSTEM_TESTS_COMPONENTS}.Zpool", return_value=mock_zpool) + mocker.patch(f"{SYSTEM_TESTS_COMPONENTS}.bash_wrapper", return_value=(temp, "")) + errors = zpool_tests(("Main",)) + assert errors == ["Main is OFFLINE"] + + +def test_systemd_tests() -> None: + """test_systemd_tests.""" + errors = systemd_tests(("docker",)) + assert errors == [] + + +def test_systemd_tests_multiple_negative_retries() -> None: + """test_systemd_tests_fail.""" + errors = systemd_tests(("docker",), max_retries=-1, retry_delay_secs=-1) + assert errors == [] + + +def test_systemd_tests_multiple_pass(mocker: MockerFixture) -> None: + """test_systemd_tests_fail.""" + mocker.patch( + f"{SYSTEM_TESTS_COMPONENTS}.bash_wrapper", + side_effect=[ + ("inactive\n", ""), + ("activating\n", ""), + ("active\n", ""), + ], + ) + errors = systemd_tests( + ("docker",), + retryable_statuses=("inactive\n", "activating\n"), + valid_statuses=("active\n",), + ) + assert errors == [] + + +def test_systemd_tests_fail(mocker: MockerFixture) -> None: + """test_systemd_tests_fail.""" + mocker.patch(f"{SYSTEM_TESTS_COMPONENTS}.bash_wrapper", return_value=("inactive\n", "")) + errors = systemd_tests(("docker",), max_retries=5) + assert errors == ["docker is inactive"] diff --git a/tests/test_parallelize.py b/tests/test_parallelize.py new file mode 100644 index 0000000..c00619a --- /dev/null +++ b/tests/test_parallelize.py @@ -0,0 +1,124 @@ +"""test_executors.""" + +from __future__ import annotations + +import logging +from concurrent.futures import Future, ThreadPoolExecutor +from typing import TYPE_CHECKING, Any + +import pytest + +from python.parallelize import _parallelize_base, parallelize_process, parallelize_thread + +if TYPE_CHECKING: + from collections.abc import Callable + + from pytest_mock import MockerFixture + + +class MockFuture(Future): + """MockFuture.""" + + def __init__(self, result: Any) -> None: # noqa: ANN401 + """Init.""" + super().__init__() + self._result = result + self._exception: BaseException | None = None + self.set_result(result) + + def exception(self, timeout: float | None = None) -> BaseException | None: + """Exception.""" + logging.debug(f"{timeout}=") + return self._exception + + def result(self, timeout: float | None = None) -> Any: # noqa: ANN401 + """Result.""" + logging.debug(f"{timeout}=") + return self._result + + +class MockPoolExecutor(ThreadPoolExecutor): + """MockPoolExecutor.""" + + def __init__(self, *args: Any, **kwargs: Any) -> None: # noqa: ANN401 + """Initializes a new ThreadPoolExecutor instance.""" + super().__init__(*args, **kwargs) + + def submit(self, fn: Callable[..., Any], /, *args: Any, **kwargs: Any) -> Future: # noqa: ANN401 + """Submits a callable to be executed with the given arguments. + + Args: + fn: The callable to execute. + *args: The positional arguments to pass to the callable. + **kwargs: The keyword arguments to pass to the callable. + + Returns: + A Future instance representing the execution of the callable. + """ + result = fn(*args, **kwargs) + return MockFuture(result) + + +def add(a: int, b: int) -> int: + """Add.""" + return a + b + + +def test_parallelize_thread() -> None: + """test_parallelize_thread.""" + kwargs_list = [{"a": 1, "b": 2}, {"a": 3, "b": 4}] + results = parallelize_thread(func=add, kwargs_list=kwargs_list, progress_tracker=1) + assert results.results == [3, 7] + assert not results.exceptions + + +def test_parallelize_thread_exception() -> None: + """test_parallelize_thread.""" + kwargs_list: list[dict[str, int | None]] = [{"a": 1, "b": 2}, {"a": 3, "b": None}] + results = parallelize_thread(func=add, kwargs_list=kwargs_list) + assert results.results == [3] + output = """[TypeError("unsupported operand type(s) for +: 'int' and 'NoneType'")]""" + assert str(results.exceptions) == output + + +def test_parallelize_process() -> None: + """test_parallelize_process.""" + kwargs_list = [{"a": 1, "b": 2}, {"a": 3, "b": 4}] + results = parallelize_process(func=add, kwargs_list=kwargs_list) + assert results.results == [3, 7] + assert not results.exceptions + + +def test_parallelize_process_to_many_max_workers(mocker: MockerFixture) -> None: + """test_parallelize_process.""" + mocker.patch(target="python" + ".parallelize.cpu_count", return_value=1) + + with pytest.raises(RuntimeError, match="max_workers must be less than or equal to 1"): + parallelize_process(func=add, kwargs_list=[{"a": 1, "b": 2}], max_workers=8) + + +def test_executor_results_repr() -> None: + """test_ExecutorResults_repr.""" + results = parallelize_thread(func=add, kwargs_list=[{"a": 1, "b": 2}]) + assert repr(results) == "results=[3] exceptions=[]" + + +def test_early_error() -> None: + """test_early_error.""" + kwargs_list: list[dict[str, int | None]] = [{"a": 1, "b": 2}, {"a": 3, "b": None}] + with pytest.raises(TypeError, match=r"unsupported operand type\(s\) for \+\: 'int' and 'NoneType'"): + parallelize_thread(func=add, kwargs_list=kwargs_list, mode="early_error") + + +def test_mock_pool_executor() -> None: + """test_mock_pool_executor.""" + results = _parallelize_base( + executor_type=MockPoolExecutor, + func=add, + kwargs_list=[{"a": 1, "b": 2}, {"a": 3, "b": 4}], + max_workers=None, + progress_tracker=None, + mode="normal", + ) + assert repr(results) == "results=[3, 7] exceptions=[]" diff --git a/tests/test_server_validate_scripts.py b/tests/test_server_validate_scripts.py new file mode 100644 index 0000000..7831e27 --- /dev/null +++ b/tests/test_server_validate_scripts.py @@ -0,0 +1,60 @@ +"""test_server_validate_scripts.""" + +from __future__ import annotations + +from pathlib import Path +from typing import TYPE_CHECKING + +import pytest +from pytest_mock import MockerFixture + +from python.system_tests.validate_system import main + +if TYPE_CHECKING: + from pyfakefs.fake_filesystem import FakeFilesystem + from pytest_mock import MockerFixture + +VALIDATE_SYSTEM = "python.system_tests.validate_system" + + +def test_validate_system(mocker: MockerFixture, fs: FakeFilesystem) -> None: + """test_validate_system.""" + fs.create_file( + "/mock_snapshot_config.toml", + contents='zpool = ["root_pool", "storage", "media"]\nservices = ["docker"]\n', + ) + + mocker.patch(f"{VALIDATE_SYSTEM}.systemd_tests", return_value=None) + mocker.patch(f"{VALIDATE_SYSTEM}.zpool_tests", return_value=None) + main(Path("/mock_snapshot_config.toml")) + + +def test_validate_system_errors(mocker: MockerFixture, fs: FakeFilesystem) -> None: + """test_validate_system_errors.""" + fs.create_file( + "/mock_snapshot_config.toml", + contents='zpool = ["root_pool", "storage", "media"]\nservices = ["docker"]\n', + ) + + mocker.patch(f"{VALIDATE_SYSTEM}.systemd_tests", return_value=["systemd_tests error"]) + mocker.patch(f"{VALIDATE_SYSTEM}.zpool_tests", return_value=["zpool_tests error"]) + + with pytest.raises(SystemExit) as exception_info: + main(Path("/mock_snapshot_config.toml")) + + assert exception_info.value.code == 1 + + +def test_validate_system_execution(mocker: MockerFixture, fs: FakeFilesystem) -> None: + """test_validate_system_execution.""" + fs.create_file( + "/mock_snapshot_config.toml", + contents='zpool = ["root_pool", "storage", "media"]\nservices = ["docker"]\n', + ) + + mocker.patch(f"{VALIDATE_SYSTEM}.zpool_tests", side_effect=RuntimeError("zpool_tests error")) + + with pytest.raises(SystemExit) as exception_info: + main(Path("/mock_snapshot_config.toml")) + + assert exception_info.value.code == 1 diff --git a/tests/test_snapshot_manager.py b/tests/test_snapshot_manager.py new file mode 100644 index 0000000..a849829 --- /dev/null +++ b/tests/test_snapshot_manager.py @@ -0,0 +1,167 @@ +"""test_snapshot_manager.""" + +from __future__ import annotations + +from datetime import UTC, datetime +from pathlib import Path +from typing import TYPE_CHECKING + +import pytest + +from python.tools.snapshot_manager import get_snapshots_to_delete, get_time_stamp, load_config_data, main +from python.zfs.dataset import Dataset, Snapshot + +if TYPE_CHECKING: + from pyfakefs.fake_filesystem import FakeFilesystem + from pytest_mock import MockerFixture + +SNAPSHOT_MANAGER = "python.tools.snapshot_manager" + + +def patch_utcnow(mocker: MockerFixture, datetime_value: datetime) -> None: + """patch_utcnow.""" + mocker.patch("python.tools.snapshot_manager.utcnow", return_value=datetime_value) + + +def create_mock_snapshot(mocker: MockerFixture, name: str) -> Snapshot: + """create_mock_snapshot.""" + mock_snapshot = mocker.MagicMock(spec=Snapshot) + mock_snapshot.name = name + + return mock_snapshot + + +def test_main(mocker: MockerFixture, fs: FakeFilesystem) -> None: + """Test main.""" + load_config_data.cache_clear() + + mocker.patch(f"{SNAPSHOT_MANAGER}.get_time_stamp", return_value="2023-01-01T00:00:00") + + mock_dataset = mocker.MagicMock(spec=Dataset) + mock_dataset.name = "test_dataset" + mock_dataset.create_snapshot.return_value = "snapshot created" + mock_get_datasets = mocker.patch(f"{SNAPSHOT_MANAGER}.get_datasets", return_value=(mock_dataset,)) + + mock_get_snapshots_to_delete = mocker.patch(f"{SNAPSHOT_MANAGER}.get_snapshots_to_delete") + mock_signal_alert = mocker.patch(f"{SNAPSHOT_MANAGER}.signal_alert") + mock_snapshot_config_toml = '["default"]\n15_min = 8\nhourly = 24\ndaily = 0\nmonthly = 0\n' + fs.create_file("/mock_snapshot_config.toml", contents=mock_snapshot_config_toml) + main(Path("/mock_snapshot_config.toml")) + + mock_signal_alert.assert_not_called() + mock_get_datasets.assert_called_once() + mock_get_snapshots_to_delete.assert_called_once_with( + mock_dataset, + { + "15_min": 8, + "hourly": 24, + "daily": 0, + "monthly": 0, + }, + ) + + +def test_main_create_snapshot_failure(mocker: MockerFixture, fs: FakeFilesystem) -> None: + """Test main.""" + load_config_data.cache_clear() + + mocker.patch(f"{SNAPSHOT_MANAGER}.get_time_stamp", return_value="2023-01-01T00:00:00") + + mock_dataset = mocker.MagicMock(spec=Dataset) + mock_dataset.name = "test_dataset" + mock_dataset.create_snapshot.return_value = "snapshot not created" + mock_get_datasets = mocker.patch(f"{SNAPSHOT_MANAGER}.get_datasets", return_value=(mock_dataset,)) + + mock_get_snapshots_to_delete = mocker.patch(f"{SNAPSHOT_MANAGER}.get_snapshots_to_delete") + mock_signal_alert = mocker.patch(f"{SNAPSHOT_MANAGER}.signal_alert") + mock_snapshot_config_toml = '["default"]\n15_min = 8\nhourly = 24\ndaily = 0\nmonthly = 0\n' + fs.create_file("/mock_snapshot_config.toml", contents=mock_snapshot_config_toml) + main(Path("/mock_snapshot_config.toml")) + + mock_signal_alert.assert_called_once_with("test_dataset failed to create snapshot 2023-01-01T00:00:00") + mock_get_datasets.assert_called_once() + mock_get_snapshots_to_delete.assert_not_called() + + +def test_main_exception(mocker: MockerFixture, fs: FakeFilesystem) -> None: + """Test main.""" + load_config_data.cache_clear() + + mocker.patch(f"{SNAPSHOT_MANAGER}.get_time_stamp", return_value="2023-01-01T00:00:00") + + mock_dataset = mocker.MagicMock(spec=Dataset) + mock_dataset.name = "test_dataset" + mock_dataset.create_snapshot.return_value = "snapshot created" + mock_get_datasets = mocker.patch(f"{SNAPSHOT_MANAGER}.get_datasets", side_effect=Exception("test")) + + mock_get_snapshots_to_delete = mocker.patch(f"{SNAPSHOT_MANAGER}.get_snapshots_to_delete") + mock_signal_alert = mocker.patch(f"{SNAPSHOT_MANAGER}.signal_alert") + mock_snapshot_config_toml = '["default"]\n15_min = 8\nhourly = 24\ndaily = 0\nmonthly = 0\n' + fs.create_file("/mock_snapshot_config.toml", contents=mock_snapshot_config_toml) + with pytest.raises(SystemExit) as pytest_wrapped_e: + main(Path("/mock_snapshot_config.toml")) + + assert isinstance(pytest_wrapped_e.value, SystemExit) + assert pytest_wrapped_e.value.code == 1 + mock_signal_alert.assert_called_once_with("snapshot_manager failed") + mock_get_datasets.assert_called_once() + mock_get_snapshots_to_delete.assert_not_called() + + +def test_get_snapshots_to_delete(mocker: MockerFixture) -> None: + """test_get_snapshots_to_delete.""" + mock_snapshot_0 = create_mock_snapshot(mocker, "auto_202509150415") + mock_snapshot_1 = create_mock_snapshot(mocker, "auto_202509150415") + + mock_dataset = mocker.MagicMock(spec=Dataset) + mock_dataset.name = "test_dataset" + mock_dataset.get_snapshots.return_value = (mock_snapshot_0, mock_snapshot_1) + mock_dataset.delete_snapshot.return_value = None + + mock_signal_alert = mocker.patch(f"{SNAPSHOT_MANAGER}.signal_alert") + + get_snapshots_to_delete(mock_dataset, {"15_min": 1, "hourly": 0, "daily": 0, "monthly": 0}) + + mock_signal_alert.assert_not_called() + mock_dataset.delete_snapshot.assert_called_once_with("auto_202509150415") + + +def test_get_snapshots_to_delete_no_snapshot(mocker: MockerFixture) -> None: + """test_get_snapshots_to_delete_no_snapshot.""" + mock_dataset = mocker.MagicMock(spec=Dataset) + mock_dataset.name = "test_dataset" + mock_dataset.get_snapshots.return_value = () + mock_dataset.delete_snapshot.return_value = None + + mock_signal_alert = mocker.patch(f"{SNAPSHOT_MANAGER}.signal_alert") + + get_snapshots_to_delete(mock_dataset, {"15_min": 1, "hourly": 0, "daily": 0, "monthly": 0}) + + mock_signal_alert.assert_not_called() + mock_dataset.delete_snapshot.assert_not_called() + + +def test_get_snapshots_to_delete_errored(mocker: MockerFixture) -> None: + """test_get_snapshots_to_delete_errored.""" + mock_snapshot_0 = create_mock_snapshot(mocker, "auto_202509150415") + mock_snapshot_1 = create_mock_snapshot(mocker, "auto_202509150415") + + mock_dataset = mocker.MagicMock(spec=Dataset) + mock_dataset.name = "test_dataset" + mock_dataset.get_snapshots.return_value = (mock_snapshot_0, mock_snapshot_1) + mock_dataset.delete_snapshot.return_value = "snapshot has dependent clones" + + mock_signal_alert = mocker.patch(f"{SNAPSHOT_MANAGER}.signal_alert") + + get_snapshots_to_delete(mock_dataset, {"15_min": 1, "hourly": 0, "daily": 0, "monthly": 0}) + + mock_signal_alert.assert_called_once_with( + "test_dataset@auto_202509150415 failed to delete: snapshot has dependent clones" + ) + mock_dataset.delete_snapshot.assert_called_once_with("auto_202509150415") + + +def test_get_time_stamp(mocker: MockerFixture) -> None: + """Test get_time_stamp.""" + patch_utcnow(mocker, datetime(2023, 1, 1, 0, 0, 0, tzinfo=UTC)) + assert get_time_stamp() == "auto_202301010000" diff --git a/tests/test_zfs.py b/tests/test_zfs.py new file mode 100644 index 0000000..6c6621a --- /dev/null +++ b/tests/test_zfs.py @@ -0,0 +1,309 @@ +"""Test zfs.""" + +import json +from datetime import UTC, datetime +from unittest.mock import call + +import pytest +from pytest_mock import MockerFixture + +from python.zfs import Dataset, Snapshot, Zpool, get_datasets +from python.zfs.dataset import _zfs_list +from python.zfs.zpool import _zpool_list + +DATASET = "python.zfs.dataset" +ZPOOL = "python.zfs.zpool" +SAMPLE_SNAPSHOT_DATA = { + "createtxg": "123", + "properties": { + "creation": {"value": "1620000000"}, + "defer_destroy": {"value": "off"}, + "guid": {"value": "456"}, + "objsetid": {"value": "789"}, + "referenced": {"value": "1024"}, + "used": {"value": "512"}, + "userrefs": {"value": "0"}, + "version": {"value": "1"}, + "written": {"value": "2048"}, + }, + "name": "pool/dataset@snap1", +} + +SAMPLE_DATASET_DATA = { + "output_version": {"vers_major": 0, "vers_minor": 1, "command": "zfs list"}, + "datasets": { + "pool/dataset": { + "properties": { + "aclinherit": {"value": "restricted"}, + "aclmode": {"value": "discard"}, + "acltype": {"value": "off"}, + "available": {"value": "1000000"}, + "canmount": {"value": "on"}, + "checksum": {"value": "on"}, + "clones": {"value": ""}, + "compression": {"value": "lz4"}, + "copies": {"value": "1"}, + "createtxg": {"value": "1234"}, + "creation": {"value": "1620000000"}, + "dedup": {"value": "off"}, + "devices": {"value": "on"}, + "encryption": {"value": "off"}, + "exec": {"value": "on"}, + "filesystem_limit": {"value": "none"}, + "guid": {"value": "5678"}, + "keystatus": {"value": "none"}, + "logbias": {"value": "latency"}, + "mlslabel": {"value": "none"}, + "mounted": {"value": "yes"}, + "mountpoint": {"value": "/pool/dataset"}, + "quota": {"value": "0"}, + "readonly": {"value": "off"}, + "recordsize": {"value": "131072"}, + "redundant_metadata": {"value": "all"}, + "referenced": {"value": "512000"}, + "refquota": {"value": "0"}, + "refreservation": {"value": "0"}, + "reservation": {"value": "0"}, + "setuid": {"value": "on"}, + "sharenfs": {"value": "off"}, + "snapdir": {"value": "hidden"}, + "snapshot_limit": {"value": "none"}, + "sync": {"value": "standard"}, + "used": {"value": "1024000"}, + "usedbychildren": {"value": "512000"}, + "usedbydataset": {"value": "256000"}, + "usedbysnapshots": {"value": "256000"}, + "version": {"value": "5"}, + "volmode": {"value": "default"}, + "volsize": {"value": "none"}, + "vscan": {"value": "off"}, + "written": {"value": "4096"}, + "xattr": {"value": "on"}, + } + } + }, +} + +SAMPLE_ZPOOL_DATA = { + "output_version": {"vers_major": 0, "vers_minor": 1, "command": "zpool list"}, + "pools": { + "testpool": { + "properties": { + "allocated": {"value": "1000000"}, + "altroot": {"value": "none"}, + "ashift": {"value": "12"}, + "autoexpand": {"value": "off"}, + "autoreplace": {"value": "off"}, + "autotrim": {"value": "off"}, + "capacity": {"value": "50"}, + "comment": {"value": "test pool"}, + "dedupratio": {"value": "1.00x"}, + "delegation": {"value": "on"}, + "expandsize": {"value": "0"}, + "failmode": {"value": "wait"}, + "fragmentation": {"value": "20"}, + "free": {"value": "1000000"}, + "freeing": {"value": "0"}, + "guid": {"value": "12345678"}, + "health": {"value": "ONLINE"}, + "leaked": {"value": "0"}, + "readonly": {"value": "off"}, + "size": {"value": "2000000"}, + } + } + }, +} + + +def test_dataset_initialization(mocker: MockerFixture) -> None: + """Test Dataset class initialization with mocked ZFS data.""" + mocker.patch(f"{DATASET}._zfs_list", return_value=SAMPLE_DATASET_DATA) + + dataset = Dataset("pool/dataset") + + assert dataset.__dict__ == { + "aclinherit": "restricted", + "aclmode": "discard", + "acltype": "off", + "available": 1000000, + "canmount": "on", + "checksum": "on", + "clones": "", + "compression": "lz4", + "copies": 1, + "createtxg": 1234, + "creation": datetime(2021, 5, 3, 0, 0, tzinfo=UTC), + "dedup": "off", + "devices": "on", + "encryption": "off", + "exec": "on", + "filesystem_limit": "none", + "guid": 5678, + "keystatus": "none", + "logbias": "latency", + "mlslabel": "none", + "mounted": "yes", + "mountpoint": "/pool/dataset", + "name": "pool/dataset", + "quota": 0, + "readonly": "off", + "recordsize": 131072, + "redundant_metadata": "all", + "referenced": 512000, + "refquota": 0, + "refreservation": 0, + "reservation": 0, + "setuid": "on", + "sharenfs": "off", + "snapdir": "hidden", + "snapshot_limit": "none", + "sync": "standard", + "used": 1024000, + "usedbychildren": 512000, + "usedbydataset": 256000, + "usedbysnapshots": 256000, + "version": 5, + "volmode": "default", + "volsize": "none", + "vscan": "off", + "written": 4096, + "xattr": "on", + } + + +def test_snapshot_initialization() -> None: + """Test Snapshot class initialization with mocked ZFS data.""" + snapshot = Snapshot(SAMPLE_SNAPSHOT_DATA) + assert snapshot.__dict__ == { + "createtxg": 123, + "creation": datetime(2021, 5, 3, 0, 0, tzinfo=UTC), + "defer_destroy": "off", + "guid": 456, + "name": "snap1", + "objsetid": 789, + "referenced": 1024, + "used": 512, + "userrefs": 0, + "version": 1, + "written": 2048, + } + + +def test_zfs_list_version_check(mocker: MockerFixture) -> None: + """Test version validation in _zfs_list.""" + mocker.patch( + f"{DATASET}.bash_wrapper", + return_value=( + json.dumps({"output_version": {"vers_major": 1, "vers_minor": 0, "command": "zfs list"}}), + 0, + ), + ) + + with pytest.raises(RuntimeError) as excinfo: + _zfs_list("zfs list invalid -pHj -o all") + + assert "Datasets are not in the correct format" in str(excinfo.value) + + +def test_get_datasets(mocker: MockerFixture) -> None: + """Test get_datasets.""" + mock_bash = mocker.patch(f"{DATASET}.bash_wrapper", return_value=("pool/dataset\npool/other\ninvalid", 0)) + mock_dataset = mocker.patch(f"{DATASET}.Dataset") + + get_datasets() + + mock_bash.assert_called_once_with("zfs list -Hp -t filesystem -o name") + + calls = [call("pool/dataset"), call("pool/other")] + + mock_dataset.assert_has_calls(calls) + + +def test_zpool_initialization(mocker: MockerFixture) -> None: + """Test Zpool class initialization with mocked ZFS data.""" + mocker.patch(f"{ZPOOL}._zpool_list", return_value=SAMPLE_ZPOOL_DATA) + + zpool = Zpool("testpool") + + assert zpool.__dict__ == { + "name": "testpool", + "allocated": 1000000, + "altroot": "none", + "ashift": 12, + "autoexpand": "off", + "autoreplace": "off", + "autotrim": "off", + "capacity": 50, + "comment": "test pool", + "dedupratio": "1.00x", + "delegation": "on", + "expandsize": "0", + "failmode": "wait", + "fragmentation": 20, + "free": "1000000", + "freeing": 0, + "guid": 12345678, + "health": "ONLINE", + "leaked": 0, + "readonly": "off", + "size": 2000000, + } + + +def test_zpool_repr(mocker: MockerFixture) -> None: + """Test Zpool string representation.""" + mocker.patch(f"{ZPOOL}._zpool_list", return_value=SAMPLE_ZPOOL_DATA) + + zpool = Zpool("testpool") + repr_string = repr(zpool) + + expected_attrs = [ + "name", + "allocated", + "altroot", + "ashift", + "autoexpand", + "autoreplace", + "autotrim", + "capacity", + "comment", + "dedupratio", + "delegation", + "expandsize", + "failmode", + "fragmentation", + "freeing", + "guid", + "health", + "leaked", + "readonly", + "size", + ] + + for attr in expected_attrs: + assert f"{attr}=" in repr_string + + +def test_zpool_list(mocker: MockerFixture) -> None: + """Test version validation in _zpool_list.""" + mocker.patch( + f"{ZPOOL}.bash_wrapper", + return_value=(json.dumps({"output_version": {"vers_major": 0, "vers_minor": 1, "command": "zpool list"}}), 0), + ) + + result = _zpool_list("zpool list invalid -pHj -o all") + + assert result == {"output_version": {"command": "zpool list", "vers_major": 0, "vers_minor": 1}} + + +def test_zpool_list_version_check(mocker: MockerFixture) -> None: + """Test version validation in _zpool_list.""" + mocker.patch( + f"{ZPOOL}.bash_wrapper", + return_value=(json.dumps({"output_version": {"vers_major": 1, "vers_minor": 0, "command": "zpool list"}}), 0), + ) + + with pytest.raises(RuntimeError) as excinfo: + _zpool_list("zpool list invalid -pHj -o all") + + assert "Datasets are not in the correct format" in str(excinfo.value)