From a0585764cfd514e339affb3677005b614551cb6d Mon Sep 17 00:00:00 2001 From: James McDonald Date: Thu, 7 May 2026 13:41:55 +0200 Subject: [PATCH] Add send completed when reading finishes --- src/command.rs | 29 +++++++++++++++++------------ src/progress/filter.rs | 8 ++++++++ src/progress/log.rs | 3 +++ src/progress/mod.rs | 3 ++- src/progress/terminal.rs | 10 ++++++++-- 5 files changed, 38 insertions(+), 15 deletions(-) diff --git a/src/command.rs b/src/command.rs index 44985e1..18f377e 100644 --- a/src/command.rs +++ b/src/command.rs @@ -67,15 +67,26 @@ pub fn exec_pipe( let send_stdout = send_process.stdout.take().unwrap(); let mut receive_stdin = receive_process.stdin.take().unwrap(); - let mut reader: Box = Box::new(send_stdout); - for filter in filters { - reader = filter.filter(reader); + { + let mut reader: Box = Box::new(send_stdout); + for filter in filters { + reader = filter.filter(reader); + } + + std::io::copy(&mut reader, &mut receive_stdin).map_err(|e| e.to_string())?; } - std::io::copy(&mut reader, &mut receive_stdin).map_err(|e| e.to_string())?; - - let receive_status = receive_process.wait().map_err(|e| e.to_string())?; + // The send process will typically finish first. If it failed, terminate the + // receiver. let send_status = send_process.wait().map_err(|e| e.to_string())?; + if !send_status.success() { + receive_process.kill().ok(); + return Err(format!( + "Send command {:?} failed with status {}", + source, send_status + )); + } + let receive_status = receive_process.wait().map_err(|e| e.to_string())?; if !receive_status.success() { send_process.kill().ok(); return Err(format!( @@ -83,11 +94,5 @@ pub fn exec_pipe( dest, receive_status )); } - if !send_status.success() { - return Err(format!( - "Send command {:?} failed with status {}", - source, send_status - )); - } Ok(()) } diff --git a/src/progress/filter.rs b/src/progress/filter.rs index 311f984..f567de5 100644 --- a/src/progress/filter.rs +++ b/src/progress/filter.rs @@ -29,6 +29,14 @@ impl Read for ByteCountReader { } } +impl Drop for ByteCountReader { + fn drop(&mut self) { + self.sender + .send(BackupEvent::SendCompleted(self.bytes)) + .ok(); + } +} + impl ByteCountReader { fn new(inner: R, sender: Sender, total: Option) -> Self { Self { diff --git a/src/progress/log.rs b/src/progress/log.rs index 8cda8dc..7de913d 100644 --- a/src/progress/log.rs +++ b/src/progress/log.rs @@ -43,6 +43,9 @@ impl super::Progressor for Progressor { println!("Deleted snapshot: {}", name); } BackupEvent::BytesTransferred { .. } => {} + BackupEvent::SendCompleted(bytes) => { + println!("Send completed: {} bytes", bytes); + } BackupEvent::DatasetCompleted(name) => { println!("Completed backup of dataset: {}", name); self.estimated_size = 0; diff --git a/src/progress/mod.rs b/src/progress/mod.rs index 719113b..38a8cd6 100644 --- a/src/progress/mod.rs +++ b/src/progress/mod.rs @@ -16,7 +16,7 @@ fn human_bytes(bytes: f64) -> String { a /= 1024.0; i += 1; } - format!("{:.2} {:2}", a, units[i]) + format!("{:.2} {}", a, units[i]) } fn format_eta(seconds: f64) -> String { @@ -47,6 +47,7 @@ pub enum BackupEvent { bytes: u64, estimated_total: Option, }, + SendCompleted(u64), DatasetCompleted(String), DryrunCompleted(String), } diff --git a/src/progress/terminal.rs b/src/progress/terminal.rs index a8b16db..669761a 100644 --- a/src/progress/terminal.rs +++ b/src/progress/terminal.rs @@ -89,11 +89,17 @@ impl super::Progressor for Progressor { } std::io::stdout().flush().ok(); } + BackupEvent::SendCompleted(bytes) => { + println!( + "\x1b[2KSend completed at {}, awaiting receive completion", + human_bytes(bytes as f64) + ); + } BackupEvent::DatasetCompleted(name) => { - println!("Completed backup of dataset: {}", name); + println!("\x1b[2KCompleted backup of dataset: {}", name); } BackupEvent::DryrunCompleted(name) => { - println!("Completed dry run backup of dataset: {}", name); + println!("\x1b[2KCompleted dry run backup of dataset: {}", name); } } }