added tests

This commit is contained in:
2025-10-28 22:00:31 -04:00
parent d2db0de371
commit 825672a450
8 changed files with 836 additions and 0 deletions

1
tests/__init__.py Normal file
View File

@@ -0,0 +1 @@
"""Tests."""

61
tests/test_common.py Normal file
View File

@@ -0,0 +1,61 @@
"""test_common."""
from __future__ import annotations
from os import environ
from typing import TYPE_CHECKING
from apprise import Apprise
from python.common import bash_wrapper, signal_alert, utcnow
if TYPE_CHECKING:
from pytest_mock import MockerFixture
def test_utcnow() -> None:
"""test_utcnow."""
utcnow()
def test_signal_alert(mocker: MockerFixture) -> None:
"""test_signal_alert."""
environ["SIGNAL_ALERT_FROM_PHONE"] = "1234567890"
environ["SIGNAL_ALERT_TO_PHONE"] = "0987654321"
mock_logger = mocker.patch("python.common.logger")
mock_apprise_client = mocker.MagicMock(spec=Apprise)
mocker.patch("python.common.Apprise", return_value=mock_apprise_client)
signal_alert("test")
mock_logger.info.assert_not_called()
mock_apprise_client.add.assert_called_once_with("signal://localhost:8989/1234567890/0987654321")
mock_apprise_client.notify.assert_called_once_with(title="", body="test")
def test_signal_alert_no_phones(mocker: MockerFixture) -> None:
"""test_signal_alert_no_phones."""
if "SIGNAL_ALERT_FROM_PHONE" in environ:
del environ["SIGNAL_ALERT_FROM_PHONE"]
if "SIGNAL_ALERT_TO_PHONE" in environ:
del environ["SIGNAL_ALERT_TO_PHONE"]
mock_logger = mocker.patch("python.common.logger")
signal_alert("test")
mock_logger.info.assert_called_once_with("SIGNAL_ALERT_FROM_PHONE or SIGNAL_ALERT_TO_PHONE not set")
def test_test_bash_wrapper() -> None:
"""test_test_bash_wrapper."""
stdout, returncode = bash_wrapper("echo test")
assert stdout == "test\n"
assert returncode == 0
def test_test_bash_wrapper_error() -> None:
"""test_test_bash_wrapper_error."""
expected_error = 2
stdout, returncode = bash_wrapper("ls /this/path/does/not/exist")
assert stdout == "ls: cannot access '/this/path/does/not/exist': No such file or directory\n"
assert returncode == expected_error

95
tests/test_components.py Normal file
View File

