Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| e8e4e3636c | |||
| 2660d7d2f5 | |||
| 9ef27716c6 |
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -435,7 +435,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "zfsbackup"
|
||||
version = "0.3.0"
|
||||
version = "0.4.0"
|
||||
dependencies = [
|
||||
"chrono",
|
||||
"clap",
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "zfsbackup"
|
||||
version = "0.3.0"
|
||||
version = "0.4.0"
|
||||
edition = "2024"
|
||||
publish = false
|
||||
|
||||
|
||||
@@ -6,7 +6,12 @@ pub trait Filter {
|
||||
fn filter(&self, reader: Box<dyn Read>) -> Box<dyn Read>;
|
||||
}
|
||||
|
||||
pub fn exec_command(command: &Vec<&str>) -> Result<String, String> {
|
||||
// TODO: Make this support quoting for args with spaces (and escape nested quotes)
|
||||
fn display_command(command: &Vec<&str>) -> String {
|
||||
command.join(" ")
|
||||
}
|
||||
|
||||
pub fn exec(command: &Vec<&str>) -> Result<String, String> {
|
||||
if command.is_empty() {
|
||||
return Err("Command is empty".to_string());
|
||||
}
|
||||
@@ -14,11 +19,13 @@ pub fn exec_command(command: &Vec<&str>) -> Result<String, String> {
|
||||
if command.len() > 1 {
|
||||
cmd.args(&command[1..]);
|
||||
}
|
||||
let output = cmd.output().map_err(|e| e.to_string())?;
|
||||
let output = cmd
|
||||
.output()
|
||||
.map_err(|e| format!("Failed to run command {}: {}", command[0], e))?;
|
||||
if !output.status.success() {
|
||||
return Err(format!(
|
||||
"Command {:?} failed with status {}: {}",
|
||||
command,
|
||||
"Command '{}' failed with status {}: {}",
|
||||
display_command(command),
|
||||
output.status,
|
||||
String::from_utf8_lossy(&output.stderr)
|
||||
));
|
||||
@@ -27,7 +34,10 @@ pub fn exec_command(command: &Vec<&str>) -> Result<String, String> {
|
||||
Ok(output_str)
|
||||
}
|
||||
|
||||
pub fn exec_piped_commands(
|
||||
/// Executes a pipeline of commands, where the output of the source command is passed through a
|
||||
/// series of filters before being piped into the destination command. The filters are applied in
|
||||
/// the order they are provided.
|
||||
pub fn exec_pipe(
|
||||
source: &Vec<&str>,
|
||||
dest: &Vec<&str>,
|
||||
filters: Vec<Box<dyn Filter>>,
|
||||
|
||||
16
src/job.rs
16
src/job.rs
@@ -57,7 +57,7 @@ impl Job {
|
||||
let name = snapshot.to_string();
|
||||
let mut cmd = self.get_side_command(side);
|
||||
cmd.extend(["snapshot", &name]);
|
||||
command::exec_command(&cmd)?;
|
||||
command::exec(&cmd)?;
|
||||
self.send_event(BackupEvent::SnapshotCreated(name.clone()));
|
||||
Ok(snapshot)
|
||||
}
|
||||
@@ -70,7 +70,7 @@ impl Job {
|
||||
}
|
||||
cmd.extend(["-n", "-P"]);
|
||||
cmd.push(source);
|
||||
let output = command::exec_command(&cmd)?;
|
||||
let output = command::exec(&cmd)?;
|
||||
let size = output
|
||||
.lines()
|
||||
.last()
|
||||
@@ -107,7 +107,7 @@ impl Job {
|
||||
))],
|
||||
None => vec![],
|
||||
};
|
||||
command::exec_piped_commands(&send_cmd, &receive_cmd, filters)?;
|
||||
command::exec_pipe(&send_cmd, &receive_cmd, filters)?;
|
||||
self.send_event(BackupEvent::DatasetCompleted(source.to_string()));
|
||||
Ok(())
|
||||
}
|
||||
@@ -115,7 +115,7 @@ impl Job {
|
||||
fn list_snapshot_ids(&self, source: &str, side: JobSide) -> Result<Vec<String>, String> {
|
||||
let mut cmd = self.get_side_command(side);
|
||||
cmd.extend(["list", "-H", "-o", "name", "-t", "snapshot", source]);
|
||||
let output = command::exec_command(&cmd)?;
|
||||
let output = command::exec(&cmd)?;
|
||||
let snapshots: Vec<&str> = output
|
||||
.split_whitespace()
|
||||
.map(|s| {
|
||||
@@ -195,7 +195,7 @@ impl Job {
|
||||
let mut cmd = self.get_side_command(snapshot.side);
|
||||
let name = snapshot.to_string();
|
||||
cmd.extend(["destroy", &name]);
|
||||
command::exec_command(&cmd)?;
|
||||
command::exec(&cmd)?;
|
||||
self.send_event(BackupEvent::SnapshotDeleted(name.clone()));
|
||||
Ok(())
|
||||
}
|
||||
@@ -205,7 +205,7 @@ impl Job {
|
||||
// Check the source exists
|
||||
let mut cmd = self.get_side_command(JobSide::Source);
|
||||
cmd.extend(["list", "-H", "-o", "name", source]);
|
||||
let _ = command::exec_command(&cmd)?;
|
||||
let _ = command::exec(&cmd)?;
|
||||
|
||||
// Check whether the destination exists
|
||||
// TODO: This will assume the destination doesn't exist if the
|
||||
@@ -213,7 +213,7 @@ impl Job {
|
||||
let dest = format!("{}/{}", self.target, source);
|
||||
cmd = self.get_side_command(JobSide::Target);
|
||||
cmd.extend(["list", "-H", "-o", "name", &dest]);
|
||||
let dest_exists = command::exec_command(&cmd).is_ok();
|
||||
let dest_exists = command::exec(&cmd).is_ok();
|
||||
|
||||
// Run backup
|
||||
if dest_exists {
|
||||
@@ -375,7 +375,7 @@ impl JobBuilder {
|
||||
|
||||
let mut cmd: Vec<&str> = self.source_zfs_command.iter().map(|s| s.as_str()).collect();
|
||||
cmd.extend(args);
|
||||
let output = command::exec_command(&cmd)?;
|
||||
let output = command::exec(&cmd)?;
|
||||
datasets.extend(output.lines().map(str::to_string));
|
||||
}
|
||||
if datasets.is_empty() {
|
||||
|
||||
@@ -1,21 +1,30 @@
|
||||
pub mod filter;
|
||||
pub mod log;
|
||||
mod rate;
|
||||
pub mod terminal;
|
||||
|
||||
pub trait Progressor: Send {
|
||||
fn run(&mut self);
|
||||
}
|
||||
|
||||
fn human_bytes(bytes: u64) -> String {
|
||||
fn human_bytes(bytes: f64) -> String {
|
||||
let units = ["B", "kB", "MB", "GB", "TB"];
|
||||
|
||||
let mut value = bytes as f64;
|
||||
let mut index = 0;
|
||||
while value >= 1024.0 && index < units.len() - 1 {
|
||||
value /= 1024.0;
|
||||
index += 1;
|
||||
let mut a = bytes;
|
||||
let mut i = 0;
|
||||
while a >= 1024.0 && i < units.len() - 1 {
|
||||
a /= 1024.0;
|
||||
i += 1;
|
||||
}
|
||||
format!("{:.2} {:2}", value, units[index])
|
||||
format!("{:.2} {:2}", a, units[i])
|
||||
}
|
||||
|
||||
fn format_eta(seconds: f64) -> String {
|
||||
let secs = seconds as u64;
|
||||
let h = secs / 3600;
|
||||
let m = (secs % 3600) / 60;
|
||||
let s = secs % 60;
|
||||
format!("{:02}:{:02}:{:02}", h, m, s)
|
||||
}
|
||||
|
||||
pub enum BackupEvent {
|
||||
|
||||
74
src/progress/rate.rs
Normal file
74
src/progress/rate.rs
Normal file
@@ -0,0 +1,74 @@
|
||||
use std::time::Instant;
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
struct Sample {
|
||||
at: Instant,
|
||||
bytes: u64,
|
||||
}
|
||||
|
||||
pub(crate) struct RateTracker {
|
||||
samples: Vec<Sample>,
|
||||
head: usize,
|
||||
count: usize,
|
||||
smoothed: f64,
|
||||
alpha: f64,
|
||||
}
|
||||
|
||||
impl RateTracker {
|
||||
pub(crate) fn new(capacity: usize) -> Self {
|
||||
RateTracker {
|
||||
samples: vec![
|
||||
Sample {
|
||||
at: Instant::now(),
|
||||
bytes: 0,
|
||||
};
|
||||
capacity
|
||||
],
|
||||
head: 0,
|
||||
count: 0,
|
||||
smoothed: 0.0,
|
||||
alpha: 0.05,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn push(&mut self, bytes: u64) {
|
||||
let sample = Sample {
|
||||
at: Instant::now(),
|
||||
bytes,
|
||||
};
|
||||
self.samples[self.head] = sample;
|
||||
self.head = (self.head + 1) % self.samples.capacity();
|
||||
self.count = (self.count + 1).min(self.samples.capacity())
|
||||
}
|
||||
|
||||
pub(crate) fn rate(&mut self) -> Option<f64> {
|
||||
if self.count < 2 {
|
||||
return None;
|
||||
}
|
||||
|
||||
let cap = self.samples.capacity();
|
||||
|
||||
let oldest_idx = (cap + self.head - self.count) % cap;
|
||||
let newest_idx = (cap + self.head - 1) % cap;
|
||||
|
||||
let oldest = self.samples[oldest_idx];
|
||||
let newest = self.samples[newest_idx];
|
||||
|
||||
let elapsed = newest.at.duration_since(oldest.at).as_secs_f64();
|
||||
if elapsed == 0.0 {
|
||||
return None;
|
||||
}
|
||||
|
||||
// If for any reason oldest < newest, the subtraction will overflow
|
||||
if newest.bytes < oldest.bytes {
|
||||
return None;
|
||||
}
|
||||
|
||||
let rate = (newest.bytes - oldest.bytes) as f64 / elapsed;
|
||||
|
||||
// exponentially weighted moving average to smooth those spikes
|
||||
self.smoothed = self.alpha * rate + (1.0 - self.alpha) * self.smoothed;
|
||||
|
||||
Some(self.smoothed)
|
||||
}
|
||||
}
|
||||
@@ -1,9 +1,13 @@
|
||||
use crate::progress::format_eta;
|
||||
|
||||
use super::rate::RateTracker;
|
||||
use super::{BackupEvent, human_bytes};
|
||||
use std::{io::Write, sync::mpsc::Receiver};
|
||||
|
||||
pub struct Progressor {
|
||||
receiver: Receiver<BackupEvent>,
|
||||
estimated_size: u64,
|
||||
rate_tracker: RateTracker,
|
||||
}
|
||||
|
||||
impl super::Progressor for Progressor {
|
||||
@@ -11,7 +15,7 @@ impl super::Progressor for Progressor {
|
||||
while let Ok(event) = self.receiver.recv() {
|
||||
match event {
|
||||
BackupEvent::Estimate(size) => {
|
||||
println!("Estimated total backup size: {}", human_bytes(size));
|
||||
println!("Estimated total backup size: {}", human_bytes(size as f64));
|
||||
self.estimated_size = size;
|
||||
}
|
||||
BackupEvent::StartingFullBackup {
|
||||
@@ -53,15 +57,34 @@ impl super::Progressor for Progressor {
|
||||
} else {
|
||||
0.0
|
||||
};
|
||||
self.rate_tracker.push(bytes);
|
||||
let rate = self.rate_tracker.rate();
|
||||
let rate_display = match rate {
|
||||
None => String::from("[unknown]"),
|
||||
Some(0.0) => {
|
||||
String::from("[stalled] ETA heat death of the universe")
|
||||
}
|
||||
Some(rate) => {
|
||||
let eta = if bytes > total {
|
||||
String::from("any moment now")
|
||||
} else {
|
||||
let eta = (total - bytes) as f64 / rate;
|
||||
format_eta(eta)
|
||||
};
|
||||
format!("{}/s ETA {}", human_bytes(rate), eta)
|
||||
}
|
||||
};
|
||||
|
||||
print!(
|
||||
"{:>3.0}% {}/{} transferred\r",
|
||||
"\x1b[2K{:>3.0}% {}/{} @ {}\r",
|
||||
percent * 100.0,
|
||||
human_bytes(bytes),
|
||||
human_bytes(total)
|
||||
human_bytes(bytes as f64),
|
||||
human_bytes(total as f64),
|
||||
rate_display,
|
||||
);
|
||||
}
|
||||
None => {
|
||||
print!("{} transferred\r", human_bytes(bytes));
|
||||
print!("\x1b[2K{} transferred\r", human_bytes(bytes as f64));
|
||||
}
|
||||
}
|
||||
std::io::stdout().flush().ok();
|
||||
@@ -82,6 +105,7 @@ impl Progressor {
|
||||
Self {
|
||||
receiver,
|
||||
estimated_size: 0,
|
||||
rate_tracker: RateTracker::new(100),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user