Compare commits
21 Commits
6dba9c2780
...
v0.4.0
| Author | SHA1 | Date | |
|---|---|---|---|
| e8e4e3636c | |||
| 2660d7d2f5 | |||
| 9ef27716c6 | |||
| 3bac1243a1 | |||
| d196662ac6 | |||
| 44d917c264 | |||
| 46e980fe76 | |||
| 8b7d76017f | |||
| a6e3471181 | |||
| 87cc05f33a | |||
| 2784bb678a | |||
| 1baeb9f465 | |||
| 5618cb5efc | |||
| 302009cf59 | |||
| 1face8eea8 | |||
| 48078e5bd3 | |||
| 21e674ffb0 | |||
| 694ae9cc71 | |||
| 8fb4578554 | |||
| 4b1cfadc4d | |||
| b42218a883 |
25
.gitea/workflows/goreleaser.yaml
Normal file
25
.gitea/workflows/goreleaser.yaml
Normal file
@@ -0,0 +1,25 @@
|
|||||||
|
name: Release
|
||||||
|
|
||||||
|
on:
|
||||||
|
push:
|
||||||
|
tags:
|
||||||
|
- "*"
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
release:
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
steps:
|
||||||
|
- name: Check out
|
||||||
|
uses: actions/checkout@v4
|
||||||
|
with:
|
||||||
|
fetch-depth: 0
|
||||||
|
- name: Set up mise
|
||||||
|
uses: jdx/mise-action@v4
|
||||||
|
- name: Run GoReleaser
|
||||||
|
uses: goreleaser/goreleaser-action@v6
|
||||||
|
with:
|
||||||
|
distribution: goreleaser
|
||||||
|
version: "~> v2"
|
||||||
|
args: "release --clean"
|
||||||
|
env:
|
||||||
|
GITEA_TOKEN: ${{ secrets.GITEA_TOKEN }}
|
||||||
28
.github/workflows/goreleaser.yaml
vendored
Normal file
28
.github/workflows/goreleaser.yaml
vendored
Normal file
@@ -0,0 +1,28 @@
|
|||||||
|
name: Release
|
||||||
|
|
||||||
|
on:
|
||||||
|
push:
|
||||||
|
tags:
|
||||||
|
- "*"
|
||||||
|
|
||||||
|
permissions:
|
||||||
|
contents: write
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
release:
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
steps:
|
||||||
|
- name: Check out
|
||||||
|
uses: actions/checkout@v4
|
||||||
|
with:
|
||||||
|
fetch-depth: 0
|
||||||
|
- name: Set up mise
|
||||||
|
uses: jdx/mise-action@v4
|
||||||
|
- name: Run GoReleaser
|
||||||
|
uses: goreleaser/goreleaser-action@v6
|
||||||
|
with:
|
||||||
|
distribution: goreleaser
|
||||||
|
version: "~> v2"
|
||||||
|
args: "release --clean"
|
||||||
|
env:
|
||||||
|
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
||||||
1
.gitignore
vendored
1
.gitignore
vendored
@@ -1 +1,2 @@
|
|||||||
target/
|
target/
|
||||||
|
dist/
|
||||||
|
|||||||
15
.goreleaser.yaml
Normal file
15
.goreleaser.yaml
Normal file
@@ -0,0 +1,15 @@
|
|||||||
|
version: 2
|
||||||
|
before:
|
||||||
|
hooks:
|
||||||
|
- cargo install --locked cargo-zigbuild
|
||||||
|
builds:
|
||||||
|
- builder: rust
|
||||||
|
targets:
|
||||||
|
- x86_64-unknown-linux-gnu
|
||||||
|
- aarch64-unknown-linux-gnu
|
||||||
|
archives:
|
||||||
|
- formats: ["binary"]
|
||||||
|
|
||||||
|
gitea_urls:
|
||||||
|
api: https://git.shee.sh/api/v1
|
||||||
|
download: https://git.shee.sh
|
||||||
25
Cargo.lock
generated
25
Cargo.lock
generated
@@ -154,12 +154,24 @@ version = "0.8.7"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b"
|
checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "equivalent"
|
||||||
|
version = "1.0.2"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "find-msvc-tools"
|
name = "find-msvc-tools"
|
||||||
version = "0.1.9"
|
version = "0.1.9"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "5baebc0774151f905a1a2cc41989300b1e6fbb29aff0ceffa1064fdd3088d582"
|
checksum = "5baebc0774151f905a1a2cc41989300b1e6fbb29aff0ceffa1064fdd3088d582"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "hashbrown"
|
||||||
|
version = "0.17.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "4f467dd6dccf739c208452f8014c75c18bb8301b050ad1cfb27153803edb0f51"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "heck"
|
name = "heck"
|
||||||
version = "0.5.0"
|
version = "0.5.0"
|
||||||
@@ -190,6 +202,16 @@ dependencies = [
|
|||||||
"cc",
|
"cc",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "indexmap"
|
||||||
|
version = "2.14.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "d466e9454f08e4a911e14806c24e16fba1b4c121d1ea474396f396069cf949d9"
|
||||||
|
dependencies = [
|
||||||
|
"equivalent",
|
||||||
|
"hashbrown",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "is_terminal_polyfill"
|
name = "is_terminal_polyfill"
|
||||||
version = "1.70.2"
|
version = "1.70.2"
|
||||||
@@ -413,8 +435,9 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "zfsbackup"
|
name = "zfsbackup"
|
||||||
version = "0.1.1"
|
version = "0.4.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"chrono",
|
"chrono",
|
||||||
"clap",
|
"clap",
|
||||||
|
"indexmap",
|
||||||
]
|
]
|
||||||
|
|||||||
@@ -1,9 +1,10 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "zfsbackup"
|
name = "zfsbackup"
|
||||||
version = "0.1.1"
|
version = "0.4.0"
|
||||||
edition = "2024"
|
edition = "2024"
|
||||||
publish = false
|
publish = false
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
chrono = "0.4.44"
|
chrono = "0.4.44"
|
||||||
clap = { version = "4.6.1", features = ["derive"] }
|
clap = { version = "4.6.1", features = ["derive"] }
|
||||||
|
indexmap = "2.14.0"
|
||||||
|
|||||||
6
mise.toml
Normal file
6
mise.toml
Normal file
@@ -0,0 +1,6 @@
|
|||||||
|
[tools]
|
||||||
|
rust = "stable"
|
||||||
|
zig = "latest"
|
||||||
|
|
||||||
|
[tasks]
|
||||||
|
build = "cargo build"
|
||||||
@@ -6,7 +6,12 @@ pub trait Filter {
|
|||||||
fn filter(&self, reader: Box<dyn Read>) -> Box<dyn Read>;
|
fn filter(&self, reader: Box<dyn Read>) -> Box<dyn Read>;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn exec_command(command: &Vec<&str>) -> Result<String, String> {
|
// TODO: Make this support quoting for args with spaces (and escape nested quotes)
|
||||||
|
fn display_command(command: &Vec<&str>) -> String {
|
||||||
|
command.join(" ")
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn exec(command: &Vec<&str>) -> Result<String, String> {
|
||||||
if command.is_empty() {
|
if command.is_empty() {
|
||||||
return Err("Command is empty".to_string());
|
return Err("Command is empty".to_string());
|
||||||
}
|
}
|
||||||
@@ -14,11 +19,13 @@ pub fn exec_command(command: &Vec<&str>) -> Result<String, String> {
|
|||||||
if command.len() > 1 {
|
if command.len() > 1 {
|
||||||
cmd.args(&command[1..]);
|
cmd.args(&command[1..]);
|
||||||
}
|
}
|
||||||
let output = cmd.output().map_err(|e| e.to_string())?;
|
let output = cmd
|
||||||
|
.output()
|
||||||
|
.map_err(|e| format!("Failed to run command {}: {}", command[0], e))?;
|
||||||
if !output.status.success() {
|
if !output.status.success() {
|
||||||
return Err(format!(
|
return Err(format!(
|
||||||
"Command {:?} failed with status {}: {}",
|
"Command '{}' failed with status {}: {}",
|
||||||
command,
|
display_command(command),
|
||||||
output.status,
|
output.status,
|
||||||
String::from_utf8_lossy(&output.stderr)
|
String::from_utf8_lossy(&output.stderr)
|
||||||
));
|
));
|
||||||
@@ -27,7 +34,10 @@ pub fn exec_command(command: &Vec<&str>) -> Result<String, String> {
|
|||||||
Ok(output_str)
|
Ok(output_str)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn exec_piped_commands(
|
/// Executes a pipeline of commands, where the output of the source command is passed through a
|
||||||
|
/// series of filters before being piped into the destination command. The filters are applied in
|
||||||
|
/// the order they are provided.
|
||||||
|
pub fn exec_pipe(
|
||||||
source: &Vec<&str>,
|
source: &Vec<&str>,
|
||||||
dest: &Vec<&str>,
|
dest: &Vec<&str>,
|
||||||
filters: Vec<Box<dyn Filter>>,
|
filters: Vec<Box<dyn Filter>>,
|
||||||
|
|||||||
622
src/job.rs
622
src/job.rs
@@ -2,34 +2,331 @@ use crate::command;
|
|||||||
use crate::progress::BackupEvent;
|
use crate::progress::BackupEvent;
|
||||||
use crate::progress::filter;
|
use crate::progress::filter;
|
||||||
use chrono::{Local, NaiveDateTime};
|
use chrono::{Local, NaiveDateTime};
|
||||||
|
use indexmap::set::IndexSet;
|
||||||
use std::collections::HashSet;
|
use std::collections::HashSet;
|
||||||
|
use std::fmt;
|
||||||
use std::sync::mpsc::Sender;
|
use std::sync::mpsc::Sender;
|
||||||
|
|
||||||
|
pub struct Job {
|
||||||
|
datasets: IndexSet<String>,
|
||||||
|
target: String,
|
||||||
|
source_zfs_command: Vec<String>,
|
||||||
|
target_zfs_command: Vec<String>,
|
||||||
|
retain: usize,
|
||||||
|
sender: Option<Sender<BackupEvent>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Copy, Clone)]
|
||||||
|
enum JobSide {
|
||||||
|
Source,
|
||||||
|
Target,
|
||||||
|
}
|
||||||
|
|
||||||
|
struct Snapshot {
|
||||||
|
dataset: String,
|
||||||
|
snapshot_id: String,
|
||||||
|
side: JobSide,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl fmt::Display for Snapshot {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
write!(f, "{}@{}", self.dataset, self.snapshot_id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Job {
|
||||||
|
fn send_event(&self, event: BackupEvent) {
|
||||||
|
if let Some(sender) = &self.sender {
|
||||||
|
sender.send(event).ok();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_side_command(&self, side: JobSide) -> Vec<&str> {
|
||||||
|
match side {
|
||||||
|
JobSide::Source => self.source_zfs_command.iter().map(|s| s.as_str()).collect(),
|
||||||
|
JobSide::Target => self.target_zfs_command.iter().map(|s| s.as_str()).collect(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn create_snapshot(&self, dataset: &str, side: JobSide) -> Result<Snapshot, String> {
|
||||||
|
let snapshot = Snapshot {
|
||||||
|
dataset: dataset.to_string(),
|
||||||
|
snapshot_id: Local::now().format("%Y-%m-%dT%H:%M:%S").to_string(),
|
||||||
|
side,
|
||||||
|
};
|
||||||
|
let name = snapshot.to_string();
|
||||||
|
let mut cmd = self.get_side_command(side);
|
||||||
|
cmd.extend(["snapshot", &name]);
|
||||||
|
command::exec(&cmd)?;
|
||||||
|
self.send_event(BackupEvent::SnapshotCreated(name.clone()));
|
||||||
|
Ok(snapshot)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn estimate(&self, source: &str, inc_snapshot: Option<&str>) -> Result<u64, String> {
|
||||||
|
let mut cmd = self.get_side_command(JobSide::Source);
|
||||||
|
cmd.push("send");
|
||||||
|
if let Some(inc) = inc_snapshot {
|
||||||
|
cmd.extend(["-i", inc]);
|
||||||
|
}
|
||||||
|
cmd.extend(["-n", "-P"]);
|
||||||
|
cmd.push(source);
|
||||||
|
let output = command::exec(&cmd)?;
|
||||||
|
let size = output
|
||||||
|
.lines()
|
||||||
|
.last()
|
||||||
|
.ok_or("estimate parse error")?
|
||||||
|
.split_whitespace()
|
||||||
|
.last()
|
||||||
|
.ok_or("estimate parse error")?
|
||||||
|
.parse::<u64>()
|
||||||
|
.map_err(|e| format!("estimate parse error: {}", e))?;
|
||||||
|
self.send_event(BackupEvent::Estimate(size));
|
||||||
|
Ok(size)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn send_receive(
|
||||||
|
&self,
|
||||||
|
source: &str,
|
||||||
|
dest: &str,
|
||||||
|
inc_snapshot: Option<&str>,
|
||||||
|
total: Option<u64>,
|
||||||
|
) -> Result<(), String> {
|
||||||
|
let mut send_cmd = self.get_side_command(JobSide::Source);
|
||||||
|
send_cmd.push("send");
|
||||||
|
if let Some(inc) = inc_snapshot {
|
||||||
|
send_cmd.extend(["-i", inc]);
|
||||||
|
}
|
||||||
|
send_cmd.push(source);
|
||||||
|
|
||||||
|
let mut receive_cmd = self.get_side_command(JobSide::Target);
|
||||||
|
receive_cmd.extend(["receive", "-F", dest]);
|
||||||
|
let filters: Vec<Box<dyn command::Filter>> = match self.sender.as_ref() {
|
||||||
|
Some(sender) => vec![Box::new(filter::ByteCountFilter::new(
|
||||||
|
sender.clone(),
|
||||||
|
total,
|
||||||
|
))],
|
||||||
|
None => vec![],
|
||||||
|
};
|
||||||
|
command::exec_pipe(&send_cmd, &receive_cmd, filters)?;
|
||||||
|
self.send_event(BackupEvent::DatasetCompleted(source.to_string()));
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn list_snapshot_ids(&self, source: &str, side: JobSide) -> Result<Vec<String>, String> {
|
||||||
|
let mut cmd = self.get_side_command(side);
|
||||||
|
cmd.extend(["list", "-H", "-o", "name", "-t", "snapshot", source]);
|
||||||
|
let output = command::exec(&cmd)?;
|
||||||
|
let snapshots: Vec<&str> = output
|
||||||
|
.split_whitespace()
|
||||||
|
.map(|s| {
|
||||||
|
let parts: Vec<&str> = s.split("@").collect();
|
||||||
|
if parts.len() == 2 {
|
||||||
|
Ok(parts[1])
|
||||||
|
} else {
|
||||||
|
Err(format!("invalid snapshot name: {}", s))
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.collect::<Result<Vec<_>, _>>()?;
|
||||||
|
let result: Vec<String> = snapshots
|
||||||
|
.iter()
|
||||||
|
.filter(|s| NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S").is_ok())
|
||||||
|
.map(|s| s.to_string())
|
||||||
|
.collect();
|
||||||
|
Ok(result)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn find_latest_matching_snapshot_id(&self, source: &str, dest: &str) -> Result<String, String> {
|
||||||
|
self.find_matching_snapshot_ids(source, dest)?
|
||||||
|
.into_iter()
|
||||||
|
.next()
|
||||||
|
.ok_or(String::from("no matching snapshots"))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn find_matching_snapshot_ids(&self, source: &str, dest: &str) -> Result<Vec<String>, String> {
|
||||||
|
let source_snapshots = self.list_snapshot_ids(source, JobSide::Source)?;
|
||||||
|
let mut dest_snapshots = self.list_snapshot_ids(dest, JobSide::Target)?;
|
||||||
|
dest_snapshots.sort_by(|a, b| b.cmp(a));
|
||||||
|
|
||||||
|
let source_set: HashSet<String> = source_snapshots.into_iter().collect();
|
||||||
|
|
||||||
|
let matching_snapshots: Vec<String> = dest_snapshots
|
||||||
|
.into_iter()
|
||||||
|
.filter(|s| source_set.contains(s))
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
if matching_snapshots.is_empty() {
|
||||||
|
Err(String::from("no matching snapshots"))
|
||||||
|
} else {
|
||||||
|
Ok(matching_snapshots)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn find_old_snapshots(&self, source: &str, dest: &str) -> Result<Vec<Snapshot>, String> {
|
||||||
|
let source_snapshots = self.list_snapshot_ids(source, JobSide::Source)?;
|
||||||
|
let dest_snapshots = self.list_snapshot_ids(dest, JobSide::Target)?;
|
||||||
|
let matching_snapshots = self.find_matching_snapshot_ids(source, dest)?;
|
||||||
|
if matching_snapshots.is_empty() {
|
||||||
|
return Err(String::from("no matching snapshots found"));
|
||||||
|
}
|
||||||
|
let retain: HashSet<String> = matching_snapshots.into_iter().take(self.retain).collect();
|
||||||
|
let result = source_snapshots
|
||||||
|
.into_iter()
|
||||||
|
.filter(|s| !retain.contains(s))
|
||||||
|
.map(|s| Snapshot {
|
||||||
|
dataset: source.to_string(),
|
||||||
|
snapshot_id: s,
|
||||||
|
side: JobSide::Source,
|
||||||
|
})
|
||||||
|
.chain(
|
||||||
|
dest_snapshots
|
||||||
|
.into_iter()
|
||||||
|
.filter(|s| !retain.contains(s))
|
||||||
|
.map(|s| Snapshot {
|
||||||
|
dataset: dest.to_string(),
|
||||||
|
snapshot_id: s,
|
||||||
|
side: JobSide::Target,
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
.collect();
|
||||||
|
Ok(result)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn delete_snapshot(&self, snapshot: Snapshot) -> Result<(), String> {
|
||||||
|
let mut cmd = self.get_side_command(snapshot.side);
|
||||||
|
let name = snapshot.to_string();
|
||||||
|
cmd.extend(["destroy", &name]);
|
||||||
|
command::exec(&cmd)?;
|
||||||
|
self.send_event(BackupEvent::SnapshotDeleted(name.clone()));
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn run(&self, execute: bool) -> Result<(), String> {
|
||||||
|
for (index, source) in self.datasets.iter().enumerate() {
|
||||||
|
// Check the source exists
|
||||||
|
let mut cmd = self.get_side_command(JobSide::Source);
|
||||||
|
cmd.extend(["list", "-H", "-o", "name", source]);
|
||||||
|
let _ = command::exec(&cmd)?;
|
||||||
|
|
||||||
|
// Check whether the destination exists
|
||||||
|
// TODO: This will assume the destination doesn't exist if the
|
||||||
|
// command fails for any reason.
|
||||||
|
let dest = format!("{}/{}", self.target, source);
|
||||||
|
cmd = self.get_side_command(JobSide::Target);
|
||||||
|
cmd.extend(["list", "-H", "-o", "name", &dest]);
|
||||||
|
let dest_exists = command::exec(&cmd).is_ok();
|
||||||
|
|
||||||
|
// Run backup
|
||||||
|
if dest_exists {
|
||||||
|
self.send_event(BackupEvent::StartingIncrementalBackup {
|
||||||
|
source: source.clone(),
|
||||||
|
dest: dest.clone(),
|
||||||
|
index: index + 1,
|
||||||
|
total: self.datasets.len(),
|
||||||
|
});
|
||||||
|
let inc_snapshot = format!(
|
||||||
|
"{}@{}",
|
||||||
|
source,
|
||||||
|
self.find_latest_matching_snapshot_id(source, &dest)?
|
||||||
|
);
|
||||||
|
|
||||||
|
let snapshot = self.create_snapshot(source, JobSide::Source)?;
|
||||||
|
let total = self
|
||||||
|
.estimate(&snapshot.to_string(), Some(&inc_snapshot))
|
||||||
|
.ok();
|
||||||
|
if !execute {
|
||||||
|
self.send_event(BackupEvent::DryrunCompleted(source.clone()));
|
||||||
|
self.delete_snapshot(snapshot)?;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
self.send_receive(&snapshot.to_string(), &dest, Some(&inc_snapshot), total)?;
|
||||||
|
} else {
|
||||||
|
self.send_event(BackupEvent::StartingFullBackup {
|
||||||
|
source: source.clone(),
|
||||||
|
dest: dest.clone(),
|
||||||
|
index: index + 1,
|
||||||
|
total: self.datasets.len(),
|
||||||
|
});
|
||||||
|
let snapshot = self.create_snapshot(source, JobSide::Source)?;
|
||||||
|
let total = self.estimate(&snapshot.to_string(), None).ok();
|
||||||
|
if !execute {
|
||||||
|
self.send_event(BackupEvent::DryrunCompleted(source.clone()));
|
||||||
|
self.delete_snapshot(snapshot)?;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
self.send_receive(&snapshot.to_string(), &dest, None, total)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Clean up snapshots
|
||||||
|
self.find_old_snapshots(source, &dest)?
|
||||||
|
.into_iter()
|
||||||
|
.map(|s| self.delete_snapshot(s))
|
||||||
|
.collect::<Result<Vec<_>, _>>()?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub struct JobBuilder {
|
pub struct JobBuilder {
|
||||||
sources: Vec<String>,
|
sources: Vec<String>,
|
||||||
target: String,
|
target: String,
|
||||||
source_zfs_command: Vec<String>,
|
source_zfs_command: Vec<String>,
|
||||||
target_zfs_command: Vec<String>,
|
target_zfs_command: Vec<String>,
|
||||||
dryrun: bool,
|
|
||||||
retain: usize,
|
retain: usize,
|
||||||
sender: Option<Sender<BackupEvent>>,
|
sender: Option<Sender<BackupEvent>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Parse a command string into a vector of strings, splitting on whitespace.
|
||||||
|
/// Quoting is supported using ' or ".
|
||||||
fn parse_command(commandstr: &str) -> Vec<String> {
|
fn parse_command(commandstr: &str) -> Vec<String> {
|
||||||
commandstr
|
let mut arg = String::new();
|
||||||
.split_whitespace()
|
let mut command: Vec<String> = Vec::new();
|
||||||
.map(|s| s.to_string())
|
let mut in_single_quote = false;
|
||||||
.collect()
|
let mut in_double_quote = false;
|
||||||
|
|
||||||
|
for c in commandstr.chars() {
|
||||||
|
match c {
|
||||||
|
'\'' if !in_double_quote => {
|
||||||
|
in_single_quote = !in_single_quote;
|
||||||
|
}
|
||||||
|
'\'' => {
|
||||||
|
arg.push(c);
|
||||||
|
}
|
||||||
|
'"' if !in_single_quote => {
|
||||||
|
in_double_quote = !in_double_quote;
|
||||||
|
}
|
||||||
|
'"' => {
|
||||||
|
arg.push(c);
|
||||||
|
}
|
||||||
|
' ' if in_single_quote || in_double_quote => {
|
||||||
|
arg.push(c);
|
||||||
|
}
|
||||||
|
' ' => {
|
||||||
|
if !arg.is_empty() {
|
||||||
|
command.push(arg.clone());
|
||||||
|
arg.clear();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
arg.push(c);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !arg.is_empty() {
|
||||||
|
command.push(arg);
|
||||||
|
}
|
||||||
|
command
|
||||||
}
|
}
|
||||||
|
|
||||||
impl JobBuilder {
|
impl JobBuilder {
|
||||||
|
/// Create a new JobBuilder with the given sources and target. The sources should be ZFS
|
||||||
|
/// datasets, optionally ending with "/..." to include all child datasets. The target should be
|
||||||
|
/// a ZFS dataset.
|
||||||
pub fn new(sources: Vec<String>, target: String) -> Self {
|
pub fn new(sources: Vec<String>, target: String) -> Self {
|
||||||
JobBuilder {
|
JobBuilder {
|
||||||
sources,
|
sources,
|
||||||
target,
|
target,
|
||||||
source_zfs_command: vec!["zfs".to_string()],
|
source_zfs_command: vec!["zfs".to_string()],
|
||||||
target_zfs_command: vec!["zfs".to_string()],
|
target_zfs_command: vec!["zfs".to_string()],
|
||||||
dryrun: false,
|
|
||||||
retain: 2,
|
retain: 2,
|
||||||
sender: None,
|
sender: None,
|
||||||
}
|
}
|
||||||
@@ -54,11 +351,6 @@ impl JobBuilder {
|
|||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn dryrun(mut self) -> Self {
|
|
||||||
self.dryrun = true;
|
|
||||||
self
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn retain(mut self, retain: usize) -> Self {
|
pub fn retain(mut self, retain: usize) -> Self {
|
||||||
self.retain = retain;
|
self.retain = retain;
|
||||||
self
|
self
|
||||||
@@ -70,16 +362,7 @@ impl JobBuilder {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn build(self) -> Result<Job, String> {
|
pub fn build(self) -> Result<Job, String> {
|
||||||
let mut job = Job {
|
let mut datasets: IndexSet<String> = IndexSet::new();
|
||||||
datasets: vec![],
|
|
||||||
target: self.target,
|
|
||||||
source_zfs_command: self.source_zfs_command,
|
|
||||||
target_zfs_command: self.target_zfs_command,
|
|
||||||
dryrun: self.dryrun,
|
|
||||||
retain: self.retain,
|
|
||||||
sender: self.sender,
|
|
||||||
};
|
|
||||||
let mut datasets: Vec<String> = vec![];
|
|
||||||
for source in &self.sources {
|
for source in &self.sources {
|
||||||
let recurse = source.ends_with("/...");
|
let recurse = source.ends_with("/...");
|
||||||
let source = source.trim_end_matches("/...");
|
let source = source.trim_end_matches("/...");
|
||||||
@@ -90,271 +373,68 @@ impl JobBuilder {
|
|||||||
}
|
}
|
||||||
args.push(source);
|
args.push(source);
|
||||||
|
|
||||||
let mut cmd = job.get_side_command(JobSide::Source);
|
let mut cmd: Vec<&str> = self.source_zfs_command.iter().map(|s| s.as_str()).collect();
|
||||||
cmd.extend(args);
|
cmd.extend(args);
|
||||||
let output = command::exec_command(&cmd)?;
|
let output = command::exec(&cmd)?;
|
||||||
datasets.extend(output.lines().map(str::to_string));
|
datasets.extend(output.lines().map(str::to_string));
|
||||||
}
|
}
|
||||||
if datasets.is_empty() {
|
if datasets.is_empty() {
|
||||||
return Err(String::from("no matching source datasets found"));
|
return Err(String::from("no matching source datasets found"));
|
||||||
}
|
}
|
||||||
job.datasets = datasets;
|
Ok(Job {
|
||||||
Ok(job)
|
datasets,
|
||||||
}
|
target: self.target,
|
||||||
}
|
source_zfs_command: self.source_zfs_command,
|
||||||
|
target_zfs_command: self.target_zfs_command,
|
||||||
pub struct Job {
|
retain: self.retain,
|
||||||
datasets: Vec<String>,
|
sender: self.sender,
|
||||||
target: String,
|
|
||||||
source_zfs_command: Vec<String>,
|
|
||||||
target_zfs_command: Vec<String>,
|
|
||||||
dryrun: bool,
|
|
||||||
retain: usize,
|
|
||||||
sender: Option<Sender<BackupEvent>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
enum JobSide {
|
|
||||||
Source,
|
|
||||||
Destination,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
struct Snapshot {
|
|
||||||
snapshot: String,
|
|
||||||
side: JobSide,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Job {
|
|
||||||
pub fn dump(&self) {
|
|
||||||
println!("Datasets: {:?}", self.datasets);
|
|
||||||
println!("Target: {}", self.target);
|
|
||||||
println!("Source ZFS Command: {:?}", self.source_zfs_command);
|
|
||||||
println!("Target ZFS Command: {:?}", self.target_zfs_command);
|
|
||||||
println!("Dryrun: {}", self.dryrun);
|
|
||||||
}
|
|
||||||
|
|
||||||
fn send_event(&self, event: BackupEvent) {
|
|
||||||
if let Some(sender) = &self.sender {
|
|
||||||
sender.send(event).ok();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn get_side_command(&self, side: JobSide) -> Vec<&str> {
|
|
||||||
match side {
|
|
||||||
JobSide::Source => self.source_zfs_command.iter().map(|s| s.as_str()).collect(),
|
|
||||||
JobSide::Destination => self.target_zfs_command.iter().map(|s| s.as_str()).collect(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn create_snapshot(&self, dataset: &str, side: JobSide) -> Result<String, String> {
|
|
||||||
let snapshot = format!("{}@{}", dataset, Local::now().format("%Y-%m-%dT%H:%M:%S"));
|
|
||||||
let mut cmd = self.get_side_command(side);
|
|
||||||
cmd.extend(["snapshot", &snapshot]);
|
|
||||||
command::exec_command(&cmd)?;
|
|
||||||
self.send_event(BackupEvent::SnapshotCreated(snapshot.clone()));
|
|
||||||
Ok(snapshot)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn estimate(&self, source: &str, inc_snapshot: Option<&str>) -> Result<u64, String> {
|
|
||||||
let mut cmd = self.get_side_command(JobSide::Source);
|
|
||||||
cmd.push("send");
|
|
||||||
if let Some(inc) = inc_snapshot {
|
|
||||||
cmd.extend(["-i", inc]);
|
|
||||||
}
|
|
||||||
cmd.extend(["-n", "-P"]);
|
|
||||||
cmd.push(source);
|
|
||||||
let output = command::exec_command(&cmd)?;
|
|
||||||
let size = output
|
|
||||||
.lines()
|
|
||||||
.last()
|
|
||||||
.ok_or("estimate parse error")?
|
|
||||||
.split_whitespace()
|
|
||||||
.last()
|
|
||||||
.ok_or("estimate parse error")?
|
|
||||||
.parse::<u64>()
|
|
||||||
.map_err(|e| format!("estimate parse error: {}", e))?;
|
|
||||||
self.send_event(BackupEvent::Estimate(size));
|
|
||||||
Ok(size)
|
|
||||||
}
|
|
||||||
fn send_receive(
|
|
||||||
&self,
|
|
||||||
source: &str,
|
|
||||||
dest: &str,
|
|
||||||
inc_snapshot: Option<&str>,
|
|
||||||
total: Option<u64>,
|
|
||||||
) -> Result<(), String> {
|
|
||||||
let mut send_cmd = self.get_side_command(JobSide::Source);
|
|
||||||
send_cmd.push("send");
|
|
||||||
if let Some(inc) = inc_snapshot {
|
|
||||||
send_cmd.extend(["-i", inc]);
|
|
||||||
}
|
|
||||||
send_cmd.push(source);
|
|
||||||
|
|
||||||
let mut receive_cmd = self.get_side_command(JobSide::Destination);
|
|
||||||
receive_cmd.extend(["receive", "-F", dest]);
|
|
||||||
let filters = match self.sender.as_ref() {
|
|
||||||
Some(sender) => vec![filter::CountingReaderBuilder::build(sender.clone(), total)],
|
|
||||||
None => vec![],
|
|
||||||
};
|
|
||||||
command::exec_piped_commands(&send_cmd, &receive_cmd, filters)?;
|
|
||||||
self.send_event(BackupEvent::DatasetCompleted(source.to_string()));
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn list_snapshots(&self, source: &str, side: JobSide) -> Result<Vec<String>, String> {
|
|
||||||
let mut cmd = self.get_side_command(side);
|
|
||||||
cmd.extend(["list", "-H", "-o", "name", "-t", "snapshot", source]);
|
|
||||||
let output = command::exec_command(&cmd)?;
|
|
||||||
let snapshots: Vec<&str> = output
|
|
||||||
.split_whitespace()
|
|
||||||
.map(|s| {
|
|
||||||
let parts: Vec<&str> = s.split("@").collect();
|
|
||||||
if parts.len() == 2 {
|
|
||||||
Ok(parts[1])
|
|
||||||
} else {
|
|
||||||
Err(format!("invalid snapshot name: {}", s))
|
|
||||||
}
|
|
||||||
})
|
})
|
||||||
.collect::<Result<Vec<_>, _>>()?;
|
|
||||||
let result: Vec<String> = snapshots
|
|
||||||
.iter()
|
|
||||||
.filter(|s| NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S").is_ok())
|
|
||||||
.map(|s| s.to_string())
|
|
||||||
.collect();
|
|
||||||
Ok(result)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn find_latest_matching_snapshot(&self, source: &str, dest: &str) -> Result<String, String> {
|
|
||||||
self.find_matching_snapshots(source, dest)?
|
|
||||||
.into_iter()
|
|
||||||
.next()
|
|
||||||
.ok_or(String::from("no matching snapshots"))
|
|
||||||
}
|
|
||||||
|
|
||||||
fn find_matching_snapshots(&self, source: &str, dest: &str) -> Result<Vec<String>, String> {
|
|
||||||
let source_snapshots = self.list_snapshots(source, JobSide::Source)?;
|
|
||||||
let mut dest_snapshots = self.list_snapshots(dest, JobSide::Destination)?;
|
|
||||||
dest_snapshots.sort_by(|a, b| b.cmp(a));
|
|
||||||
|
|
||||||
let source_set: HashSet<String> = source_snapshots.into_iter().collect();
|
|
||||||
|
|
||||||
let matching_snapshots: Vec<String> = dest_snapshots
|
|
||||||
.into_iter()
|
|
||||||
.filter(|s| source_set.contains(s))
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
if matching_snapshots.is_empty() {
|
|
||||||
Err(String::from("no matching snapshots"))
|
|
||||||
} else {
|
|
||||||
Ok(matching_snapshots)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn find_old_snapshots(&self, source: &str, dest: &str) -> Result<Vec<Snapshot>, String> {
|
#[cfg(test)]
|
||||||
let source_snapshots = self.list_snapshots(source, JobSide::Source)?;
|
mod tests {
|
||||||
let dest_snapshots = self.list_snapshots(dest, JobSide::Destination)?;
|
use super::*;
|
||||||
let matching_snapshots = self.find_matching_snapshots(source, dest)?;
|
|
||||||
if matching_snapshots.is_empty() {
|
#[test]
|
||||||
return Err(String::from("no matching snapshots found"));
|
fn test_parse_command_simple() {
|
||||||
}
|
assert_eq!(parse_command("zfs list"), vec!["zfs", "list"]);
|
||||||
let retain: HashSet<String> = matching_snapshots.into_iter().take(self.retain).collect();
|
|
||||||
let result = source_snapshots
|
|
||||||
.into_iter()
|
|
||||||
.filter(|s| !retain.contains(s))
|
|
||||||
.map(|s| Snapshot {
|
|
||||||
snapshot: format!("{}@{}", source, s),
|
|
||||||
side: JobSide::Source,
|
|
||||||
})
|
|
||||||
.chain(
|
|
||||||
dest_snapshots
|
|
||||||
.into_iter()
|
|
||||||
.filter(|s| !retain.contains(s))
|
|
||||||
.map(|s| Snapshot {
|
|
||||||
snapshot: format!("{}@{}", dest, s),
|
|
||||||
side: JobSide::Destination,
|
|
||||||
}),
|
|
||||||
)
|
|
||||||
.collect();
|
|
||||||
Ok(result)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn delete_snapshot(&self, snapshot: Snapshot) -> Result<String, String> {
|
#[test]
|
||||||
let mut cmd = self.get_side_command(snapshot.side);
|
fn test_parse_command_quoted() {
|
||||||
cmd.extend(["destroy", &snapshot.snapshot]);
|
assert_eq!(
|
||||||
let res = command::exec_command(&cmd);
|
parse_command("ssh host sudo zfs"),
|
||||||
if res.is_ok() {
|
vec!["ssh", "host", "sudo", "zfs"]
|
||||||
self.send_event(BackupEvent::SnapshotDeleted(snapshot.snapshot.clone()));
|
|
||||||
}
|
|
||||||
res
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn run(&self) -> Result<(), String> {
|
|
||||||
for (index, source) in self.datasets.iter().enumerate() {
|
|
||||||
// Check the source exists
|
|
||||||
let mut cmd = self.get_side_command(JobSide::Source);
|
|
||||||
cmd.extend(["list", "-H", "-o", "name", source]);
|
|
||||||
let _ = command::exec_command(&cmd)?;
|
|
||||||
|
|
||||||
// Check whether the destination exists
|
|
||||||
// TODO: This will assume the destination doesn't exist if the
|
|
||||||
// command fails for any reason.
|
|
||||||
let dest = format!("{}/{}", self.target, source);
|
|
||||||
cmd = self.get_side_command(JobSide::Destination);
|
|
||||||
cmd.extend(["list", "-H", "-o", "name", &dest]);
|
|
||||||
let dest_exists = command::exec_command(&cmd).is_ok();
|
|
||||||
|
|
||||||
// Run backup
|
|
||||||
if dest_exists {
|
|
||||||
self.send_event(BackupEvent::StartingIncrementalBackup {
|
|
||||||
source: source.clone(),
|
|
||||||
dest: dest.clone(),
|
|
||||||
index: index + 1,
|
|
||||||
total: self.datasets.len(),
|
|
||||||
});
|
|
||||||
let inc_snapshot = format!(
|
|
||||||
"{}@{}",
|
|
||||||
source,
|
|
||||||
self.find_latest_matching_snapshot(source, &dest)?
|
|
||||||
);
|
);
|
||||||
|
|
||||||
let snapshot = self.create_snapshot(source, JobSide::Source)?;
|
|
||||||
let total = self.estimate(&snapshot, Some(&inc_snapshot)).ok();
|
|
||||||
if self.dryrun {
|
|
||||||
self.send_event(BackupEvent::DryrunCompleted(source.clone()));
|
|
||||||
self.delete_snapshot(Snapshot {
|
|
||||||
snapshot: snapshot.clone(),
|
|
||||||
side: JobSide::Source,
|
|
||||||
})?;
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
self.send_receive(&snapshot, &dest, Some(&inc_snapshot), total)?;
|
|
||||||
} else {
|
|
||||||
self.send_event(BackupEvent::StartingFullBackup {
|
|
||||||
source: source.clone(),
|
|
||||||
dest: dest.clone(),
|
|
||||||
index: index + 1,
|
|
||||||
total: self.datasets.len(),
|
|
||||||
});
|
|
||||||
let snapshot = self.create_snapshot(source, JobSide::Source)?;
|
|
||||||
let total = self.estimate(&snapshot, None).ok();
|
|
||||||
if self.dryrun {
|
|
||||||
self.send_event(BackupEvent::DryrunCompleted(source.clone()));
|
|
||||||
self.delete_snapshot(Snapshot {
|
|
||||||
snapshot: snapshot.clone(),
|
|
||||||
side: JobSide::Source,
|
|
||||||
})?;
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
self.send_receive(&snapshot, &dest, None, total)?;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Clean up snapshots
|
#[test]
|
||||||
self.find_old_snapshots(source, &dest)?
|
fn test_parse_command_with_quotes() {
|
||||||
.into_iter()
|
assert_eq!(
|
||||||
.map(|s| self.delete_snapshot(s))
|
parse_command(r#"ssh "my host" sudo zfs"#),
|
||||||
.collect::<Result<Vec<_>, _>>()?;
|
vec!["ssh", "my host", "sudo", "zfs"]
|
||||||
|
);
|
||||||
}
|
}
|
||||||
Ok(())
|
|
||||||
|
#[test]
|
||||||
|
fn test_parse_command_with_nested_quotes() {
|
||||||
|
assert_eq!(
|
||||||
|
parse_command(r#"ssh 'my " host' "su'do" zfs"#),
|
||||||
|
vec!["ssh", "my \" host", "su'do", "zfs"]
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_parse_command_extra_spaces() {
|
||||||
|
assert_eq!(
|
||||||
|
parse_command(" ssh \" ho st \" sudo zfs "),
|
||||||
|
vec!["ssh", " ho st ", "sudo", "zfs"]
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_parse_command_empty() {
|
||||||
|
assert_eq!(parse_command(""), Vec::<String>::new());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
13
src/main.rs
13
src/main.rs
@@ -35,9 +35,6 @@ struct Args {
|
|||||||
fn main() -> Result<(), Box<dyn Error>> {
|
fn main() -> Result<(), Box<dyn Error>> {
|
||||||
let args = Args::parse();
|
let args = Args::parse();
|
||||||
let mut builder = JobBuilder::new(args.datasets, args.target);
|
let mut builder = JobBuilder::new(args.datasets, args.target);
|
||||||
if args.dry_run {
|
|
||||||
builder = builder.dryrun();
|
|
||||||
}
|
|
||||||
if let Some(cmd) = args.zfs_command {
|
if let Some(cmd) = args.zfs_command {
|
||||||
builder = builder.zfs_command(&cmd);
|
builder = builder.zfs_command(&cmd);
|
||||||
}
|
}
|
||||||
@@ -57,11 +54,15 @@ fn main() -> Result<(), Box<dyn Error>> {
|
|||||||
} else {
|
} else {
|
||||||
Box::new(log::Progressor::new(rx))
|
Box::new(log::Progressor::new(rx))
|
||||||
};
|
};
|
||||||
thread::spawn(move || pr.run());
|
let handle = thread::spawn(move || pr.run());
|
||||||
|
|
||||||
builder = builder.sender(tx);
|
builder = builder.sender(tx);
|
||||||
|
|
||||||
|
// Create the job in a block so the sender is dropped before
|
||||||
|
// joining the progress thread, allowing it to exit cleanly.
|
||||||
|
{
|
||||||
let job = builder.build()?;
|
let job = builder.build()?;
|
||||||
job.run()?;
|
job.run(!args.dry_run)?;
|
||||||
|
}
|
||||||
|
handle.join().unwrap();
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ use std::io::{self, Read};
|
|||||||
use std::sync::mpsc::Sender;
|
use std::sync::mpsc::Sender;
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
struct CountingReader<R: Read> {
|
struct ByteCountReader<R: Read> {
|
||||||
inner: R,
|
inner: R,
|
||||||
sender: Sender<BackupEvent>,
|
sender: Sender<BackupEvent>,
|
||||||
bytes: u64,
|
bytes: u64,
|
||||||
@@ -12,7 +12,7 @@ struct CountingReader<R: Read> {
|
|||||||
total: Option<u64>,
|
total: Option<u64>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<R: Read> Read for CountingReader<R> {
|
impl<R: Read> Read for ByteCountReader<R> {
|
||||||
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
||||||
let n = self.inner.read(buf)?;
|
let n = self.inner.read(buf)?;
|
||||||
self.bytes += n as u64;
|
self.bytes += n as u64;
|
||||||
@@ -29,31 +29,31 @@ impl<R: Read> Read for CountingReader<R> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<R: Read> CountingReader<R> {
|
impl<R: Read> ByteCountReader<R> {
|
||||||
fn new(inner: R, sender: Sender<BackupEvent>, total: Option<u64>) -> Self {
|
fn new(inner: R, sender: Sender<BackupEvent>, total: Option<u64>) -> Self {
|
||||||
Self {
|
Self {
|
||||||
inner,
|
inner,
|
||||||
sender,
|
sender,
|
||||||
total,
|
|
||||||
bytes: 0,
|
bytes: 0,
|
||||||
last_send: Instant::now() - Duration::from_secs(1),
|
last_send: Instant::now() - Duration::from_secs(1),
|
||||||
|
total,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct CountingReaderBuilder {
|
pub(crate) struct ByteCountFilter {
|
||||||
sender: Sender<BackupEvent>,
|
sender: Sender<BackupEvent>,
|
||||||
total: Option<u64>,
|
total: Option<u64>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Filter for CountingReaderBuilder {
|
impl ByteCountFilter {
|
||||||
fn filter(&self, reader: Box<dyn Read>) -> Box<dyn Read> {
|
pub(crate) fn new(sender: Sender<BackupEvent>, total: Option<u64>) -> Self {
|
||||||
Box::new(CountingReader::new(reader, self.sender.clone(), self.total))
|
Self { sender, total }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl CountingReaderBuilder {
|
impl Filter for ByteCountFilter {
|
||||||
pub fn build(sender: Sender<BackupEvent>, total: Option<u64>) -> Box<dyn Filter> {
|
fn filter(&self, inner: Box<dyn Read>) -> Box<dyn Read> {
|
||||||
Box::new(Self { sender, total })
|
Box::new(ByteCountReader::new(inner, self.sender.clone(), self.total))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,21 +1,30 @@
|
|||||||
pub mod filter;
|
pub mod filter;
|
||||||
pub mod log;
|
pub mod log;
|
||||||
|
mod rate;
|
||||||
pub mod terminal;
|
pub mod terminal;
|
||||||
|
|
||||||
pub trait Progressor: Send {
|
pub trait Progressor: Send {
|
||||||
fn run(&mut self);
|
fn run(&mut self);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn human_bytes(bytes: u64) -> String {
|
fn human_bytes(bytes: f64) -> String {
|
||||||
let units = ["B", "kB", "MB", "GB", "TB"];
|
let units = ["B", "kB", "MB", "GB", "TB"];
|
||||||
|
|
||||||
let mut value = bytes as f64;
|
let mut a = bytes;
|
||||||
let mut index = 0;
|
let mut i = 0;
|
||||||
while value >= 1024.0 && index < units.len() - 1 {
|
while a >= 1024.0 && i < units.len() - 1 {
|
||||||
value /= 1024.0;
|
a /= 1024.0;
|
||||||
index += 1;
|
i += 1;
|
||||||
}
|
}
|
||||||
format!("{:.2} {:2}", value, units[index])
|
format!("{:.2} {:2}", a, units[i])
|
||||||
|
}
|
||||||
|
|
||||||
|
fn format_eta(seconds: f64) -> String {
|
||||||
|
let secs = seconds as u64;
|
||||||
|
let h = secs / 3600;
|
||||||
|
let m = (secs % 3600) / 60;
|
||||||
|
let s = secs % 60;
|
||||||
|
format!("{:02}:{:02}:{:02}", h, m, s)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub enum BackupEvent {
|
pub enum BackupEvent {
|
||||||
|
|||||||
74
src/progress/rate.rs
Normal file
74
src/progress/rate.rs
Normal file
@@ -0,0 +1,74 @@
|
|||||||
|
use std::time::Instant;
|
||||||
|
|
||||||
|
#[derive(Clone, Copy)]
|
||||||
|
struct Sample {
|
||||||
|
at: Instant,
|
||||||
|
bytes: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) struct RateTracker {
|
||||||
|
samples: Vec<Sample>,
|
||||||
|
head: usize,
|
||||||
|
count: usize,
|
||||||
|
smoothed: f64,
|
||||||
|
alpha: f64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl RateTracker {
|
||||||
|
pub(crate) fn new(capacity: usize) -> Self {
|
||||||
|
RateTracker {
|
||||||
|
samples: vec![
|
||||||
|
Sample {
|
||||||
|
at: Instant::now(),
|
||||||
|
bytes: 0,
|
||||||
|
};
|
||||||
|
capacity
|
||||||
|
],
|
||||||
|
head: 0,
|
||||||
|
count: 0,
|
||||||
|
smoothed: 0.0,
|
||||||
|
alpha: 0.05,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn push(&mut self, bytes: u64) {
|
||||||
|
let sample = Sample {
|
||||||
|
at: Instant::now(),
|
||||||
|
bytes,
|
||||||
|
};
|
||||||
|
self.samples[self.head] = sample;
|
||||||
|
self.head = (self.head + 1) % self.samples.capacity();
|
||||||
|
self.count = (self.count + 1).min(self.samples.capacity())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn rate(&mut self) -> Option<f64> {
|
||||||
|
if self.count < 2 {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
|
let cap = self.samples.capacity();
|
||||||
|
|
||||||
|
let oldest_idx = (cap + self.head - self.count) % cap;
|
||||||
|
let newest_idx = (cap + self.head - 1) % cap;
|
||||||
|
|
||||||
|
let oldest = self.samples[oldest_idx];
|
||||||
|
let newest = self.samples[newest_idx];
|
||||||
|
|
||||||
|
let elapsed = newest.at.duration_since(oldest.at).as_secs_f64();
|
||||||
|
if elapsed == 0.0 {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
|
// If for any reason oldest < newest, the subtraction will overflow
|
||||||
|
if newest.bytes < oldest.bytes {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
|
let rate = (newest.bytes - oldest.bytes) as f64 / elapsed;
|
||||||
|
|
||||||
|
// exponentially weighted moving average to smooth those spikes
|
||||||
|
self.smoothed = self.alpha * rate + (1.0 - self.alpha) * self.smoothed;
|
||||||
|
|
||||||
|
Some(self.smoothed)
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,9 +1,13 @@
|
|||||||
|
use crate::progress::format_eta;
|
||||||
|
|
||||||
|
use super::rate::RateTracker;
|
||||||
use super::{BackupEvent, human_bytes};
|
use super::{BackupEvent, human_bytes};
|
||||||
use std::{io::Write, sync::mpsc::Receiver};
|
use std::{io::Write, sync::mpsc::Receiver};
|
||||||
|
|
||||||
pub struct Progressor {
|
pub struct Progressor {
|
||||||
receiver: Receiver<BackupEvent>,
|
receiver: Receiver<BackupEvent>,
|
||||||
estimated_size: u64,
|
estimated_size: u64,
|
||||||
|
rate_tracker: RateTracker,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl super::Progressor for Progressor {
|
impl super::Progressor for Progressor {
|
||||||
@@ -11,7 +15,7 @@ impl super::Progressor for Progressor {
|
|||||||
while let Ok(event) = self.receiver.recv() {
|
while let Ok(event) = self.receiver.recv() {
|
||||||
match event {
|
match event {
|
||||||
BackupEvent::Estimate(size) => {
|
BackupEvent::Estimate(size) => {
|
||||||
println!("Estimated total backup size: {}", human_bytes(size));
|
println!("Estimated total backup size: {}", human_bytes(size as f64));
|
||||||
self.estimated_size = size;
|
self.estimated_size = size;
|
||||||
}
|
}
|
||||||
BackupEvent::StartingFullBackup {
|
BackupEvent::StartingFullBackup {
|
||||||
@@ -53,15 +57,34 @@ impl super::Progressor for Progressor {
|
|||||||
} else {
|
} else {
|
||||||
0.0
|
0.0
|
||||||
};
|
};
|
||||||
|
self.rate_tracker.push(bytes);
|
||||||
|
let rate = self.rate_tracker.rate();
|
||||||
|
let rate_display = match rate {
|
||||||
|
None => String::from("[unknown]"),
|
||||||
|
Some(0.0) => {
|
||||||
|
String::from("[stalled] ETA heat death of the universe")
|
||||||
|
}
|
||||||
|
Some(rate) => {
|
||||||
|
let eta = if bytes > total {
|
||||||
|
String::from("any moment now")
|
||||||
|
} else {
|
||||||
|
let eta = (total - bytes) as f64 / rate;
|
||||||
|
format_eta(eta)
|
||||||
|
};
|
||||||
|
format!("{}/s ETA {}", human_bytes(rate), eta)
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
print!(
|
print!(
|
||||||
"{:>3.0}% {}/{} transferred\r",
|
"\x1b[2K{:>3.0}% {}/{} @ {}\r",
|
||||||
percent * 100.0,
|
percent * 100.0,
|
||||||
human_bytes(bytes),
|
human_bytes(bytes as f64),
|
||||||
human_bytes(total)
|
human_bytes(total as f64),
|
||||||
|
rate_display,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
None => {
|
None => {
|
||||||
print!("{} transferred\r", human_bytes(bytes));
|
print!("\x1b[2K{} transferred\r", human_bytes(bytes as f64));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
std::io::stdout().flush().ok();
|
std::io::stdout().flush().ok();
|
||||||
@@ -82,6 +105,7 @@ impl Progressor {
|
|||||||
Self {
|
Self {
|
||||||
receiver,
|
receiver,
|
||||||
estimated_size: 0,
|
estimated_size: 0,
|
||||||
|
rate_tracker: RateTracker::new(100),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user