@@ -0,0 +1,95 @@
"""test_components."""
from pytest_mock import MockerFixture
from python.system_tests.components import systemd_tests, zpool_tests
from python.zfs import Zpool
temp = "Every feature flags pool has all supported and requested features enabled.\n"
SYSTEM_TESTS_COMPONENTS = "python.system_tests.components"
def test_zpool_tests(mocker: MockerFixture) -> None:
"""test_zpool_tests."""
mock_zpool = mocker.MagicMock(spec=Zpool)
mock_zpool.health = "ONLINE"
mock_zpool.capacity = 70
mock_zpool.name = "Main"
mocker.patch(f"{SYSTEM_TESTS_COMPONENTS}.Zpool", return_value=mock_zpool)
mocker.patch(f"{SYSTEM_TESTS_COMPONENTS}.bash_wrapper", return_value=(temp, ""))
errors = zpool_tests(("Main",))
assert errors == []
def test_zpool_tests_out_of_date(mocker: MockerFixture) -> None:
"""test_zpool_tests_out_of_date."""
mock_zpool = mocker.MagicMock(spec=Zpool)
mock_zpool.health = "ONLINE"
mock_zpool.capacity = 70
mock_zpool.name = "Main"
mocker.patch(f"{SYSTEM_TESTS_COMPONENTS}.Zpool", return_value=mock_zpool)
mocker.patch(f"{SYSTEM_TESTS_COMPONENTS}.bash_wrapper", return_value=("", ""))
errors = zpool_tests(("Main",))
assert errors == ["ZPool out of date run `sudo zpool upgrade -a`"]
def test_zpool_tests_out_of_space(mocker: MockerFixture) -> None:
"""test_zpool_tests_out_of_space."""
mock_zpool = mocker.MagicMock(spec=Zpool)
mock_zpool.health = "ONLINE"
mock_zpool.capacity = 100
mock_zpool.name = "Main"
mocker.patch(f"{SYSTEM_TESTS_COMPONENTS}.Zpool", return_value=mock_zpool)
mocker.patch(f"{SYSTEM_TESTS_COMPONENTS}.bash_wrapper", return_value=(temp, ""))
errors = zpool_tests(("Main",))
assert errors == ["Main is low on space"]
def test_zpool_tests_offline(mocker: MockerFixture) -> None:
"""test_zpool_tests_offline."""
mock_zpool = mocker.MagicMock(spec=Zpool)
mock_zpool.health = "OFFLINE"
mock_zpool.capacity = 70
mock_zpool.name = "Main"
mocker.patch(f"{SYSTEM_TESTS_COMPONENTS}.Zpool", return_value=mock_zpool)
mocker.patch(f"{SYSTEM_TESTS_COMPONENTS}.bash_wrapper", return_value=(temp, ""))
errors = zpool_tests(("Main",))
assert errors == ["Main is OFFLINE"]
def test_systemd_tests() -> None:
"""test_systemd_tests."""
errors = systemd_tests(("docker",))
assert errors == []
def test_systemd_tests_multiple_negative_retries() -> None:
"""test_systemd_tests_fail."""
errors = systemd_tests(("docker",), max_retries=-1, retry_delay_secs=-1)
assert errors == []
def test_systemd_tests_multiple_pass(mocker: MockerFixture) -> None:
"""test_systemd_tests_fail."""
mocker.patch(
f"{SYSTEM_TESTS_COMPONENTS}.bash_wrapper",
side_effect=[
("inactive\n", ""),
("activating\n", ""),
("active\n", ""),
],
)
errors = systemd_tests(
("docker",),
retryable_statuses=("inactive\n", "activating\n"),
valid_statuses=("active\n",),
)
assert errors == []
def test_systemd_tests_fail(mocker: MockerFixture) -> None:
"""test_systemd_tests_fail."""
mocker.patch(f"{SYSTEM_TESTS_COMPONENTS}.bash_wrapper", return_value=("inactive\n", ""))
errors = systemd_tests(("docker",), max_retries=5)
assert errors == ["docker is inactive"]

124
tests/test_parallelize.py Normal file
View File

@@ -0,0 +1,124 @@
"""test_executors."""
from __future__ import annotations
import logging
from concurrent.futures import Future, ThreadPoolExecutor
from typing import TYPE_CHECKING, Any
import pytest
from python.parallelize import _parallelize_base, parallelize_process, parallelize_thread
if TYPE_CHECKING:
from collections.abc import Callable
from pytest_mock import MockerFixture
class MockFuture(Future):
"""MockFuture."""
def __init__(self, result: Any) -> None: # noqa: ANN401
"""Init."""
super().__init__()
self._result = result
self._exception: BaseException | None = None
self.set_result(result)
def exception(self, timeout: float | None = None) -> BaseException | None:
"""Exception."""
logging.debug(f"{timeout}=")
return self._exception
def result(self, timeout: float | None = None) -> Any: # noqa: ANN401
"""Result."""
logging.debug(f"{timeout}=")
return self._result
class MockPoolExecutor(ThreadPoolExecutor):
"""MockPoolExecutor."""
def __init__(self, *args: Any, **kwargs: Any) -> None: # noqa: ANN401
"""Initializes a new ThreadPoolExecutor instance."""
super().__init__(*args, **kwargs)
def submit(self, fn: Callable[..., Any], /, *args: Any, **kwargs: Any) -> Future: # noqa: ANN401
"""Submits a callable to be executed with the given arguments.
Args:
fn: The callable to execute.
*args: The positional arguments to pass to the callable.
**kwargs: The keyword arguments to pass to the callable.
Returns:
A Future instance representing the execution of the callable.
"""
result = fn(*args, **kwargs)
return MockFuture(result)
def add(a: int, b: int) -> int:
"""Add."""
return a + b
def test_parallelize_thread() -> None:
"""test_parallelize_thread."""
kwargs_list = [{"a": 1, "b": 2}, {"a": 3, "b": 4}]
results = parallelize_thread(func=add, kwargs_list=kwargs_list, progress_tracker=1)
assert results.results == [3, 7]
assert not results.exceptions
def test_parallelize_thread_exception() -> None:
"""test_parallelize_thread."""
kwargs_list: list[dict[str, int | None]] = [{"a": 1, "b": 2}, {"a": 3, "b": None}]
results = parallelize_thread(func=add, kwargs_list=kwargs_list)
assert results.results == [3]
output = """[TypeError("unsupported operand type(s) for +: 'int' and 'NoneType'")]"""
assert str(results.exceptions) == output
def test_parallelize_process() -> None:
"""test_parallelize_process."""
kwargs_list = [{"a": 1, "b": 2}, {"a": 3, "b": 4}]
results = parallelize_process(func=add, kwargs_list=kwargs_list)
assert results.results == [3, 7]
assert not results.exceptions
def test_parallelize_process_to_many_max_workers(mocker: MockerFixture) -> None:
"""test_parallelize_process."""
mocker.patch(target="python"
".parallelize.cpu_count", return_value=1)
with pytest.raises(RuntimeError, match="max_workers must be less than or equal to 1"):
parallelize_process(func=add, kwargs_list=[{"a": 1, "b": 2}], max_workers=8)
def test_executor_results_repr() -> None:
"""test_ExecutorResults_repr."""
results = parallelize_thread(func=add, kwargs_list=[{"a": 1, "b": 2}])
assert repr(results) == "results=[3] exceptions=[]"
def test_early_error() -> None:
"""test_early_error."""
kwargs_list: list[dict[str, int | None]] = [{"a": 1, "b": 2}, {"a": 3, "b": None}]
with pytest.raises(TypeError, match=r"unsupported operand type\(s\) for \+\: 'int' and 'NoneType'"):
parallelize_thread(func=add, kwargs_list=kwargs_list, mode="early_error")
def test_mock_pool_executor() -> None:
"""test_mock_pool_executor."""
results = _parallelize_base(
executor_type=MockPoolExecutor,
func=add,
kwargs_list=[{"a": 1, "b": 2}, {"a": 3, "b": 4}],
max_workers=None,
progress_tracker=None,
mode="normal",
)
assert repr(results) == "results=[3, 7] exceptions=[]"

