From c1ce7e0ac4219e0052810da23e829db7657169e6 Mon Sep 17 00:00:00 2001 From: Richie Cahill Date: Tue, 28 Oct 2025 19:30:42 -0400 Subject: [PATCH] added zfs --- python/zfs/__init__.py | 11 +++ python/zfs/dataset.py | 212 +++++++++++++++++++++++++++++++++++++++++ python/zfs/zpool.py | 86 +++++++++++++++++ 3 files changed, 309 insertions(+) create mode 100644 python/zfs/__init__.py create mode 100644 python/zfs/dataset.py create mode 100644 python/zfs/zpool.py diff --git a/python/zfs/__init__.py b/python/zfs/__init__.py new file mode 100644 index 0000000..74d7d0b --- /dev/null +++ b/python/zfs/__init__.py @@ -0,0 +1,11 @@ +"""init.""" + +from python.zfs.dataset import Dataset, Snapshot, get_datasets +from python.zfs.zpool import Zpool + +__all__ = [ + "Dataset", + "Snapshot", + "Zpool", + "get_datasets", +] diff --git a/python/zfs/dataset.py b/python/zfs/dataset.py new file mode 100644 index 0000000..ec58bbb --- /dev/null +++ b/python/zfs/dataset.py @@ -0,0 +1,212 @@ +"""dataset.""" + +from __future__ import annotations + +import json +import logging +from datetime import UTC, datetime +from typing import Any + +from python.common import bash_wrapper + + +def _zfs_list(zfs_list: str) -> dict[str, Any]: + """Check the version of zfs.""" + raw_zfs_list_data, _ = bash_wrapper(zfs_list) + + zfs_list_data = json.loads(raw_zfs_list_data) + + vers_major = zfs_list_data["output_version"]["vers_major"] + vers_minor = zfs_list_data["output_version"]["vers_minor"] + command = zfs_list_data["output_version"]["command"] + + if vers_major != 0 or vers_minor != 1 or command != "zfs list": + error = f"Datasets are not in the correct format {vers_major=} {vers_minor=} {command=}" + raise RuntimeError(error) + + return zfs_list_data + + +class Snapshot: + """Snapshot.""" + + def __init__(self, snapshot_data: dict[str, Any]) -> None: + """__init__.""" + properties = snapshot_data["properties"] + self.createtxg = int(snapshot_data["createtxg"]) + self.creation = datetime.fromtimestamp(int(properties["creation"]["value"]), tz=UTC) + self.defer_destroy = properties["defer_destroy"]["value"] + self.guid = int(properties["guid"]["value"]) + self.name = snapshot_data["name"].split("@")[1] + self.objsetid = int(properties["objsetid"]["value"]) + self.referenced = int(properties["referenced"]["value"]) + self.used = int(properties["used"]["value"]) + self.userrefs = int(properties["userrefs"]["value"]) + self.version = int(properties["version"]["value"]) + self.written = int(properties["written"]["value"]) + + def __repr__(self) -> str: + """__repr__.""" + return f"name={self.name} used={self.used} refer={self.referenced}" + + +class Dataset: + """Dataset.""" + + def __init__(self, name: str) -> None: + """__init__.""" + dataset_data = _zfs_list(f"zfs list {name} -pHj -o all") + + properties = dataset_data["datasets"][name]["properties"] + + self.aclinherit = properties["aclinherit"]["value"] + self.aclmode = properties["aclmode"]["value"] + self.acltype = properties["acltype"]["value"] + self.available = int(properties["available"]["value"]) + self.canmount = properties["canmount"]["value"] + self.checksum = properties["checksum"]["value"] + self.clones = properties["clones"]["value"] + self.compression = properties["compression"]["value"] + self.copies = int(properties["copies"]["value"]) + self.createtxg = int(properties["createtxg"]["value"]) + self.creation = datetime.fromtimestamp(int(properties["creation"]["value"]), tz=UTC) + self.dedup = properties["dedup"]["value"] + self.devices = properties["devices"]["value"] + self.encryption = properties["encryption"]["value"] + self.exec = properties["exec"]["value"] + self.filesystem_limit = properties["filesystem_limit"]["value"] + self.guid = int(properties["guid"]["value"]) + self.keystatus = properties["keystatus"]["value"] + self.logbias = properties["logbias"]["value"] + self.mlslabel = properties["mlslabel"]["value"] + self.mounted = properties["mounted"]["value"] + self.mountpoint = properties["mountpoint"]["value"] + self.name = name + self.quota = int(properties["quota"]["value"]) + self.readonly = properties["readonly"]["value"] + self.recordsize = int(properties["recordsize"]["value"]) + self.redundant_metadata = properties["redundant_metadata"]["value"] + self.referenced = int(properties["referenced"]["value"]) + self.refquota = int(properties["refquota"]["value"]) + self.refreservation = int(properties["refreservation"]["value"]) + self.reservation = int(properties["reservation"]["value"]) + self.setuid = properties["setuid"]["value"] + self.sharenfs = properties["sharenfs"]["value"] + self.snapdir = properties["snapdir"]["value"] + self.snapshot_limit = properties["snapshot_limit"]["value"] + self.sync = properties["sync"]["value"] + self.used = int(properties["used"]["value"]) + self.usedbychildren = int(properties["usedbychildren"]["value"]) + self.usedbydataset = int(properties["usedbydataset"]["value"]) + self.usedbysnapshots = int(properties["usedbysnapshots"]["value"]) + self.version = int(properties["version"]["value"]) + self.volmode = properties["volmode"]["value"] + self.volsize = properties["volsize"]["value"] + self.vscan = properties["vscan"]["value"] + self.written = int(properties["written"]["value"]) + self.xattr = properties["xattr"]["value"] + + def get_snapshots(self) -> list[Snapshot] | None: + """Get all snapshots from zfs and process then is test dicts of sets.""" + snapshots_data = _zfs_list(f"zfs list -t snapshot -pHj {self.name} -o all") + + return [Snapshot(properties) for properties in snapshots_data["datasets"].values()] + + def create_snapshot(self, snapshot_name: str) -> str: + """Creates a zfs snapshot. + + Args: + snapshot_name (str): a snapshot name + """ + logging.debug(f"Creating {self.name}@{snapshot_name}") + _, return_code = bash_wrapper(f"zfs snapshot {self.name}@{snapshot_name}") + if return_code == 0: + return "snapshot created" + + if snapshots := self.get_snapshots(): + snapshot_names = {snapshot.name for snapshot in snapshots} + if snapshot_name in snapshot_names: + return f"Snapshot {snapshot_name} already exists for {self.name}" + + return f"Failed to create snapshot {snapshot_name} for {self.name}" + + def delete_snapshot(self, snapshot_name: str) -> str | None: + """Deletes a zfs snapshot. + + Args: + snapshot_name (str): a snapshot name + """ + logging.debug(f"deleting {self.name}@{snapshot_name}") + msg, return_code = bash_wrapper(f"zfs destroy {self.name}@{snapshot_name}") + if return_code != 0: + if msg.startswith(f"cannot destroy '{self.name}@{snapshot_name}': snapshot has dependent clones"): + return "snapshot has dependent clones" + error = f"Failed to delete snapshot {snapshot_name=} for {self.name}" + raise RuntimeError(error) + return None + + def __repr__(self) -> str: + """__repr__.""" + return ( + f"{self.aclinherit=}\n" + f"{self.aclmode=}\n" + f"{self.acltype=}\n" + f"{self.available=}\n" + f"{self.canmount=}\n" + f"{self.checksum=}\n" + f"{self.clones=}\n" + f"{self.compression=}\n" + f"{self.copies=}\n" + f"{self.createtxg=}\n" + f"{self.creation=}\n" + f"{self.dedup=}\n" + f"{self.devices=}\n" + f"{self.encryption=}\n" + f"{self.exec=}\n" + f"{self.filesystem_limit=}\n" + f"{self.guid=}\n" + f"{self.keystatus=}\n" + f"{self.logbias=}\n" + f"{self.mlslabel=}\n" + f"{self.mounted=}\n" + f"{self.mountpoint=}\n" + f"{self.name=}\n" + f"{self.quota=}\n" + f"{self.readonly=}\n" + f"{self.recordsize=}\n" + f"{self.redundant_metadata=}\n" + f"{self.referenced=}\n" + f"{self.refquota=}\n" + f"{self.refreservation=}\n" + f"{self.reservation=}\n" + f"{self.setuid=}\n" + f"{self.sharenfs=}\n" + f"{self.snapdir=}\n" + f"{self.snapshot_limit=}\n" + f"{self.sync=}\n" + f"{self.used=}\n" + f"{self.usedbychildren=}\n" + f"{self.usedbydataset=}\n" + f"{self.usedbysnapshots=}\n" + f"{self.version=}\n" + f"{self.volmode=}\n" + f"{self.volsize=}\n" + f"{self.vscan=}\n" + f"{self.written=}\n" + f"{self.xattr=}\n" + ) + + +def get_datasets() -> list[Dataset]: + """Get zfs list. + + Returns: + list[Dataset]: A list of zfs datasets. + """ + logging.info("Getting zfs list") + + dataset_names, _ = bash_wrapper("zfs list -Hp -t filesystem -o name") + + cleaned_datasets = dataset_names.strip().split("\n") + + return [Dataset(dataset_name) for dataset_name in cleaned_datasets if "/" in dataset_name] diff --git a/python/zfs/zpool.py b/python/zfs/zpool.py new file mode 100644 index 0000000..95a093b --- /dev/null +++ b/python/zfs/zpool.py @@ -0,0 +1,86 @@ +"""test.""" + +from __future__ import annotations + +import json +from typing import Any + +from python.common import bash_wrapper + + +def _zpool_list(zfs_list: str) -> dict[str, Any]: + """Check the version of zfs.""" + raw_zfs_list_data, _ = bash_wrapper(zfs_list) + + zfs_list_data = json.loads(raw_zfs_list_data) + + vers_major = zfs_list_data["output_version"]["vers_major"] + vers_minor = zfs_list_data["output_version"]["vers_minor"] + command = zfs_list_data["output_version"]["command"] + + if vers_major != 0 or vers_minor != 1 or command != "zpool list": + error = f"Datasets are not in the correct format {vers_major=} {vers_minor=} {command=}" + raise RuntimeError(error) + + return zfs_list_data + + +class Zpool: + """Zpool.""" + + def __init__( + self, + name: str, + ) -> None: + """__init__.""" + zpool_data = _zpool_list(f"zpool list {name} -pHj -o all") + + properties = zpool_data["pools"][name]["properties"] + + self.name = name + + self.allocated = int(properties["allocated"]["value"]) + self.altroot = properties["altroot"]["value"] + self.ashift = int(properties["ashift"]["value"]) + self.autoexpand = properties["autoexpand"]["value"] + self.autoreplace = properties["autoreplace"]["value"] + self.autotrim = properties["autotrim"]["value"] + self.capacity = int(properties["capacity"]["value"]) + self.comment = properties["comment"]["value"] + self.dedupratio = properties["dedupratio"]["value"] + self.delegation = properties["delegation"]["value"] + self.expandsize = properties["expandsize"]["value"] + self.failmode = properties["failmode"]["value"] + self.fragmentation = int(properties["fragmentation"]["value"]) + self.free = properties["free"]["value"] + self.freeing = int(properties["freeing"]["value"]) + self.guid = int(properties["guid"]["value"]) + self.health = properties["health"]["value"] + self.leaked = int(properties["leaked"]["value"]) + self.readonly = properties["readonly"]["value"] + self.size = int(properties["size"]["value"]) + + def __repr__(self) -> str: + """__repr__.""" + return ( + f"{self.name=}\n" + f"{self.allocated=}\n" + f"{self.altroot=}\n" + f"{self.ashift=}\n" + f"{self.autoexpand=}\n" + f"{self.autoreplace=}\n" + f"{self.autotrim=}\n" + f"{self.capacity=}\n" + f"{self.comment=}\n" + f"{self.dedupratio=}\n" + f"{self.delegation=}\n" + f"{self.expandsize=}\n" + f"{self.failmode=}\n" + f"{self.fragmentation=}\n" + f"{self.freeing=}\n" + f"{self.guid=}\n" + f"{self.health=}\n" + f"{self.leaked=}\n" + f"{self.readonly=}\n" + f"{self.size=}" + )