diff --git a/src/command.rs b/src/command.rs index 3d11f5a..9e3c827 100644 --- a/src/command.rs +++ b/src/command.rs @@ -1,3 +1,9 @@ +use crate::progress::BackupEvent; +use std::io; +use std::io::Read; +use std::sync::mpsc::Sender; +use std::time::{Duration, Instant}; + pub fn exec_command(command: &Vec<&str>) -> Result { if command.is_empty() { return Err("Command is empty".to_string()); @@ -19,7 +25,46 @@ pub fn exec_command(command: &Vec<&str>) -> Result { Ok(output_str) } -pub fn exec_piped_commands(source: &Vec<&str>, dest: &Vec<&str>) -> Result<(), String> { +struct CountingReader { + inner: R, + sender: Sender, + bytes_read: u64, + last_send: Instant, +} + +impl Read for CountingReader { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + let n = self.inner.read(buf)?; + self.bytes_read += n as u64; + if self.last_send.elapsed().as_millis() >= 100 { + self.sender + .send(BackupEvent::BytesTransferred { + dataset: String::from(""), + bytes: self.bytes_read, + }) + .ok(); + self.last_send = Instant::now(); + } + Ok(n) + } +} + +impl CountingReader { + fn new(inner: R, sender: Sender) -> Self { + Self { + inner, + sender, + bytes_read: 0, + last_send: Instant::now() - Duration::from_secs(1), + } + } +} + +pub fn exec_piped_commands( + source: &Vec<&str>, + dest: &Vec<&str>, + sender: Option>, +) -> Result<(), String> { if source.is_empty() || dest.is_empty() { return Err("Source or destination command is empty".to_string()); } @@ -38,10 +83,19 @@ pub fn exec_piped_commands(source: &Vec<&str>, dest: &Vec<&str>) -> Result<(), S .map_err(|e| e.to_string())?; let mut receive_process = receive_cmd - .stdin(send_process.stdout.take().unwrap()) + .stdin(std::process::Stdio::piped()) .spawn() .map_err(|e| e.to_string())?; + let send_stdout = send_process.stdout.take().unwrap(); + let mut receive_stdin = receive_process.stdin.take().unwrap(); + + let mut reader: Box = match sender { + Some(s) => Box::new(CountingReader::new(send_stdout, s)), + None => Box::new(send_stdout), + }; + std::io::copy(&mut reader, &mut receive_stdin).map_err(|e| e.to_string())?; + let receive_status = receive_process.wait().map_err(|e| e.to_string())?; let send_status = send_process.wait().map_err(|e| e.to_string())?; if !receive_status.success() { diff --git a/src/job.rs b/src/job.rs new file mode 100644 index 0000000..56b3e95 --- /dev/null +++ b/src/job.rs @@ -0,0 +1,338 @@ +use crate::command; +use crate::progress::BackupEvent; +use chrono::{Local, NaiveDateTime}; +use std::collections::HashSet; +use std::sync::mpsc::Sender; + +pub struct JobBuilder { + sources: Vec, + target: String, + source_zfs_command: Vec, + target_zfs_command: Vec, + dryrun: bool, + retain: usize, + sender: Option>, +} + +fn parse_command(commandstr: &str) -> Vec { + commandstr + .split_whitespace() + .map(|s| s.to_string()) + .collect() +} + +impl JobBuilder { + pub fn new(sources: Vec, target: String) -> Self { + JobBuilder { + sources, + target, + source_zfs_command: vec!["zfs".to_string()], + target_zfs_command: vec!["zfs".to_string()], + dryrun: false, + retain: 2, + sender: None, + } + } + + pub fn source_zfs_command(mut self, commandstr: &str) -> Self { + let command = parse_command(commandstr); + self.source_zfs_command = command; + self + } + + pub fn target_zfs_command(mut self, commandstr: &str) -> Self { + let command = parse_command(commandstr); + self.target_zfs_command = command; + self + } + + pub fn zfs_command(mut self, commandstr: &str) -> Self { + let command = parse_command(commandstr); + self.source_zfs_command = command.clone(); + self.target_zfs_command = command; + self + } + + pub fn dryrun(mut self) -> Self { + self.dryrun = true; + self + } + + pub fn retain(mut self, retain: usize) -> Self { + self.retain = retain; + self + } + + pub fn sender(mut self, sender: Sender) -> Self { + self.sender = Some(sender); + self + } + + pub fn build(self) -> Result { + let mut job = Job { + datasets: vec![], + target: self.target, + source_zfs_command: self.source_zfs_command, + target_zfs_command: self.target_zfs_command, + dryrun: self.dryrun, + retain: self.retain, + sender: self.sender, + }; + let mut datasets: Vec = vec![]; + for source in &self.sources { + let recurse = source.ends_with("/..."); + let source = source.trim_end_matches("/..."); + + let mut args = vec!["list", "-H", "-o", "name"]; + if recurse { + args.push("-r"); + } + args.push(source); + + let mut cmd = job.get_side_command(JobSide::Source); + cmd.extend(args); + let output = command::exec_command(&cmd)?; + datasets.extend(output.lines().map(str::to_string)); + } + if datasets.is_empty() { + return Err(String::from("no matching source datasets found")); + } + job.datasets = datasets; + Ok(job) + } +} + +pub struct Job { + datasets: Vec, + target: String, + source_zfs_command: Vec, + target_zfs_command: Vec, + dryrun: bool, + retain: usize, + sender: Option>, +} + +#[derive(Debug)] +enum JobSide { + Source, + Destination, +} + +#[derive(Debug)] +struct Snapshot { + snapshot: String, + side: JobSide, +} + +impl Job { + pub fn dump(&self) { + println!("Datasets: {:?}", self.datasets); + println!("Target: {}", self.target); + println!("Source ZFS Command: {:?}", self.source_zfs_command); + println!("Target ZFS Command: {:?}", self.target_zfs_command); + println!("Dryrun: {}", self.dryrun); + } + + fn send_event(&self, event: BackupEvent) { + if let Some(sender) = &self.sender { + sender.send(event).ok(); + } + } + + fn get_side_command(&self, side: JobSide) -> Vec<&str> { + match side { + JobSide::Source => self.source_zfs_command.iter().map(|s| s.as_str()).collect(), + JobSide::Destination => self.target_zfs_command.iter().map(|s| s.as_str()).collect(), + } + } + + fn create_snapshot(&self, dataset: &str, side: JobSide) -> Result { + let snapshot = format!("{}@{}", dataset, Local::now().format("%Y-%m-%dT%H:%M:%S")); + let mut cmd = self.get_side_command(side); + cmd.extend(["snapshot", &snapshot]); + command::exec_command(&cmd)?; + self.send_event(BackupEvent::SnapshotCreated(snapshot.clone())); + Ok(snapshot) + } + + fn estimate(&self, source: &str, inc_snapshot: Option<&str>) -> Result { + let mut cmd = self.get_side_command(JobSide::Source); + cmd.push("send"); + if let Some(inc) = inc_snapshot { + cmd.extend(["-i", inc]); + } + cmd.extend(["-n", "-P"]); + cmd.push(source); + let output = command::exec_command(&cmd)?; + let size = output + .lines() + .last() + .ok_or("estimate parse error")? + .split_whitespace() + .last() + .ok_or("estimate parse error")? + .parse::() + .map_err(|e| format!("estimate parse error: {}", e))?; + self.send_event(BackupEvent::Estimate(size)); + Ok(size) + } + fn send_receive( + &self, + source: &str, + dest: &str, + inc_snapshot: Option<&str>, + ) -> Result<(), String> { + let mut send_cmd = self.get_side_command(JobSide::Source); + send_cmd.push("send"); + if let Some(inc) = inc_snapshot { + send_cmd.extend(["-i", inc]); + } + send_cmd.push(source); + + let mut receive_cmd = self.get_side_command(JobSide::Destination); + receive_cmd.extend(["receive", "-F", dest]); + command::exec_piped_commands(&send_cmd, &receive_cmd, self.sender.clone())?; + self.send_event(BackupEvent::DatasetCompleted(source.to_string())); + Ok(()) + } + + fn list_snapshots(&self, source: &str, side: JobSide) -> Result, String> { + let mut cmd = self.get_side_command(side); + cmd.extend(["list", "-H", "-o", "name", "-t", "snapshot", source]); + let output = command::exec_command(&cmd)?; + let snapshots: Vec<&str> = output + .split_whitespace() + .map(|s| { + let parts: Vec<&str> = s.split("@").collect(); + if parts.len() == 2 { + Ok(parts[1]) + } else { + Err(format!("invalid snapshot name: {}", s)) + } + }) + .collect::, _>>()?; + let result: Vec = snapshots + .iter() + .filter(|s| NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S").is_ok()) + .map(|s| s.to_string()) + .collect(); + Ok(result) + } + + fn find_latest_matching_snapshot(&self, source: &str, dest: &str) -> Result { + self.find_matching_snapshots(source, dest)? + .into_iter() + .next() + .ok_or(String::from("no matching snapshots")) + } + + fn find_matching_snapshots(&self, source: &str, dest: &str) -> Result, String> { + let source_snapshots = self.list_snapshots(source, JobSide::Source)?; + let mut dest_snapshots = self.list_snapshots(dest, JobSide::Destination)?; + dest_snapshots.sort_by(|a, b| b.cmp(a)); + + let source_set: HashSet = source_snapshots.into_iter().collect(); + + let matching_snapshots: Vec = dest_snapshots + .into_iter() + .filter(|s| source_set.contains(s)) + .collect(); + + if matching_snapshots.is_empty() { + Err(String::from("no matching snapshots")) + } else { + Ok(matching_snapshots) + } + } + + fn find_old_snapshots(&self, source: &str, dest: &str) -> Result, String> { + let source_snapshots = self.list_snapshots(source, JobSide::Source)?; + let dest_snapshots = self.list_snapshots(dest, JobSide::Destination)?; + let matching_snapshots = self.find_matching_snapshots(source, dest)?; + if matching_snapshots.is_empty() { + return Err(String::from("no matching snapshots found")); + } + let retain: HashSet = matching_snapshots.into_iter().take(self.retain).collect(); + let result = source_snapshots + .into_iter() + .filter(|s| !retain.contains(s)) + .map(|s| Snapshot { + snapshot: format!("{}@{}", source, s), + side: JobSide::Source, + }) + .chain( + dest_snapshots + .into_iter() + .filter(|s| !retain.contains(s)) + .map(|s| Snapshot { + snapshot: format!("{}@{}", dest, s), + side: JobSide::Destination, + }), + ) + .collect(); + Ok(result) + } + + fn delete_snapshot(&self, snapshot: Snapshot) -> Result { + let mut cmd = self.get_side_command(snapshot.side); + cmd.extend(["destroy", &snapshot.snapshot]); + let res = command::exec_command(&cmd); + if res.is_ok() { + self.send_event(BackupEvent::SnapshotDeleted(snapshot.snapshot.clone())); + } + res + } + + pub fn run(&self) -> Result<(), String> { + for (index, source) in self.datasets.iter().enumerate() { + // Check the source exists + let mut cmd = self.get_side_command(JobSide::Source); + cmd.extend(["list", "-H", "-o", "name", source]); + let _ = command::exec_command(&cmd)?; + + // Check whether the destination exists + // TODO: This will assume the destination doesn't exist if the + // command fails for any reason. + let dest = format!("{}/{}", self.target, source); + cmd = self.get_side_command(JobSide::Destination); + cmd.extend(["list", "-H", "-o", "name", &dest]); + let dest_exists = command::exec_command(&cmd).is_ok(); + + // Run backup + if dest_exists { + self.send_event(BackupEvent::StartingIncrementalBackup { + source: source.clone(), + dest: dest.clone(), + index: index + 1, + total: self.datasets.len(), + }); + let inc_snapshot = format!( + "{}@{}", + source, + self.find_latest_matching_snapshot(source, &dest)? + ); + + let snapshot = self.create_snapshot(source, JobSide::Source)?; + self.estimate(&snapshot, Some(&inc_snapshot)).ok(); + self.send_receive(&snapshot, &dest, Some(&inc_snapshot))?; + } else { + self.send_event(BackupEvent::StartingFullBackup { + source: source.clone(), + dest: dest.clone(), + index: index + 1, + total: self.datasets.len(), + }); + let snapshot = self.create_snapshot(source, JobSide::Source)?; + self.estimate(&snapshot, None).ok(); + self.send_receive(&snapshot, &dest, None)?; + } + + // Clean up snapshots + self.find_old_snapshots(source, &dest)? + .into_iter() + .map(|s| self.delete_snapshot(s)) + .collect::, _>>()?; + } + Ok(()) + } +} diff --git a/src/lib.rs b/src/lib.rs index 033da8c..807b4bd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,282 +1,3 @@ mod command; -use chrono::{Local, NaiveDateTime}; -use std::collections::HashSet; - -pub struct JobBuilder { - sources: Vec, - target: String, - source_zfs_command: Vec, - target_zfs_command: Vec, - dryrun: bool, - retain: usize, -} - -fn parse_command(commandstr: &str) -> Vec { - commandstr - .split_whitespace() - .map(|s| s.to_string()) - .collect() -} - -impl JobBuilder { - pub fn new(sources: Vec, target: String) -> Self { - JobBuilder { - sources, - target, - source_zfs_command: vec!["zfs".to_string()], - target_zfs_command: vec!["zfs".to_string()], - dryrun: false, - retain: 2, - } - } - - pub fn source_zfs_command(mut self, commandstr: &str) -> Self { - let command = parse_command(commandstr); - self.source_zfs_command = command; - self - } - - pub fn target_zfs_command(mut self, commandstr: &str) -> Self { - let command = parse_command(commandstr); - self.target_zfs_command = command; - self - } - - pub fn zfs_command(mut self, commandstr: &str) -> Self { - let command = parse_command(commandstr); - self.source_zfs_command = command.clone(); - self.target_zfs_command = command; - self - } - - pub fn dryrun(mut self) -> Self { - self.dryrun = true; - self - } - - pub fn retain(mut self, retain: usize) -> Self { - self.retain = retain; - self - } - - pub fn build(self) -> Result { - let mut job = Job { - datasets: vec![], - target: self.target, - source_zfs_command: self.source_zfs_command, - target_zfs_command: self.target_zfs_command, - dryrun: self.dryrun, - retain: self.retain, - }; - let mut datasets: Vec = vec![]; - for source in &self.sources { - let recurse = source.ends_with("/..."); - let source = source.trim_end_matches("/..."); - - let mut args = vec!["list", "-H", "-o", "name"]; - if recurse { - args.push("-r"); - } - args.push(source); - - let mut cmd = job.get_side_command(JobSide::Source); - cmd.extend(args); - let output = command::exec_command(&cmd)?; - datasets.extend(output.lines().map(str::to_string)); - } - job.datasets = datasets; - Ok(job) - } -} - -pub struct Job { - datasets: Vec, - target: String, - source_zfs_command: Vec, - target_zfs_command: Vec, - dryrun: bool, - retain: usize, -} - -#[derive(Debug)] -enum JobSide { - Source, - Destination, -} - -#[derive(Debug)] -struct Snapshot { - snapshot: String, - side: JobSide, -} - -impl Job { - pub fn dump(&self) { - println!("Datasets: {:?}", self.datasets); - println!("Target: {}", self.target); - println!("Source ZFS Command: {:?}", self.source_zfs_command); - println!("Target ZFS Command: {:?}", self.target_zfs_command); - println!("Dryrun: {}", self.dryrun); - } - - fn get_side_command(&self, side: JobSide) -> Vec<&str> { - match side { - JobSide::Source => self.source_zfs_command.iter().map(|s| s.as_str()).collect(), - JobSide::Destination => self.target_zfs_command.iter().map(|s| s.as_str()).collect(), - } - } - - fn create_snapshot(&self, dataset: &str, side: JobSide) -> Result { - let snapshot = format!("{}@{}", dataset, Local::now().format("%Y-%m-%dT%H:%M:%S")); - let mut cmd = self.get_side_command(side); - cmd.extend(["snapshot", &snapshot]); - command::exec_command(&cmd)?; - Ok(snapshot) - } - - fn send_receive( - &self, - source: &str, - dest: &str, - inc_snapshot: Option<&str>, - ) -> Result<(), String> { - let mut send_cmd = self.get_side_command(JobSide::Source); - send_cmd.push("send"); - if let Some(inc) = inc_snapshot { - send_cmd.extend(["-i", inc]); - } - send_cmd.push(source); - - let mut receive_cmd = self.get_side_command(JobSide::Destination); - receive_cmd.extend(["receive", "-F", dest]); - command::exec_piped_commands(&send_cmd, &receive_cmd)?; - Ok(()) - } - - fn list_snapshots(&self, source: &str, side: JobSide) -> Result, String> { - let mut cmd = self.get_side_command(side); - cmd.extend(["list", "-H", "-o", "name", "-t", "snapshot", source]); - let output = command::exec_command(&cmd)?; - let snapshots: Vec<&str> = output - .split_whitespace() - .map(|s| { - let parts: Vec<&str> = s.split("@").collect(); - if parts.len() == 2 { - Ok(parts[1]) - } else { - Err(format!("invalid snapshot name: {}", s)) - } - }) - .collect::, _>>()?; - let result: Vec = snapshots - .iter() - .filter(|s| NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S").is_ok()) - .map(|s| s.to_string()) - .collect(); - Ok(result) - } - - fn find_latest_matching_snapshot(&self, source: &str, dest: &str) -> Result { - self.find_matching_snapshots(source, dest)? - .into_iter() - .next() - .ok_or(String::from("no matching snapshots")) - } - - fn find_matching_snapshots(&self, source: &str, dest: &str) -> Result, String> { - let source_snapshots = self.list_snapshots(source, JobSide::Source)?; - let mut dest_snapshots = self.list_snapshots(dest, JobSide::Destination)?; - dest_snapshots.sort_by(|a, b| b.cmp(a)); - - let source_set: HashSet = source_snapshots.into_iter().collect(); - - let matching_snapshots: Vec = dest_snapshots - .into_iter() - .filter(|s| source_set.contains(s)) - .collect(); - - if matching_snapshots.is_empty() { - Err(String::from("no matching snapshots")) - } else { - Ok(matching_snapshots) - } - } - - fn find_old_snapshots(&self, source: &str, dest: &str) -> Result, String> { - let source_snapshots = self.list_snapshots(source, JobSide::Source)?; - let dest_snapshots = self.list_snapshots(dest, JobSide::Destination)?; - let matching_snapshots = self.find_matching_snapshots(source, dest)?; - if matching_snapshots.is_empty() { - return Err(String::from("no matching snapshots found")); - } - let retain: HashSet = matching_snapshots.into_iter().take(self.retain).collect(); - let result = source_snapshots - .into_iter() - .filter(|s| !retain.contains(s)) - .map(|s| Snapshot { - snapshot: format!("{}@{}", source, s), - side: JobSide::Source, - }) - .chain( - dest_snapshots - .into_iter() - .filter(|s| !retain.contains(s)) - .map(|s| Snapshot { - snapshot: format!("{}@{}", dest, s), - side: JobSide::Destination, - }), - ) - .collect(); - Ok(result) - } - - fn delete_snapshot(&self, snapshot: Snapshot) -> Result { - let mut cmd = self.get_side_command(snapshot.side); - cmd.extend(["destroy", &snapshot.snapshot]); - command::exec_command(&cmd) - } - - pub fn run(&self) -> Result<(), String> { - for source in &self.datasets { - // Check the source exists - let mut cmd = self.get_side_command(JobSide::Source); - cmd.extend(["list", "-H", "-o", "name", source]); - let _ = command::exec_command(&cmd)?; - - // Check whether the destination exists - // TODO: This will assume the destination doesn't exist if the - // command fails for any reason. - let dest = format!("{}/{}", self.target, source); - cmd = self.get_side_command(JobSide::Destination); - cmd.extend(["list", "-H", "-o", "name", &dest]); - let dest_exists = command::exec_command(&cmd).is_ok(); - - // Run backup - if dest_exists { - println!("Incremental backup {} -> {}", source, dest); - let inc_snapshot = format!( - "{}@{}", - source, - self.find_latest_matching_snapshot(source, &dest)? - ); - println!("Found matching snapshot: {}", inc_snapshot); - - let snapshot = self.create_snapshot(source, JobSide::Source)?; - self.send_receive(&snapshot, &dest, Some(&inc_snapshot))?; - } else { - println!("Full backup {} -> {}", source, dest); - let snapshot = self.create_snapshot(source, JobSide::Source)?; - self.send_receive(&snapshot, &dest, None)?; - } - - // Clean up snapshots - let old_snapshots = self.find_old_snapshots(source, &dest)?; - println!("Old snapshots: {:?}", old_snapshots); - old_snapshots - .into_iter() - .map(|s| self.delete_snapshot(s)) - .collect::, _>>()?; - } - Ok(()) - } -} +pub mod job; +pub mod progress; diff --git a/src/main.rs b/src/main.rs index d3f2921..fe6b08f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,10 @@ use clap::Parser; -use std::env::args; -use zfsbackup::JobBuilder; +use std::error::Error; +use std::sync::mpsc::channel; +use std::thread; +use zfsbackup::job::JobBuilder; +use zfsbackup::progress::ProgressReporter; #[derive(Parser)] #[command(version, about, long_about = None)] @@ -24,7 +27,7 @@ struct Args { datasets: Vec, } -fn main() { +fn main() -> Result<(), Box> { let args = Args::parse(); let mut builder = JobBuilder::new(args.datasets, args.target); if args.dry_run { @@ -39,6 +42,14 @@ fn main() { if let Some(cmd) = args.target_zfs_command { builder = builder.target_zfs_command(&cmd); } - let job = builder.build().expect("asplode"); - job.run().expect("boom"); + + let (tx, rx) = channel(); + let mut pr = ProgressReporter::new(rx); + thread::spawn(move || pr.run()); + + builder = builder.sender(tx); + + let job = builder.build()?; + job.run()?; + Ok(()) } diff --git a/src/progress.rs b/src/progress.rs new file mode 100644 index 0000000..656ecb5 --- /dev/null +++ b/src/progress.rs @@ -0,0 +1,95 @@ +use std::{io::Write, sync::mpsc::Receiver}; + +pub enum BackupEvent { + Estimate(u64), + StartingFullBackup { + source: String, + dest: String, + index: usize, + total: usize, + }, + StartingIncrementalBackup { + source: String, + dest: String, + index: usize, + total: usize, + }, + SnapshotCreated(String), + SnapshotDeleted(String), + BytesTransferred { + dataset: String, + bytes: u64, + }, + DatasetCompleted(String), +} + +pub struct ProgressReporter { + receiver: Receiver, + estimated_size: u64, +} + +impl ProgressReporter { + pub fn new(receiver: Receiver) -> Self { + Self { + receiver, + estimated_size: 0, + } + } + + pub fn run(&mut self) { + while let Ok(event) = self.receiver.recv() { + match event { + BackupEvent::Estimate(size) => { + println!("Estimated total backup size: {} bytes", size); + self.estimated_size = size; + } + BackupEvent::StartingFullBackup { + source, + dest, + index, + total, + } => { + println!( + "Starting full backup of {} to {} ({} of {})", + source, dest, index, total + ); + } + BackupEvent::StartingIncrementalBackup { + source, + dest, + index, + total, + } => { + println!( + "Starting incremental backup of {} to {} ({} of {})", + source, dest, index, total + ); + } + BackupEvent::SnapshotCreated(name) => { + println!("Created snapshot: {}", name); + } + BackupEvent::SnapshotDeleted(name) => { + println!("Deleted snapshot: {}", name); + } + BackupEvent::BytesTransferred { dataset, bytes } => { + let percent: f64 = if self.estimated_size > 0 { + bytes as f64 / self.estimated_size as f64 + } else { + 0.0 + }; + print!( + "{:>3.0}% {}/{} bytes transferred\r", + percent * 100.0, + bytes, + self.estimated_size + ); + std::io::stdout().flush().ok(); + } + BackupEvent::DatasetCompleted(name) => { + println!("Completed backup of dataset: {}", name); + self.estimated_size = 0; + } + } + } + } +}