View File

@@ -0,0 +1,60 @@
"""test_server_validate_scripts."""
from __future__ import annotations
from pathlib import Path
from typing import TYPE_CHECKING
import pytest
from pytest_mock import MockerFixture
from python.system_tests.validate_system import main
if TYPE_CHECKING:
from pyfakefs.fake_filesystem import FakeFilesystem
from pytest_mock import MockerFixture
VALIDATE_SYSTEM = "python.system_tests.validate_system"
def test_validate_system(mocker: MockerFixture, fs: FakeFilesystem) -> None:
"""test_validate_system."""
fs.create_file(
"/mock_snapshot_config.toml",
contents='zpool = ["root_pool", "storage", "media"]\nservices = ["docker"]\n',
)
mocker.patch(f"{VALIDATE_SYSTEM}.systemd_tests", return_value=None)
mocker.patch(f"{VALIDATE_SYSTEM}.zpool_tests", return_value=None)
main(Path("/mock_snapshot_config.toml"))
def test_validate_system_errors(mocker: MockerFixture, fs: FakeFilesystem) -> None:
"""test_validate_system_errors."""
fs.create_file(
"/mock_snapshot_config.toml",
contents='zpool = ["root_pool", "storage", "media"]\nservices = ["docker"]\n',
)
mocker.patch(f"{VALIDATE_SYSTEM}.systemd_tests", return_value=["systemd_tests error"])
mocker.patch(f"{VALIDATE_SYSTEM}.zpool_tests", return_value=["zpool_tests error"])
with pytest.raises(SystemExit) as exception_info:
main(Path("/mock_snapshot_config.toml"))
assert exception_info.value.code == 1
def test_validate_system_execution(mocker: MockerFixture, fs: FakeFilesystem) -> None:
"""test_validate_system_execution."""
fs.create_file(
"/mock_snapshot_config.toml",
contents='zpool = ["root_pool", "storage", "media"]\nservices = ["docker"]\n',
)
mocker.patch(f"{VALIDATE_SYSTEM}.zpool_tests", side_effect=RuntimeError("zpool_tests error"))
with pytest.raises(SystemExit) as exception_info:
main(Path("/mock_snapshot_config.toml"))
assert exception_info.value.code == 1

View File

