mirror of
https://github.com/RichieCahill/dotfiles.git
synced 2026-04-17 04:58:19 -04:00
213 lines
8.3 KiB
Python
213 lines
8.3 KiB
Python
"""dataset."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
from datetime import UTC, datetime
|
|
from typing import Any
|
|
|
|
from python.common import bash_wrapper
|
|
|
|
|
|
def _zfs_list(zfs_list: str) -> dict[str, Any]:
|
|
"""Check the version of zfs."""
|
|
raw_zfs_list_data, _ = bash_wrapper(zfs_list)
|
|
|
|
zfs_list_data = json.loads(raw_zfs_list_data)
|
|
|
|
vers_major = zfs_list_data["output_version"]["vers_major"]
|
|
vers_minor = zfs_list_data["output_version"]["vers_minor"]
|
|
command = zfs_list_data["output_version"]["command"]
|
|
|
|
if vers_major != 0 or vers_minor != 1 or command != "zfs list":
|
|
error = f"Datasets are not in the correct format {vers_major=} {vers_minor=} {command=}"
|
|
raise RuntimeError(error)
|
|
|
|
return zfs_list_data
|
|
|
|
|
|
class Snapshot:
|
|
"""Snapshot."""
|
|
|
|
def __init__(self, snapshot_data: dict[str, Any]) -> None:
|
|
"""__init__."""
|
|
properties = snapshot_data["properties"]
|
|
self.createtxg = int(snapshot_data["createtxg"])
|
|
self.creation = datetime.fromtimestamp(int(properties["creation"]["value"]), tz=UTC)
|
|
self.defer_destroy = properties["defer_destroy"]["value"]
|
|
self.guid = int(properties["guid"]["value"])
|
|
self.name = snapshot_data["name"].split("@")[1]
|
|
self.objsetid = int(properties["objsetid"]["value"])
|
|
self.referenced = int(properties["referenced"]["value"])
|
|
self.used = int(properties["used"]["value"])
|
|
self.userrefs = int(properties["userrefs"]["value"])
|
|
self.version = int(properties["version"]["value"])
|
|
self.written = int(properties["written"]["value"])
|
|
|
|
def __repr__(self) -> str:
|
|
"""__repr__."""
|
|
return f"name={self.name} used={self.used} refer={self.referenced}"
|
|
|
|
|
|
class Dataset:
|
|
"""Dataset."""
|
|
|
|
def __init__(self, name: str) -> None:
|
|
"""__init__."""
|
|
dataset_data = _zfs_list(f"zfs list {name} -pHj -o all")
|
|
|
|
properties = dataset_data["datasets"][name]["properties"]
|
|
|
|
self.aclinherit = properties["aclinherit"]["value"]
|
|
self.aclmode = properties["aclmode"]["value"]
|
|
self.acltype = properties["acltype"]["value"]
|
|
self.available = int(properties["available"]["value"])
|
|
self.canmount = properties["canmount"]["value"]
|
|
self.checksum = properties["checksum"]["value"]
|
|
self.clones = properties["clones"]["value"]
|
|
self.compression = properties["compression"]["value"]
|
|
self.copies = int(properties["copies"]["value"])
|
|
self.createtxg = int(properties["createtxg"]["value"])
|
|
self.creation = datetime.fromtimestamp(int(properties["creation"]["value"]), tz=UTC)
|
|
self.dedup = properties["dedup"]["value"]
|
|
self.devices = properties["devices"]["value"]
|
|
self.encryption = properties["encryption"]["value"]
|
|
self.exec = properties["exec"]["value"]
|
|
self.filesystem_limit = properties["filesystem_limit"]["value"]
|
|
self.guid = int(properties["guid"]["value"])
|
|
self.keystatus = properties["keystatus"]["value"]
|
|
self.logbias = properties["logbias"]["value"]
|
|
self.mlslabel = properties["mlslabel"]["value"]
|
|
self.mounted = properties["mounted"]["value"]
|
|
self.mountpoint = properties["mountpoint"]["value"]
|
|
self.name = name
|
|
self.quota = int(properties["quota"]["value"])
|
|
self.readonly = properties["readonly"]["value"]
|
|
self.recordsize = int(properties["recordsize"]["value"])
|
|
self.redundant_metadata = properties["redundant_metadata"]["value"]
|
|
self.referenced = int(properties["referenced"]["value"])
|
|
self.refquota = int(properties["refquota"]["value"])
|
|
self.refreservation = int(properties["refreservation"]["value"])
|
|
self.reservation = int(properties["reservation"]["value"])
|
|
self.setuid = properties["setuid"]["value"]
|
|
self.sharenfs = properties["sharenfs"]["value"]
|
|
self.snapdir = properties["snapdir"]["value"]
|
|
self.snapshot_limit = properties["snapshot_limit"]["value"]
|
|
self.sync = properties["sync"]["value"]
|
|
self.used = int(properties["used"]["value"])
|
|
self.usedbychildren = int(properties["usedbychildren"]["value"])
|
|
self.usedbydataset = int(properties["usedbydataset"]["value"])
|
|
self.usedbysnapshots = int(properties["usedbysnapshots"]["value"])
|
|
self.version = int(properties["version"]["value"])
|
|
self.volmode = properties["volmode"]["value"]
|
|
self.volsize = properties["volsize"]["value"]
|
|
self.vscan = properties["vscan"]["value"]
|
|
self.written = int(properties["written"]["value"])
|
|
self.xattr = properties["xattr"]["value"]
|
|
|
|
def get_snapshots(self) -> list[Snapshot] | None:
|
|
"""Get all snapshots from zfs and process then is test dicts of sets."""
|
|
snapshots_data = _zfs_list(f"zfs list -t snapshot -pHj {self.name} -o all")
|
|
|
|
return [Snapshot(properties) for properties in snapshots_data["datasets"].values()]
|
|
|
|
def create_snapshot(self, snapshot_name: str) -> str:
|
|
"""Creates a zfs snapshot.
|
|
|
|
Args:
|
|
snapshot_name (str): a snapshot name
|
|
"""
|
|
logging.debug(f"Creating {self.name}@{snapshot_name}")
|
|
_, return_code = bash_wrapper(f"zfs snapshot {self.name}@{snapshot_name}")
|
|
if return_code == 0:
|
|
return "snapshot created"
|
|
|
|
if snapshots := self.get_snapshots():
|
|
snapshot_names = {snapshot.name for snapshot in snapshots}
|
|
if snapshot_name in snapshot_names:
|
|
return f"Snapshot {snapshot_name} already exists for {self.name}"
|
|
|
|
return f"Failed to create snapshot {snapshot_name} for {self.name}"
|
|
|
|
def delete_snapshot(self, snapshot_name: str) -> str | None:
|
|
"""Deletes a zfs snapshot.
|
|
|
|
Args:
|
|
snapshot_name (str): a snapshot name
|
|
"""
|
|
logging.debug(f"deleting {self.name}@{snapshot_name}")
|
|
msg, return_code = bash_wrapper(f"zfs destroy {self.name}@{snapshot_name}")
|
|
if return_code != 0:
|
|
if msg.startswith(f"cannot destroy '{self.name}@{snapshot_name}': snapshot has dependent clones"):
|
|
return "snapshot has dependent clones"
|
|
error = f"Failed to delete snapshot {snapshot_name=} for {self.name}"
|
|
raise RuntimeError(error)
|
|
return None
|
|
|
|
def __repr__(self) -> str:
|
|
"""__repr__."""
|
|
return (
|
|
f"{self.aclinherit=}\n"
|
|
f"{self.aclmode=}\n"
|
|
f"{self.acltype=}\n"
|
|
f"{self.available=}\n"
|
|
f"{self.canmount=}\n"
|
|
f"{self.checksum=}\n"
|
|
f"{self.clones=}\n"
|
|
f"{self.compression=}\n"
|
|
f"{self.copies=}\n"
|
|
f"{self.createtxg=}\n"
|
|
f"{self.creation=}\n"
|
|
f"{self.dedup=}\n"
|
|
f"{self.devices=}\n"
|
|
f"{self.encryption=}\n"
|
|
f"{self.exec=}\n"
|
|
f"{self.filesystem_limit=}\n"
|
|
f"{self.guid=}\n"
|
|
f"{self.keystatus=}\n"
|
|
f"{self.logbias=}\n"
|
|
f"{self.mlslabel=}\n"
|
|
f"{self.mounted=}\n"
|
|
f"{self.mountpoint=}\n"
|
|
f"{self.name=}\n"
|
|
f"{self.quota=}\n"
|
|
f"{self.readonly=}\n"
|
|
f"{self.recordsize=}\n"
|
|
f"{self.redundant_metadata=}\n"
|
|
f"{self.referenced=}\n"
|
|
f"{self.refquota=}\n"
|
|
f"{self.refreservation=}\n"
|
|
f"{self.reservation=}\n"
|
|
f"{self.setuid=}\n"
|
|
f"{self.sharenfs=}\n"
|
|
f"{self.snapdir=}\n"
|
|
f"{self.snapshot_limit=}\n"
|
|
f"{self.sync=}\n"
|
|
f"{self.used=}\n"
|
|
f"{self.usedbychildren=}\n"
|
|
f"{self.usedbydataset=}\n"
|
|
f"{self.usedbysnapshots=}\n"
|
|
f"{self.version=}\n"
|
|
f"{self.volmode=}\n"
|
|
f"{self.volsize=}\n"
|
|
f"{self.vscan=}\n"
|
|
f"{self.written=}\n"
|
|
f"{self.xattr=}\n"
|
|
)
|
|
|
|
|
|
def get_datasets() -> list[Dataset]:
|
|
"""Get zfs list.
|
|
|
|
Returns:
|
|
list[Dataset]: A list of zfs datasets.
|
|
"""
|
|
logging.info("Getting zfs list")
|
|
|
|
dataset_names, _ = bash_wrapper("zfs list -Hp -t filesystem -o name")
|
|
|
|
cleaned_datasets = dataset_names.strip().split("\n")
|
|
|
|
return [Dataset(dataset_name) for dataset_name in cleaned_datasets if "/" in dataset_name]
|