Add send completed when reading finishes

This commit is contained in:
2026-05-07 13:41:55 +02:00
parent e8e4e3636c
commit 137fa7d989
5 changed files with 36 additions and 15 deletions
+12 -7
View File
@@ -67,15 +67,26 @@ pub fn exec_pipe(
let send_stdout = send_process.stdout.take().unwrap(); let send_stdout = send_process.stdout.take().unwrap();
let mut receive_stdin = receive_process.stdin.take().unwrap(); let mut receive_stdin = receive_process.stdin.take().unwrap();
{
let mut reader: Box<dyn Read> = Box::new(send_stdout); let mut reader: Box<dyn Read> = Box::new(send_stdout);
for filter in filters { for filter in filters {
reader = filter.filter(reader); reader = filter.filter(reader);
} }
std::io::copy(&mut reader, &mut receive_stdin).map_err(|e| e.to_string())?; std::io::copy(&mut reader, &mut receive_stdin).map_err(|e| e.to_string())?;
}
let receive_status = receive_process.wait().map_err(|e| e.to_string())?; // The send process will typically finish first. If it failed, terminate the
// receiver.
let send_status = send_process.wait().map_err(|e| e.to_string())?; let send_status = send_process.wait().map_err(|e| e.to_string())?;
if !send_status.success() {
receive_process.kill().ok();
return Err(format!(
"Send command {:?} failed with status {}",
source, send_status
));
}
let receive_status = receive_process.wait().map_err(|e| e.to_string())?;
if !receive_status.success() { if !receive_status.success() {
send_process.kill().ok(); send_process.kill().ok();
return Err(format!( return Err(format!(
@@ -83,11 +94,5 @@ pub fn exec_pipe(
dest, receive_status dest, receive_status
)); ));
} }
if !send_status.success() {
return Err(format!(
"Send command {:?} failed with status {}",
source, send_status
));
}
Ok(()) Ok(())
} }
+6
View File
@@ -29,6 +29,12 @@ impl<R: Read> Read for ByteCountReader<R> {
} }
} }
impl<R: Read> Drop for ByteCountReader<R> {
fn drop(&mut self) {
self.sender.send(BackupEvent::SendCompleted(self.bytes));
}
}
impl<R: Read> ByteCountReader<R> { impl<R: Read> ByteCountReader<R> {
fn new(inner: R, sender: Sender<BackupEvent>, total: Option<u64>) -> Self { fn new(inner: R, sender: Sender<BackupEvent>, total: Option<u64>) -> Self {
Self { Self {
+3
View File
@@ -43,6 +43,9 @@ impl super::Progressor for Progressor {
println!("Deleted snapshot: {}", name); println!("Deleted snapshot: {}", name);
} }
BackupEvent::BytesTransferred { .. } => {} BackupEvent::BytesTransferred { .. } => {}
BackupEvent::SendCompleted(bytes) => {
println!("Send completed: {} bytes", bytes);
}
BackupEvent::DatasetCompleted(name) => { BackupEvent::DatasetCompleted(name) => {
println!("Completed backup of dataset: {}", name); println!("Completed backup of dataset: {}", name);
self.estimated_size = 0; self.estimated_size = 0;
+2 -1
View File
@@ -16,7 +16,7 @@ fn human_bytes(bytes: f64) -> String {
a /= 1024.0; a /= 1024.0;
i += 1; i += 1;
} }
format!("{:.2} {:2}", a, units[i]) format!("{:.2} {}", a, units[i])
} }
fn format_eta(seconds: f64) -> String { fn format_eta(seconds: f64) -> String {
@@ -47,6 +47,7 @@ pub enum BackupEvent {
bytes: u64, bytes: u64,
estimated_total: Option<u64>, estimated_total: Option<u64>,
}, },
SendCompleted(u64),
DatasetCompleted(String), DatasetCompleted(String),
DryrunCompleted(String), DryrunCompleted(String),
} }
+8 -2
View File
@@ -89,11 +89,17 @@ impl super::Progressor for Progressor {
} }
std::io::stdout().flush().ok(); std::io::stdout().flush().ok();
} }
BackupEvent::SendCompleted(bytes) => {
println!(
"\x1b[2KSend completed at {}, awaiting receive completion",
human_bytes(bytes as f64)
);
}
BackupEvent::DatasetCompleted(name) => { BackupEvent::DatasetCompleted(name) => {
println!("Completed backup of dataset: {}", name); println!("\x1b[2KCompleted backup of dataset: {}", name);
} }
BackupEvent::DryrunCompleted(name) => { BackupEvent::DryrunCompleted(name) => {
println!("Completed dry run backup of dataset: {}", name); println!("\x1b[2KCompleted dry run backup of dataset: {}", name);
} }
} }
} }