diff --git a/src/command.rs b/src/command.rs index d02f17f..0ed0364 100644 --- a/src/command.rs +++ b/src/command.rs @@ -1,8 +1,10 @@ -use crate::progress::BackupEvent; -use std::io; use std::io::Read; -use std::sync::mpsc::Sender; -use std::time::{Duration, Instant}; + +/// A trait for filters that can be applied to the output of a command before +/// it is piped to another command. +pub trait Filter { + fn filter(&self, reader: Box) -> Box; +} pub fn exec_command(command: &Vec<&str>) -> Result { if command.is_empty() { @@ -25,48 +27,10 @@ pub fn exec_command(command: &Vec<&str>) -> Result { Ok(output_str) } -struct CountingReader { - inner: R, - sender: Sender, - bytes: u64, - last_send: Instant, - total: Option, -} - -impl Read for CountingReader { - fn read(&mut self, buf: &mut [u8]) -> io::Result { - let n = self.inner.read(buf)?; - self.bytes += n as u64; - if self.last_send.elapsed().as_millis() >= 100 { - self.sender - .send(BackupEvent::BytesTransferred { - bytes: self.bytes, - estimated_total: self.total, - }) - .ok(); - self.last_send = Instant::now(); - } - Ok(n) - } -} - -impl CountingReader { - fn new(inner: R, sender: Sender, total: Option) -> Self { - Self { - inner, - sender, - total, - bytes: 0, - last_send: Instant::now() - Duration::from_secs(1), - } - } -} - pub fn exec_piped_commands( source: &Vec<&str>, dest: &Vec<&str>, - sender: Option>, - total: Option, + filters: Vec>, ) -> Result<(), String> { if source.is_empty() || dest.is_empty() { return Err("Source or destination command is empty".to_string()); @@ -93,10 +57,11 @@ pub fn exec_piped_commands( let send_stdout = send_process.stdout.take().unwrap(); let mut receive_stdin = receive_process.stdin.take().unwrap(); - let mut reader: Box = match sender { - Some(s) => Box::new(CountingReader::new(send_stdout, s, total)), - None => Box::new(send_stdout), - }; + let mut reader: Box = Box::new(send_stdout); + for filter in filters { + reader = filter.filter(reader); + } + std::io::copy(&mut reader, &mut receive_stdin).map_err(|e| e.to_string())?; let receive_status = receive_process.wait().map_err(|e| e.to_string())?; diff --git a/src/job.rs b/src/job.rs index dcb4475..dab911c 100644 --- a/src/job.rs +++ b/src/job.rs @@ -1,5 +1,6 @@ use crate::command; use crate::progress::BackupEvent; +use crate::progress::filter; use chrono::{Local, NaiveDateTime}; use std::collections::HashSet; use std::sync::mpsc::Sender; @@ -192,7 +193,11 @@ impl Job { let mut receive_cmd = self.get_side_command(JobSide::Destination); receive_cmd.extend(["receive", "-F", dest]); - command::exec_piped_commands(&send_cmd, &receive_cmd, self.sender.clone(), total)?; + let filters = match self.sender.as_ref() { + Some(sender) => vec![filter::CountingReaderBuilder::build(sender.clone(), total)], + None => vec![], + }; + command::exec_piped_commands(&send_cmd, &receive_cmd, filters)?; self.send_event(BackupEvent::DatasetCompleted(source.to_string())); Ok(()) } diff --git a/src/main.rs b/src/main.rs index ec2462f..0570ca8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,11 +1,11 @@ use clap::Parser; use std::error::Error; +use std::io::IsTerminal; use std::sync::mpsc::channel; use std::thread; use zfsbackup::job::JobBuilder; -use zfsbackup::progress::Progressor; -use zfsbackup::progress::terminal; +use zfsbackup::progress::{Progressor, log, terminal}; #[derive(Parser)] #[command(version, about, long_about = None)] @@ -51,7 +51,11 @@ fn main() -> Result<(), Box> { } let (tx, rx) = channel(); - let mut pr: Box = Box::new(terminal::Progressor::new(rx)); + let mut pr: Box = if std::io::stdout().is_terminal() { + Box::new(terminal::Progressor::new(rx)) + } else { + Box::new(log::Progressor::new(rx)) + }; thread::spawn(move || pr.run()); builder = builder.sender(tx); diff --git a/src/progress/filter.rs b/src/progress/filter.rs new file mode 100644 index 0000000..51260b3 --- /dev/null +++ b/src/progress/filter.rs @@ -0,0 +1,59 @@ +use super::BackupEvent; +use crate::command::Filter; +use std::io::{self, Read}; +use std::sync::mpsc::Sender; +use std::time::{Duration, Instant}; + +struct CountingReader { + inner: R, + sender: Sender, + bytes: u64, + last_send: Instant, + total: Option, +} + +impl Read for CountingReader { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + let n = self.inner.read(buf)?; + self.bytes += n as u64; + if self.last_send.elapsed().as_millis() >= 100 { + self.sender + .send(BackupEvent::BytesTransferred { + bytes: self.bytes, + estimated_total: self.total, + }) + .ok(); + self.last_send = Instant::now(); + } + Ok(n) + } +} + +impl CountingReader { + fn new(inner: R, sender: Sender, total: Option) -> Self { + Self { + inner, + sender, + total, + bytes: 0, + last_send: Instant::now() - Duration::from_secs(1), + } + } +} + +pub struct CountingReaderBuilder { + sender: Sender, + total: Option, +} + +impl Filter for CountingReaderBuilder { + fn filter(&self, reader: Box) -> Box { + Box::new(CountingReader::new(reader, self.sender.clone(), self.total)) + } +} + +impl CountingReaderBuilder { + pub fn build(sender: Sender, total: Option) -> Box { + Box::new(Self { sender, total }) + } +} diff --git a/src/progress/mod.rs b/src/progress/mod.rs index dd4aef1..360ea42 100644 --- a/src/progress/mod.rs +++ b/src/progress/mod.rs @@ -1,3 +1,4 @@ +pub mod filter; pub mod log; pub mod terminal;