Rearrange pipeline injector to a vec of filters
This commit is contained in:
@@ -1,8 +1,10 @@
|
|||||||
use crate::progress::BackupEvent;
|
|
||||||
use std::io;
|
|
||||||
use std::io::Read;
|
use std::io::Read;
|
||||||
use std::sync::mpsc::Sender;
|
|
||||||
use std::time::{Duration, Instant};
|
/// A trait for filters that can be applied to the output of a command before
|
||||||
|
/// it is piped to another command.
|
||||||
|
pub trait Filter {
|
||||||
|
fn filter(&self, reader: Box<dyn Read>) -> Box<dyn Read>;
|
||||||
|
}
|
||||||
|
|
||||||
pub fn exec_command(command: &Vec<&str>) -> Result<String, String> {
|
pub fn exec_command(command: &Vec<&str>) -> Result<String, String> {
|
||||||
if command.is_empty() {
|
if command.is_empty() {
|
||||||
@@ -25,48 +27,10 @@ pub fn exec_command(command: &Vec<&str>) -> Result<String, String> {
|
|||||||
Ok(output_str)
|
Ok(output_str)
|
||||||
}
|
}
|
||||||
|
|
||||||
struct CountingReader<R: Read> {
|
|
||||||
inner: R,
|
|
||||||
sender: Sender<BackupEvent>,
|
|
||||||
bytes: u64,
|
|
||||||
last_send: Instant,
|
|
||||||
total: Option<u64>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<R: Read> Read for CountingReader<R> {
|
|
||||||
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
|
||||||
let n = self.inner.read(buf)?;
|
|
||||||
self.bytes += n as u64;
|
|
||||||
if self.last_send.elapsed().as_millis() >= 100 {
|
|
||||||
self.sender
|
|
||||||
.send(BackupEvent::BytesTransferred {
|
|
||||||
bytes: self.bytes,
|
|
||||||
estimated_total: self.total,
|
|
||||||
})
|
|
||||||
.ok();
|
|
||||||
self.last_send = Instant::now();
|
|
||||||
}
|
|
||||||
Ok(n)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<R: Read> CountingReader<R> {
|
|
||||||
fn new(inner: R, sender: Sender<BackupEvent>, total: Option<u64>) -> Self {
|
|
||||||
Self {
|
|
||||||
inner,
|
|
||||||
sender,
|
|
||||||
total,
|
|
||||||
bytes: 0,
|
|
||||||
last_send: Instant::now() - Duration::from_secs(1),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn exec_piped_commands(
|
pub fn exec_piped_commands(
|
||||||
source: &Vec<&str>,
|
source: &Vec<&str>,
|
||||||
dest: &Vec<&str>,
|
dest: &Vec<&str>,
|
||||||
sender: Option<Sender<BackupEvent>>,
|
filters: Vec<Box<dyn Filter>>,
|
||||||
total: Option<u64>,
|
|
||||||
) -> Result<(), String> {
|
) -> Result<(), String> {
|
||||||
if source.is_empty() || dest.is_empty() {
|
if source.is_empty() || dest.is_empty() {
|
||||||
return Err("Source or destination command is empty".to_string());
|
return Err("Source or destination command is empty".to_string());
|
||||||
@@ -93,10 +57,11 @@ pub fn exec_piped_commands(
|
|||||||
let send_stdout = send_process.stdout.take().unwrap();
|
let send_stdout = send_process.stdout.take().unwrap();
|
||||||
let mut receive_stdin = receive_process.stdin.take().unwrap();
|
let mut receive_stdin = receive_process.stdin.take().unwrap();
|
||||||
|
|
||||||
let mut reader: Box<dyn Read> = match sender {
|
let mut reader: Box<dyn Read> = Box::new(send_stdout);
|
||||||
Some(s) => Box::new(CountingReader::new(send_stdout, s, total)),
|
for filter in filters {
|
||||||
None => Box::new(send_stdout),
|
reader = filter.filter(reader);
|
||||||
};
|
}
|
||||||
|
|
||||||
std::io::copy(&mut reader, &mut receive_stdin).map_err(|e| e.to_string())?;
|
std::io::copy(&mut reader, &mut receive_stdin).map_err(|e| e.to_string())?;
|
||||||
|
|
||||||
let receive_status = receive_process.wait().map_err(|e| e.to_string())?;
|
let receive_status = receive_process.wait().map_err(|e| e.to_string())?;
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
use crate::command;
|
use crate::command;
|
||||||
use crate::progress::BackupEvent;
|
use crate::progress::BackupEvent;
|
||||||
|
use crate::progress::filter;
|
||||||
use chrono::{Local, NaiveDateTime};
|
use chrono::{Local, NaiveDateTime};
|
||||||
use std::collections::HashSet;
|
use std::collections::HashSet;
|
||||||
use std::sync::mpsc::Sender;
|
use std::sync::mpsc::Sender;
|
||||||
@@ -192,7 +193,11 @@ impl Job {
|
|||||||
|
|
||||||
let mut receive_cmd = self.get_side_command(JobSide::Destination);
|
let mut receive_cmd = self.get_side_command(JobSide::Destination);
|
||||||
receive_cmd.extend(["receive", "-F", dest]);
|
receive_cmd.extend(["receive", "-F", dest]);
|
||||||
command::exec_piped_commands(&send_cmd, &receive_cmd, self.sender.clone(), total)?;
|
let filters = match self.sender.as_ref() {
|
||||||
|
Some(sender) => vec![filter::CountingReaderBuilder::build(sender.clone(), total)],
|
||||||
|
None => vec![],
|
||||||
|
};
|
||||||
|
command::exec_piped_commands(&send_cmd, &receive_cmd, filters)?;
|
||||||
self.send_event(BackupEvent::DatasetCompleted(source.to_string()));
|
self.send_event(BackupEvent::DatasetCompleted(source.to_string()));
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
10
src/main.rs
10
src/main.rs
@@ -1,11 +1,11 @@
|
|||||||
use clap::Parser;
|
use clap::Parser;
|
||||||
|
|
||||||
use std::error::Error;
|
use std::error::Error;
|
||||||
|
use std::io::IsTerminal;
|
||||||
use std::sync::mpsc::channel;
|
use std::sync::mpsc::channel;
|
||||||
use std::thread;
|
use std::thread;
|
||||||
use zfsbackup::job::JobBuilder;
|
use zfsbackup::job::JobBuilder;
|
||||||
use zfsbackup::progress::Progressor;
|
use zfsbackup::progress::{Progressor, log, terminal};
|
||||||
use zfsbackup::progress::terminal;
|
|
||||||
|
|
||||||
#[derive(Parser)]
|
#[derive(Parser)]
|
||||||
#[command(version, about, long_about = None)]
|
#[command(version, about, long_about = None)]
|
||||||
@@ -51,7 +51,11 @@ fn main() -> Result<(), Box<dyn Error>> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
let (tx, rx) = channel();
|
let (tx, rx) = channel();
|
||||||
let mut pr: Box<dyn Progressor> = Box::new(terminal::Progressor::new(rx));
|
let mut pr: Box<dyn Progressor> = if std::io::stdout().is_terminal() {
|
||||||
|
Box::new(terminal::Progressor::new(rx))
|
||||||
|
} else {
|
||||||
|
Box::new(log::Progressor::new(rx))
|
||||||
|
};
|
||||||
thread::spawn(move || pr.run());
|
thread::spawn(move || pr.run());
|
||||||
|
|
||||||
builder = builder.sender(tx);
|
builder = builder.sender(tx);
|
||||||
|
|||||||
59
src/progress/filter.rs
Normal file
59
src/progress/filter.rs
Normal file
@@ -0,0 +1,59 @@
|
|||||||
|
use super::BackupEvent;
|
||||||
|
use crate::command::Filter;
|
||||||
|
use std::io::{self, Read};
|
||||||
|
use std::sync::mpsc::Sender;
|
||||||
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
|
struct CountingReader<R: Read> {
|
||||||
|
inner: R,
|
||||||
|
sender: Sender<BackupEvent>,
|
||||||
|
bytes: u64,
|
||||||
|
last_send: Instant,
|
||||||
|
total: Option<u64>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<R: Read> Read for CountingReader<R> {
|
||||||
|
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
||||||
|
let n = self.inner.read(buf)?;
|
||||||
|
self.bytes += n as u64;
|
||||||
|
if self.last_send.elapsed().as_millis() >= 100 {
|
||||||
|
self.sender
|
||||||
|
.send(BackupEvent::BytesTransferred {
|
||||||
|
bytes: self.bytes,
|
||||||
|
estimated_total: self.total,
|
||||||
|
})
|
||||||
|
.ok();
|
||||||
|
self.last_send = Instant::now();
|
||||||
|
}
|
||||||
|
Ok(n)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<R: Read> CountingReader<R> {
|
||||||
|
fn new(inner: R, sender: Sender<BackupEvent>, total: Option<u64>) -> Self {
|
||||||
|
Self {
|
||||||
|
inner,
|
||||||
|
sender,
|
||||||
|
total,
|
||||||
|
bytes: 0,
|
||||||
|
last_send: Instant::now() - Duration::from_secs(1),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct CountingReaderBuilder {
|
||||||
|
sender: Sender<BackupEvent>,
|
||||||
|
total: Option<u64>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Filter for CountingReaderBuilder {
|
||||||
|
fn filter(&self, reader: Box<dyn Read>) -> Box<dyn Read> {
|
||||||
|
Box::new(CountingReader::new(reader, self.sender.clone(), self.total))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl CountingReaderBuilder {
|
||||||
|
pub fn build(sender: Sender<BackupEvent>, total: Option<u64>) -> Box<dyn Filter> {
|
||||||
|
Box::new(Self { sender, total })
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,3 +1,4 @@
|
|||||||
|
pub mod filter;
|
||||||
pub mod log;
|
pub mod log;
|
||||||
pub mod terminal;
|
pub mod terminal;
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user