Add pluggable progressors

This commit is contained in:
2026-04-26 14:51:22 +02:00
parent c2ccccc88f
commit ce81e266b7
6 changed files with 182 additions and 61 deletions

View File

@@ -28,19 +28,20 @@ pub fn exec_command(command: &Vec<&str>) -> Result<String, String> {
struct CountingReader<R: Read> { struct CountingReader<R: Read> {
inner: R, inner: R,
sender: Sender<BackupEvent>, sender: Sender<BackupEvent>,
bytes_read: u64, bytes: u64,
last_send: Instant, last_send: Instant,
total: Option<u64>,
} }
impl<R: Read> Read for CountingReader<R> { impl<R: Read> Read for CountingReader<R> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let n = self.inner.read(buf)?; let n = self.inner.read(buf)?;
self.bytes_read += n as u64; self.bytes += n as u64;
if self.last_send.elapsed().as_millis() >= 100 { if self.last_send.elapsed().as_millis() >= 100 {
self.sender self.sender
.send(BackupEvent::BytesTransferred { .send(BackupEvent::BytesTransferred {
dataset: String::from(""), bytes: self.bytes,
bytes: self.bytes_read, estimated_total: self.total,
}) })
.ok(); .ok();
self.last_send = Instant::now(); self.last_send = Instant::now();
@@ -50,11 +51,12 @@ impl<R: Read> Read for CountingReader<R> {
} }
impl<R: Read> CountingReader<R> { impl<R: Read> CountingReader<R> {
fn new(inner: R, sender: Sender<BackupEvent>) -> Self { fn new(inner: R, sender: Sender<BackupEvent>, total: Option<u64>) -> Self {
Self { Self {
inner, inner,
sender, sender,
bytes_read: 0, total,
bytes: 0,
last_send: Instant::now() - Duration::from_secs(1), last_send: Instant::now() - Duration::from_secs(1),
} }
} }
@@ -64,6 +66,7 @@ pub fn exec_piped_commands(
source: &Vec<&str>, source: &Vec<&str>,
dest: &Vec<&str>, dest: &Vec<&str>,
sender: Option<Sender<BackupEvent>>, sender: Option<Sender<BackupEvent>>,
total: Option<u64>,
) -> Result<(), String> { ) -> Result<(), String> {
if source.is_empty() || dest.is_empty() { if source.is_empty() || dest.is_empty() {
return Err("Source or destination command is empty".to_string()); return Err("Source or destination command is empty".to_string());
@@ -91,7 +94,7 @@ pub fn exec_piped_commands(
let mut receive_stdin = receive_process.stdin.take().unwrap(); let mut receive_stdin = receive_process.stdin.take().unwrap();
let mut reader: Box<dyn Read> = match sender { let mut reader: Box<dyn Read> = match sender {
Some(s) => Box::new(CountingReader::new(send_stdout, s)), Some(s) => Box::new(CountingReader::new(send_stdout, s, total)),
None => Box::new(send_stdout), None => Box::new(send_stdout),
}; };
std::io::copy(&mut reader, &mut receive_stdin).map_err(|e| e.to_string())?; std::io::copy(&mut reader, &mut receive_stdin).map_err(|e| e.to_string())?;

View File

@@ -181,6 +181,7 @@ impl Job {
source: &str, source: &str,
dest: &str, dest: &str,
inc_snapshot: Option<&str>, inc_snapshot: Option<&str>,
total: Option<u64>,
) -> Result<(), String> { ) -> Result<(), String> {
let mut send_cmd = self.get_side_command(JobSide::Source); let mut send_cmd = self.get_side_command(JobSide::Source);
send_cmd.push("send"); send_cmd.push("send");
@@ -191,7 +192,7 @@ impl Job {
let mut receive_cmd = self.get_side_command(JobSide::Destination); let mut receive_cmd = self.get_side_command(JobSide::Destination);
receive_cmd.extend(["receive", "-F", dest]); receive_cmd.extend(["receive", "-F", dest]);
command::exec_piped_commands(&send_cmd, &receive_cmd, self.sender.clone())?; command::exec_piped_commands(&send_cmd, &receive_cmd, self.sender.clone(), total)?;
self.send_event(BackupEvent::DatasetCompleted(source.to_string())); self.send_event(BackupEvent::DatasetCompleted(source.to_string()));
Ok(()) Ok(())
} }
@@ -313,8 +314,16 @@ impl Job {
); );
let snapshot = self.create_snapshot(source, JobSide::Source)?; let snapshot = self.create_snapshot(source, JobSide::Source)?;
self.estimate(&snapshot, Some(&inc_snapshot)).ok(); let total = self.estimate(&snapshot, Some(&inc_snapshot)).ok();
self.send_receive(&snapshot, &dest, Some(&inc_snapshot))?; if self.dryrun {
self.send_event(BackupEvent::DryrunCompleted(source.clone()));
self.delete_snapshot(Snapshot {
snapshot: snapshot.clone(),
side: JobSide::Source,
})?;
return Ok(());
}
self.send_receive(&snapshot, &dest, Some(&inc_snapshot), total)?;
} else { } else {
self.send_event(BackupEvent::StartingFullBackup { self.send_event(BackupEvent::StartingFullBackup {
source: source.clone(), source: source.clone(),
@@ -323,8 +332,16 @@ impl Job {
total: self.datasets.len(), total: self.datasets.len(),
}); });
let snapshot = self.create_snapshot(source, JobSide::Source)?; let snapshot = self.create_snapshot(source, JobSide::Source)?;
self.estimate(&snapshot, None).ok(); let total = self.estimate(&snapshot, None).ok();
self.send_receive(&snapshot, &dest, None)?; if self.dryrun {
self.send_event(BackupEvent::DryrunCompleted(source.clone()));
self.delete_snapshot(Snapshot {
snapshot: snapshot.clone(),
side: JobSide::Source,
})?;
return Ok(());
}
self.send_receive(&snapshot, &dest, None, total)?;
} }
// Clean up snapshots // Clean up snapshots

View File

@@ -4,7 +4,8 @@ use std::error::Error;
use std::sync::mpsc::channel; use std::sync::mpsc::channel;
use std::thread; use std::thread;
use zfsbackup::job::JobBuilder; use zfsbackup::job::JobBuilder;
use zfsbackup::progress::ProgressReporter; use zfsbackup::progress::Progressor;
use zfsbackup::progress::terminal;
#[derive(Parser)] #[derive(Parser)]
#[command(version, about, long_about = None)] #[command(version, about, long_about = None)]
@@ -50,7 +51,7 @@ fn main() -> Result<(), Box<dyn Error>> {
} }
let (tx, rx) = channel(); let (tx, rx) = channel();
let mut pr = ProgressReporter::new(rx); let mut pr: Box<dyn Progressor> = Box::new(terminal::Progressor::new(rx));
thread::spawn(move || pr.run()); thread::spawn(move || pr.run());
builder = builder.sender(tx); builder = builder.sender(tx);

66
src/progress/log.rs Normal file
View File

@@ -0,0 +1,66 @@
use super::BackupEvent;
use std::sync::mpsc::Receiver;
pub struct Progressor {
receiver: Receiver<BackupEvent>,
estimated_size: u64,
}
impl super::Progressor for Progressor {
fn run(&mut self) {
while let Ok(event) = self.receiver.recv() {
match event {
BackupEvent::Estimate(size) => {
println!("Estimated total backup size: {} bytes", size);
self.estimated_size = size;
}
BackupEvent::StartingFullBackup {
source,
dest,
index,
total,
} => {
println!(
"Starting full backup of {} to {} ({} of {})",
source, dest, index, total
);
}
BackupEvent::StartingIncrementalBackup {
source,
dest,
index,
total,
} => {
println!(
"Starting incremental backup of {} to {} ({} of {})",
source, dest, index, total
);
}
BackupEvent::SnapshotCreated(name) => {
println!("Created snapshot: {}", name);
}
BackupEvent::SnapshotDeleted(name) => {
println!("Deleted snapshot: {}", name);
}
BackupEvent::BytesTransferred { .. } => {}
BackupEvent::DatasetCompleted(name) => {
println!("Completed backup of dataset: {}", name);
self.estimated_size = 0;
}
BackupEvent::DryrunCompleted(name) => {
println!("Completed dry run backup of dataset: {}", name);
self.estimated_size = 0;
}
}
}
}
}
impl Progressor {
pub fn new(receiver: Receiver<BackupEvent>) -> Self {
Self {
receiver,
estimated_size: 0,
}
}
}

42
src/progress/mod.rs Normal file
View File

@@ -0,0 +1,42 @@
pub mod log;
pub mod terminal;
pub trait Progressor: Send {
fn run(&mut self);
}
fn human_bytes(bytes: u64) -> String {
let units = ["B", "kB", "MB", "GB", "TB"];
let mut value = bytes as f64;
let mut index = 0;
while value >= 1024.0 && index < units.len() - 1 {
value /= 1024.0;
index += 1;
}
format!("{:.2} {:2}", value, units[index])
}
pub enum BackupEvent {
Estimate(u64),
StartingFullBackup {
source: String,
dest: String,
index: usize,
total: usize,
},
StartingIncrementalBackup {
source: String,
dest: String,
index: usize,
total: usize,
},
SnapshotCreated(String),
SnapshotDeleted(String),
BytesTransferred {
bytes: u64,
estimated_total: Option<u64>,
},
DatasetCompleted(String),
DryrunCompleted(String),
}

View File

@@ -1,46 +1,17 @@
use super::{BackupEvent, human_bytes};
use std::{io::Write, sync::mpsc::Receiver}; use std::{io::Write, sync::mpsc::Receiver};
pub enum BackupEvent { pub struct Progressor {
Estimate(u64),
StartingFullBackup {
source: String,
dest: String,
index: usize,
total: usize,
},
StartingIncrementalBackup {
source: String,
dest: String,
index: usize,
total: usize,
},
SnapshotCreated(String),
SnapshotDeleted(String),
BytesTransferred {
dataset: String,
bytes: u64,
},
DatasetCompleted(String),
}
pub struct ProgressReporter {
receiver: Receiver<BackupEvent>, receiver: Receiver<BackupEvent>,
estimated_size: u64, estimated_size: u64,
} }
impl ProgressReporter { impl super::Progressor for Progressor {
pub fn new(receiver: Receiver<BackupEvent>) -> Self { fn run(&mut self) {
Self {
receiver,
estimated_size: 0,
}
}
pub fn run(&mut self) {
while let Ok(event) = self.receiver.recv() { while let Ok(event) = self.receiver.recv() {
match event { match event {
BackupEvent::Estimate(size) => { BackupEvent::Estimate(size) => {
println!("Estimated total backup size: {} bytes", size); println!("Estimated total backup size: {}", human_bytes(size));
self.estimated_size = size; self.estimated_size = size;
} }
BackupEvent::StartingFullBackup { BackupEvent::StartingFullBackup {
@@ -71,25 +42,46 @@ impl ProgressReporter {
BackupEvent::SnapshotDeleted(name) => { BackupEvent::SnapshotDeleted(name) => {
println!("Deleted snapshot: {}", name); println!("Deleted snapshot: {}", name);
} }
BackupEvent::BytesTransferred { dataset, bytes } => { BackupEvent::BytesTransferred {
let percent: f64 = if self.estimated_size > 0 { bytes,
bytes as f64 / self.estimated_size as f64 estimated_total,
} => {
match estimated_total {
Some(total) => {
let percent: f64 = if total > 0 {
bytes as f64 / total as f64
} else { } else {
0.0 0.0
}; };
print!( print!(
"{:>3.0}% {}/{} bytes transferred\r", "{:>3.0}% {}/{} transferred\r",
percent * 100.0, percent * 100.0,
bytes, human_bytes(bytes),
self.estimated_size human_bytes(total)
); );
}
None => {
print!("{} transferred\r", human_bytes(bytes));
}
}
std::io::stdout().flush().ok(); std::io::stdout().flush().ok();
} }
BackupEvent::DatasetCompleted(name) => { BackupEvent::DatasetCompleted(name) => {
println!("Completed backup of dataset: {}", name); println!("Completed backup of dataset: {}", name);
self.estimated_size = 0; }
BackupEvent::DryrunCompleted(name) => {
println!("Completed dry run backup of dataset: {}", name);
} }
} }
} }
} }
} }
impl Progressor {
pub fn new(receiver: Receiver<BackupEvent>) -> Self {
Self {
receiver,
estimated_size: 0,
}
}
}