@@ -0,0 +1,167 @@
"""test_snapshot_manager."""
from __future__ import annotations
from datetime import UTC, datetime
from pathlib import Path
from typing import TYPE_CHECKING
import pytest
from python.tools.snapshot_manager import get_snapshots_to_delete, get_time_stamp, load_config_data, main
from python.zfs.dataset import Dataset, Snapshot
if TYPE_CHECKING:
from pyfakefs.fake_filesystem import FakeFilesystem
from pytest_mock import MockerFixture
SNAPSHOT_MANAGER = "python.tools.snapshot_manager"
def patch_utcnow(mocker: MockerFixture, datetime_value: datetime) -> None:
"""patch_utcnow."""
mocker.patch("python.tools.snapshot_manager.utcnow", return_value=datetime_value)
def create_mock_snapshot(mocker: MockerFixture, name: str) -> Snapshot:
"""create_mock_snapshot."""
mock_snapshot = mocker.MagicMock(spec=Snapshot)
mock_snapshot.name = name
return mock_snapshot
def test_main(mocker: MockerFixture, fs: FakeFilesystem) -> None:
"""Test main."""
load_config_data.cache_clear()
mocker.patch(f"{SNAPSHOT_MANAGER}.get_time_stamp", return_value="2023-01-01T00:00:00")
mock_dataset = mocker.MagicMock(spec=Dataset)
mock_dataset.name = "test_dataset"
mock_dataset.create_snapshot.return_value = "snapshot created"
mock_get_datasets = mocker.patch(f"{SNAPSHOT_MANAGER}.get_datasets", return_value=(mock_dataset,))
mock_get_snapshots_to_delete = mocker.patch(f"{SNAPSHOT_MANAGER}.get_snapshots_to_delete")
mock_signal_alert = mocker.patch(f"{SNAPSHOT_MANAGER}.signal_alert")
mock_snapshot_config_toml = '["default"]\n15_min = 8\nhourly = 24\ndaily = 0\nmonthly = 0\n'
fs.create_file("/mock_snapshot_config.toml", contents=mock_snapshot_config_toml)
main(Path("/mock_snapshot_config.toml"))
mock_signal_alert.assert_not_called()
mock_get_datasets.assert_called_once()
mock_get_snapshots_to_delete.assert_called_once_with(
mock_dataset,
{
"15_min": 8,
"hourly": 24,
"daily": 0,
"monthly": 0,
},
)
def test_main_create_snapshot_failure(mocker: MockerFixture, fs: FakeFilesystem) -> None:
"""Test main."""
load_config_data.cache_clear()
mocker.patch(f"{SNAPSHOT_MANAGER}.get_time_stamp", return_value="2023-01-01T00:00:00")
mock_dataset = mocker.MagicMock(spec=Dataset)
mock_dataset.name = "test_dataset"
mock_dataset.create_snapshot.return_value = "snapshot not created"
mock_get_datasets = mocker.patch(f"{SNAPSHOT_MANAGER}.get_datasets", return_value=(mock_dataset,))
mock_get_snapshots_to_delete = mocker.patch(f"{SNAPSHOT_MANAGER}.get_snapshots_to_delete")
mock_signal_alert = mocker.patch(f"{SNAPSHOT_MANAGER}.signal_alert")
mock_snapshot_config_toml = '["default"]\n15_min = 8\nhourly = 24\ndaily = 0\nmonthly = 0\n'
fs.create_file("/mock_snapshot_config.toml", contents=mock_snapshot_config_toml)
main(Path("/mock_snapshot_config.toml"))
mock_signal_alert.assert_called_once_with("test_dataset failed to create snapshot 2023-01-01T00:00:00")
mock_get_datasets.assert_called_once()
mock_get_snapshots_to_delete.assert_not_called()
def test_main_exception(mocker: MockerFixture, fs: FakeFilesystem) -> None:
"""Test main."""
load_config_data.cache_clear()
mocker.patch(f"{SNAPSHOT_MANAGER}.get_time_stamp", return_value="2023-01-01T00:00:00")
mock_dataset = mocker.MagicMock(spec=Dataset)
mock_dataset.name = "test_dataset"
mock_dataset.create_snapshot.return_value = "snapshot created"
mock_get_datasets = mocker.patch(f"{SNAPSHOT_MANAGER}.get_datasets", side_effect=Exception("test"))
mock_get_snapshots_to_delete = mocker.patch(f"{SNAPSHOT_MANAGER}.get_snapshots_to_delete")
mock_signal_alert = mocker.patch(f"{SNAPSHOT_MANAGER}.signal_alert")
mock_snapshot_config_toml = '["default"]\n15_min = 8\nhourly = 24\ndaily = 0\nmonthly = 0\n'
fs.create_file("/mock_snapshot_config.toml", contents=mock_snapshot_config_toml)
with pytest.raises(SystemExit) as pytest_wrapped_e:
main(Path("/mock_snapshot_config.toml"))
assert isinstance(pytest_wrapped_e.value, SystemExit)
assert pytest_wrapped_e.value.code == 1
mock_signal_alert.assert_called_once_with("snapshot_manager failed")
mock_get_datasets.assert_called_once()
mock_get_snapshots_to_delete.assert_not_called()
def test_get_snapshots_to_delete(mocker: MockerFixture) -> None:
"""test_get_snapshots_to_delete."""
mock_snapshot_0 = create_mock_snapshot(mocker, "auto_202509150415")
mock_snapshot_1 = create_mock_snapshot(mocker, "auto_202509150415")
mock_dataset = mocker.MagicMock(spec=Dataset)
mock_dataset.name = "test_dataset"
mock_dataset.get_snapshots.return_value = (mock_snapshot_0, mock_snapshot_1)
mock_dataset.delete_snapshot.return_value = None
mock_signal_alert = mocker.patch(f"{SNAPSHOT_MANAGER}.signal_alert")
get_snapshots_to_delete(mock_dataset, {"15_min": 1, "hourly": 0, "daily": 0, "monthly": 0})
mock_signal_alert.assert_not_called()
mock_dataset.delete_snapshot.assert_called_once_with("auto_202509150415")
def test_get_snapshots_to_delete_no_snapshot(mocker: MockerFixture) -> None:
"""test_get_snapshots_to_delete_no_snapshot."""
mock_dataset = mocker.MagicMock(spec=Dataset)
mock_dataset.name = "test_dataset"
mock_dataset.get_snapshots.return_value = ()
mock_dataset.delete_snapshot.return_value = None
mock_signal_alert = mocker.patch(f"{SNAPSHOT_MANAGER}.signal_alert")
get_snapshots_to_delete(mock_dataset, {"15_min": 1, "hourly": 0, "daily": 0, "monthly": 0})
mock_signal_alert.assert_not_called()
mock_dataset.delete_snapshot.assert_not_called()
def test_get_snapshots_to_delete_errored(mocker: MockerFixture) -> None:
"""test_get_snapshots_to_delete_errored."""
mock_snapshot_0 = create_mock_snapshot(mocker, "auto_202509150415")
mock_snapshot_1 = create_mock_snapshot(mocker, "auto_202509150415")
mock_dataset = mocker.MagicMock(spec=Dataset)
mock_dataset.name = "test_dataset"
mock_dataset.get_snapshots.return_value = (mock_snapshot_0, mock_snapshot_1)
mock_dataset.delete_snapshot.return_value = "snapshot has dependent clones"
mock_signal_alert = mocker.patch(f"{SNAPSHOT_MANAGER}.signal_alert")
get_snapshots_to_delete(mock_dataset, {"15_min": 1, "hourly": 0, "daily": 0, "monthly": 0})
mock_signal_alert.assert_called_once_with(
"test_dataset@auto_202509150415 failed to delete: snapshot has dependent clones"
)
mock_dataset.delete_snapshot.assert_called_once_with("auto_202509150415")
def test_get_time_stamp(mocker: MockerFixture) -> None:
"""Test get_time_stamp."""
patch_utcnow(mocker, datetime(2023, 1, 1, 0, 0, 0, tzinfo=UTC))
assert get_time_stamp() == "auto_202301010000"

