diff --git a/src/command.rs b/src/command.rs index 9e3c827..d02f17f 100644 --- a/src/command.rs +++ b/src/command.rs @@ -28,19 +28,20 @@ pub fn exec_command(command: &Vec<&str>) -> Result { struct CountingReader { inner: R, sender: Sender, - bytes_read: u64, + bytes: u64, last_send: Instant, + total: Option, } impl Read for CountingReader { fn read(&mut self, buf: &mut [u8]) -> io::Result { let n = self.inner.read(buf)?; - self.bytes_read += n as u64; + self.bytes += n as u64; if self.last_send.elapsed().as_millis() >= 100 { self.sender .send(BackupEvent::BytesTransferred { - dataset: String::from(""), - bytes: self.bytes_read, + bytes: self.bytes, + estimated_total: self.total, }) .ok(); self.last_send = Instant::now(); @@ -50,11 +51,12 @@ impl Read for CountingReader { } impl CountingReader { - fn new(inner: R, sender: Sender) -> Self { + fn new(inner: R, sender: Sender, total: Option) -> Self { Self { inner, sender, - bytes_read: 0, + total, + bytes: 0, last_send: Instant::now() - Duration::from_secs(1), } } @@ -64,6 +66,7 @@ pub fn exec_piped_commands( source: &Vec<&str>, dest: &Vec<&str>, sender: Option>, + total: Option, ) -> Result<(), String> { if source.is_empty() || dest.is_empty() { return Err("Source or destination command is empty".to_string()); @@ -91,7 +94,7 @@ pub fn exec_piped_commands( let mut receive_stdin = receive_process.stdin.take().unwrap(); let mut reader: Box = match sender { - Some(s) => Box::new(CountingReader::new(send_stdout, s)), + Some(s) => Box::new(CountingReader::new(send_stdout, s, total)), None => Box::new(send_stdout), }; std::io::copy(&mut reader, &mut receive_stdin).map_err(|e| e.to_string())?; diff --git a/src/job.rs b/src/job.rs index 56b3e95..dcb4475 100644 --- a/src/job.rs +++ b/src/job.rs @@ -181,6 +181,7 @@ impl Job { source: &str, dest: &str, inc_snapshot: Option<&str>, + total: Option, ) -> Result<(), String> { let mut send_cmd = self.get_side_command(JobSide::Source); send_cmd.push("send"); @@ -191,7 +192,7 @@ impl Job { let mut receive_cmd = self.get_side_command(JobSide::Destination); receive_cmd.extend(["receive", "-F", dest]); - command::exec_piped_commands(&send_cmd, &receive_cmd, self.sender.clone())?; + command::exec_piped_commands(&send_cmd, &receive_cmd, self.sender.clone(), total)?; self.send_event(BackupEvent::DatasetCompleted(source.to_string())); Ok(()) } @@ -313,8 +314,16 @@ impl Job { ); let snapshot = self.create_snapshot(source, JobSide::Source)?; - self.estimate(&snapshot, Some(&inc_snapshot)).ok(); - self.send_receive(&snapshot, &dest, Some(&inc_snapshot))?; + let total = self.estimate(&snapshot, Some(&inc_snapshot)).ok(); + if self.dryrun { + self.send_event(BackupEvent::DryrunCompleted(source.clone())); + self.delete_snapshot(Snapshot { + snapshot: snapshot.clone(), + side: JobSide::Source, + })?; + return Ok(()); + } + self.send_receive(&snapshot, &dest, Some(&inc_snapshot), total)?; } else { self.send_event(BackupEvent::StartingFullBackup { source: source.clone(), @@ -323,8 +332,16 @@ impl Job { total: self.datasets.len(), }); let snapshot = self.create_snapshot(source, JobSide::Source)?; - self.estimate(&snapshot, None).ok(); - self.send_receive(&snapshot, &dest, None)?; + let total = self.estimate(&snapshot, None).ok(); + if self.dryrun { + self.send_event(BackupEvent::DryrunCompleted(source.clone())); + self.delete_snapshot(Snapshot { + snapshot: snapshot.clone(), + side: JobSide::Source, + })?; + return Ok(()); + } + self.send_receive(&snapshot, &dest, None, total)?; } // Clean up snapshots diff --git a/src/main.rs b/src/main.rs index 8461e79..ec2462f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,7 +4,8 @@ use std::error::Error; use std::sync::mpsc::channel; use std::thread; use zfsbackup::job::JobBuilder; -use zfsbackup::progress::ProgressReporter; +use zfsbackup::progress::Progressor; +use zfsbackup::progress::terminal; #[derive(Parser)] #[command(version, about, long_about = None)] @@ -50,7 +51,7 @@ fn main() -> Result<(), Box> { } let (tx, rx) = channel(); - let mut pr = ProgressReporter::new(rx); + let mut pr: Box = Box::new(terminal::Progressor::new(rx)); thread::spawn(move || pr.run()); builder = builder.sender(tx); diff --git a/src/progress/log.rs b/src/progress/log.rs new file mode 100644 index 0000000..8cda8dc --- /dev/null +++ b/src/progress/log.rs @@ -0,0 +1,66 @@ +use super::BackupEvent; +use std::sync::mpsc::Receiver; + +pub struct Progressor { + receiver: Receiver, + estimated_size: u64, +} + +impl super::Progressor for Progressor { + fn run(&mut self) { + while let Ok(event) = self.receiver.recv() { + match event { + BackupEvent::Estimate(size) => { + println!("Estimated total backup size: {} bytes", size); + self.estimated_size = size; + } + BackupEvent::StartingFullBackup { + source, + dest, + index, + total, + } => { + println!( + "Starting full backup of {} to {} ({} of {})", + source, dest, index, total + ); + } + BackupEvent::StartingIncrementalBackup { + source, + dest, + index, + total, + } => { + println!( + "Starting incremental backup of {} to {} ({} of {})", + source, dest, index, total + ); + } + BackupEvent::SnapshotCreated(name) => { + println!("Created snapshot: {}", name); + } + BackupEvent::SnapshotDeleted(name) => { + println!("Deleted snapshot: {}", name); + } + BackupEvent::BytesTransferred { .. } => {} + BackupEvent::DatasetCompleted(name) => { + println!("Completed backup of dataset: {}", name); + self.estimated_size = 0; + } + BackupEvent::DryrunCompleted(name) => { + println!("Completed dry run backup of dataset: {}", name); + self.estimated_size = 0; + } + } + } + } +} + +impl Progressor { + pub fn new(receiver: Receiver) -> Self { + Self { + receiver, + estimated_size: 0, + } + } +} diff --git a/src/progress/mod.rs b/src/progress/mod.rs new file mode 100644 index 0000000..dd4aef1 --- /dev/null +++ b/src/progress/mod.rs @@ -0,0 +1,42 @@ +pub mod log; +pub mod terminal; + +pub trait Progressor: Send { + fn run(&mut self); +} + +fn human_bytes(bytes: u64) -> String { + let units = ["B", "kB", "MB", "GB", "TB"]; + + let mut value = bytes as f64; + let mut index = 0; + while value >= 1024.0 && index < units.len() - 1 { + value /= 1024.0; + index += 1; + } + format!("{:.2} {:2}", value, units[index]) +} + +pub enum BackupEvent { + Estimate(u64), + StartingFullBackup { + source: String, + dest: String, + index: usize, + total: usize, + }, + StartingIncrementalBackup { + source: String, + dest: String, + index: usize, + total: usize, + }, + SnapshotCreated(String), + SnapshotDeleted(String), + BytesTransferred { + bytes: u64, + estimated_total: Option, + }, + DatasetCompleted(String), + DryrunCompleted(String), +} diff --git a/src/progress.rs b/src/progress/terminal.rs similarity index 57% rename from src/progress.rs rename to src/progress/terminal.rs index f85c054..2930559 100644 --- a/src/progress.rs +++ b/src/progress/terminal.rs @@ -1,46 +1,17 @@ +use super::{BackupEvent, human_bytes}; use std::{io::Write, sync::mpsc::Receiver}; -pub enum BackupEvent { - Estimate(u64), - StartingFullBackup { - source: String, - dest: String, - index: usize, - total: usize, - }, - StartingIncrementalBackup { - source: String, - dest: String, - index: usize, - total: usize, - }, - SnapshotCreated(String), - SnapshotDeleted(String), - BytesTransferred { - dataset: String, - bytes: u64, - }, - DatasetCompleted(String), -} - -pub struct ProgressReporter { +pub struct Progressor { receiver: Receiver, estimated_size: u64, } -impl ProgressReporter { - pub fn new(receiver: Receiver) -> Self { - Self { - receiver, - estimated_size: 0, - } - } - - pub fn run(&mut self) { +impl super::Progressor for Progressor { + fn run(&mut self) { while let Ok(event) = self.receiver.recv() { match event { BackupEvent::Estimate(size) => { - println!("Estimated total backup size: {} bytes", size); + println!("Estimated total backup size: {}", human_bytes(size)); self.estimated_size = size; } BackupEvent::StartingFullBackup { @@ -71,25 +42,46 @@ impl ProgressReporter { BackupEvent::SnapshotDeleted(name) => { println!("Deleted snapshot: {}", name); } - BackupEvent::BytesTransferred { dataset, bytes } => { - let percent: f64 = if self.estimated_size > 0 { - bytes as f64 / self.estimated_size as f64 - } else { - 0.0 - }; - print!( - "{:>3.0}% {}/{} bytes transferred\r", - percent * 100.0, - bytes, - self.estimated_size - ); + BackupEvent::BytesTransferred { + bytes, + estimated_total, + } => { + match estimated_total { + Some(total) => { + let percent: f64 = if total > 0 { + bytes as f64 / total as f64 + } else { + 0.0 + }; + print!( + "{:>3.0}% {}/{} transferred\r", + percent * 100.0, + human_bytes(bytes), + human_bytes(total) + ); + } + None => { + print!("{} transferred\r", human_bytes(bytes)); + } + } std::io::stdout().flush().ok(); } BackupEvent::DatasetCompleted(name) => { println!("Completed backup of dataset: {}", name); - self.estimated_size = 0; + } + BackupEvent::DryrunCompleted(name) => { + println!("Completed dry run backup of dataset: {}", name); } } } } } + +impl Progressor { + pub fn new(receiver: Receiver) -> Self { + Self { + receiver, + estimated_size: 0, + } + } +}