309
tests/test_zfs.py Normal file
View File

@@ -0,0 +1,309 @@
"""Test zfs."""
import json
from datetime import UTC, datetime
from unittest.mock import call
import pytest
from pytest_mock import MockerFixture
from python.zfs import Dataset, Snapshot, Zpool, get_datasets
from python.zfs.dataset import _zfs_list
from python.zfs.zpool import _zpool_list
DATASET = "python.zfs.dataset"
ZPOOL = "python.zfs.zpool"
SAMPLE_SNAPSHOT_DATA = {
"createtxg": "123",
"properties": {
"creation": {"value": "1620000000"},
"defer_destroy": {"value": "off"},
"guid": {"value": "456"},
"objsetid": {"value": "789"},
"referenced": {"value": "1024"},
"used": {"value": "512"},
"userrefs": {"value": "0"},
"version": {"value": "1"},
"written": {"value": "2048"},
},
"name": "pool/dataset@snap1",
}
SAMPLE_DATASET_DATA = {
"output_version": {"vers_major": 0, "vers_minor": 1, "command": "zfs list"},
"datasets": {
"pool/dataset": {
"properties": {
"aclinherit": {"value": "restricted"},
"aclmode": {"value": "discard"},
"acltype": {"value": "off"},
"available": {"value": "1000000"},
"canmount": {"value": "on"},
"checksum": {"value": "on"},
"clones": {"value": ""},
"compression": {"value": "lz4"},
"copies": {"value": "1"},
"createtxg": {"value": "1234"},
"creation": {"value": "1620000000"},
"dedup": {"value": "off"},
"devices": {"value": "on"},
"encryption": {"value": "off"},
"exec": {"value": "on"},
"filesystem_limit": {"value": "none"},
"guid": {"value": "5678"},
"keystatus": {"value": "none"},
"logbias": {"value": "latency"},
"mlslabel": {"value": "none"},
"mounted": {"value": "yes"},
"mountpoint": {"value": "/pool/dataset"},
"quota": {"value": "0"},
"readonly": {"value": "off"},
"recordsize": {"value": "131072"},
"redundant_metadata": {"value": "all"},
"referenced": {"value": "512000"},
"refquota": {"value": "0"},
"refreservation": {"value": "0"},
"reservation": {"value": "0"},
"setuid": {"value": "on"},
"sharenfs": {"value": "off"},
"snapdir": {"value": "hidden"},
"snapshot_limit": {"value": "none"},
"sync": {"value": "standard"},
"used": {"value": "1024000"},
"usedbychildren": {"value": "512000"},
"usedbydataset": {"value": "256000"},
"usedbysnapshots": {"value": "256000"},
"version": {"value": "5"},
"volmode": {"value": "default"},
"volsize": {"value": "none"},
"vscan": {"value": "off"},
"written": {"value": "4096"},
"xattr": {"value": "on"},
}
}
},
}
SAMPLE_ZPOOL_DATA = {
"output_version": {"vers_major": 0, "vers_minor": 1, "command": "zpool list"},
"pools": {
"testpool": {
"properties": {
"allocated": {"value": "1000000"},
"altroot": {"value": "none"},
"ashift": {"value": "12"},
"autoexpand": {"value": "off"},
"autoreplace": {"value": "off"},
"autotrim": {"value": "off"},
"capacity": {"value": "50"},
"comment": {"value": "test pool"},
"dedupratio": {"value": "1.00x"},
"delegation": {"value": "on"},
"expandsize": {"value": "0"},
"failmode": {"value": "wait"},
"fragmentation": {"value": "20"},
"free": {"value": "1000000"},
"freeing": {"value": "0"},
"guid": {"value": "12345678"},
"health": {"value": "ONLINE"},
"leaked": {"value": "0"},
"readonly": {"value": "off"},
"size": {"value": "2000000"},
}
}
},
}
def test_dataset_initialization(mocker: MockerFixture) -> None:
"""Test Dataset class initialization with mocked ZFS data."""
mocker.patch(f"{DATASET}._zfs_list", return_value=SAMPLE_DATASET_DATA)
dataset = Dataset("pool/dataset")
assert dataset.__dict__ == {
"aclinherit": "restricted",
"aclmode": "discard",
"acltype": "off",
"available": 1000000,
"canmount": "on",
"checksum": "on",
"clones": "",
"compression": "lz4",
"copies": 1,
"createtxg": 1234,
"creation": datetime(2021, 5, 3, 0, 0, tzinfo=UTC),
"dedup": "off",
"devices": "on",
"encryption": "off",
"exec": "on",
"filesystem_limit": "none",
"guid": 5678,
"keystatus": "none",
"logbias": "latency",
"mlslabel": "none",
"mounted": "yes",
"mountpoint": "/pool/dataset",
"name": "pool/dataset",
"quota": 0,
"readonly": "off",
"recordsize": 131072,
"redundant_metadata": "all",
"referenced": 512000,
"refquota": 0,
"refreservation": 0,
"reservation": 0,
"setuid": "on",
"sharenfs": "off",
"snapdir": "hidden",
"snapshot_limit": "none",
"sync": "standard",
"used": 1024000,
"usedbychildren": 512000,
"usedbydataset": 256000,
"usedbysnapshots": 256000,
"version": 5,
"volmode": "default",
"volsize": "none",
"vscan": "off",
"written": 4096,
"xattr": "on",
}
def test_snapshot_initialization() -> None:
"""Test Snapshot class initialization with mocked ZFS data."""
snapshot = Snapshot(SAMPLE_SNAPSHOT_DATA)
assert snapshot.__dict__ == {
"createtxg": 123,
"creation": datetime(2021, 5, 3, 0, 0, tzinfo=UTC),
"defer_destroy": "off",
"guid": 456,
"name": "snap1",
"objsetid": 789,
"referenced": 1024,
"used": 512,
"userrefs": 0,
"version": 1,
"written": 2048,
}
def test_zfs_list_version_check(mocker: MockerFixture) -> None:
"""Test version validation in _zfs_list."""
mocker.patch(
f"{DATASET}.bash_wrapper",
return_value=(
json.dumps({"output_version": {"vers_major": 1, "vers_minor": 0, "command": "zfs list"}}),
0,
),
)
with pytest.raises(RuntimeError) as excinfo:
_zfs_list("zfs list invalid -pHj -o all")
assert "Datasets are not in the correct format" in str(excinfo.value)
def test_get_datasets(mocker: MockerFixture) -> None:
"""Test get_datasets."""
mock_bash = mocker.patch(f"{DATASET}.bash_wrapper", return_value=("pool/dataset\npool/other\ninvalid", 0))
mock_dataset = mocker.patch(f"{DATASET}.Dataset")
get_datasets()
mock_bash.assert_called_once_with("zfs list -Hp -t filesystem -o name")
calls = [call("pool/dataset"), call("pool/other")]
mock_dataset.assert_has_calls(calls)
def test_zpool_initialization(mocker: MockerFixture) -> None:
"""Test Zpool class initialization with mocked ZFS data."""
mocker.patch(f"{ZPOOL}._zpool_list", return_value=SAMPLE_ZPOOL_DATA)
zpool = Zpool("testpool")
assert zpool.__dict__ == {
"name": "testpool",
"allocated": 1000000,
"altroot": "none",
"ashift": 12,
"autoexpand": "off",
"autoreplace": "off",
"autotrim": "off",
"capacity": 50,
"comment": "test pool",
"dedupratio": "1.00x",
"delegation": "on",
"expandsize": "0",
"failmode": "wait",
"fragmentation": 20,
"free": "1000000",
"freeing": 0,
"guid": 12345678,
"health": "ONLINE",
"leaked": 0,
"readonly": "off",
"size": 2000000,
}
def test_zpool_repr(mocker: MockerFixture) -> None:
"""Test Zpool string representation."""
mocker.patch(f"{ZPOOL}._zpool_list", return_value=SAMPLE_ZPOOL_DATA)
zpool = Zpool("testpool")
repr_string = repr(zpool)
expected_attrs = [
"name",
"allocated",
"altroot",
"ashift",
"autoexpand",
"autoreplace",
"autotrim",
"capacity",
"comment",
"dedupratio",
"delegation",
"expandsize",
"failmode",
"fragmentation",
"freeing",
"guid",
"health",
"leaked",
"readonly",
"size",
]
for attr in expected_attrs:
assert f"{attr}=" in repr_string
def test_zpool_list(mocker: MockerFixture) -> None:
"""Test version validation in _zpool_list."""
mocker.patch(
f"{ZPOOL}.bash_wrapper",
return_value=(json.dumps({"output_version": {"vers_major": 0, "vers_minor": 1, "command": "zpool list"}}), 0),
)
result = _zpool_list("zpool list invalid -pHj -o all")
assert result == {"output_version": {"command": "zpool list", "vers_major": 0, "vers_minor": 1}}
def test_zpool_list_version_check(mocker: MockerFixture) -> None:
"""Test version validation in _zpool_list."""
mocker.patch(
f"{ZPOOL}.bash_wrapper",
return_value=(json.dumps({"output_version": {"vers_major": 1, "vers_minor": 0, "command": "zpool list"}}), 0),
)
with pytest.raises(RuntimeError) as excinfo:
_zpool_list("zpool list invalid -pHj -o all")
assert "Datasets are not in the correct format" in str(excinfo.value)