diff --git a/src/job.rs b/src/job.rs index 0d3b8fb..e1ce9ee 100644 --- a/src/job.rs +++ b/src/job.rs @@ -3,6 +3,7 @@ use crate::progress::BackupEvent; use crate::progress::filter; use chrono::{Local, NaiveDateTime}; use std::collections::HashSet; +use std::fmt; use std::sync::mpsc::Sender; pub struct Job { @@ -14,16 +15,24 @@ pub struct Job { sender: Option>, } +#[derive(Copy, Clone)] enum JobSide { Source, - Destination, + Target, } struct Snapshot { - snapshot: String, + dataset: String, + snapshot_id: String, side: JobSide, } +impl fmt::Display for Snapshot { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}@{}", self.dataset, self.snapshot_id) + } +} + impl Job { fn send_event(&self, event: BackupEvent) { if let Some(sender) = &self.sender { @@ -34,16 +43,21 @@ impl Job { fn get_side_command(&self, side: JobSide) -> Vec<&str> { match side { JobSide::Source => self.source_zfs_command.iter().map(|s| s.as_str()).collect(), - JobSide::Destination => self.target_zfs_command.iter().map(|s| s.as_str()).collect(), + JobSide::Target => self.target_zfs_command.iter().map(|s| s.as_str()).collect(), } } - fn create_snapshot(&self, dataset: &str, side: JobSide) -> Result { - let snapshot = format!("{}@{}", dataset, Local::now().format("%Y-%m-%dT%H:%M:%S")); + fn create_snapshot(&self, dataset: &str, side: JobSide) -> Result { + let snapshot = Snapshot { + dataset: dataset.to_string(), + snapshot_id: Local::now().format("%Y-%m-%dT%H:%M:%S").to_string(), + side, + }; + let name = snapshot.to_string(); let mut cmd = self.get_side_command(side); - cmd.extend(["snapshot", &snapshot]); + cmd.extend(["snapshot", &name]); command::exec_command(&cmd)?; - self.send_event(BackupEvent::SnapshotCreated(snapshot.clone())); + self.send_event(BackupEvent::SnapshotCreated(name.clone())); Ok(snapshot) } @@ -68,6 +82,7 @@ impl Job { self.send_event(BackupEvent::Estimate(size)); Ok(size) } + fn send_receive( &self, source: &str, @@ -82,10 +97,13 @@ impl Job { } send_cmd.push(source); - let mut receive_cmd = self.get_side_command(JobSide::Destination); + let mut receive_cmd = self.get_side_command(JobSide::Target); receive_cmd.extend(["receive", "-F", dest]); - let filters = match self.sender.as_ref() { - Some(sender) => vec![filter::CountingReaderBuilder::build(sender.clone(), total)], + let filters: Vec> = match self.sender.as_ref() { + Some(sender) => vec![Box::new(filter::ByteCountFilter::new( + sender.clone(), + total, + ))], None => vec![], }; command::exec_piped_commands(&send_cmd, &receive_cmd, filters)?; @@ -93,7 +111,7 @@ impl Job { Ok(()) } - fn list_snapshots(&self, source: &str, side: JobSide) -> Result, String> { + fn list_snapshot_ids(&self, source: &str, side: JobSide) -> Result, String> { let mut cmd = self.get_side_command(side); cmd.extend(["list", "-H", "-o", "name", "-t", "snapshot", source]); let output = command::exec_command(&cmd)?; @@ -116,16 +134,16 @@ impl Job { Ok(result) } - fn find_latest_matching_snapshot(&self, source: &str, dest: &str) -> Result { - self.find_matching_snapshots(source, dest)? + fn find_latest_matching_snapshot_id(&self, source: &str, dest: &str) -> Result { + self.find_matching_snapshot_ids(source, dest)? .into_iter() .next() .ok_or(String::from("no matching snapshots")) } - fn find_matching_snapshots(&self, source: &str, dest: &str) -> Result, String> { - let source_snapshots = self.list_snapshots(source, JobSide::Source)?; - let mut dest_snapshots = self.list_snapshots(dest, JobSide::Destination)?; + fn find_matching_snapshot_ids(&self, source: &str, dest: &str) -> Result, String> { + let source_snapshots = self.list_snapshot_ids(source, JobSide::Source)?; + let mut dest_snapshots = self.list_snapshot_ids(dest, JobSide::Target)?; dest_snapshots.sort_by(|a, b| b.cmp(a)); let source_set: HashSet = source_snapshots.into_iter().collect(); @@ -143,9 +161,9 @@ impl Job { } fn find_old_snapshots(&self, source: &str, dest: &str) -> Result, String> { - let source_snapshots = self.list_snapshots(source, JobSide::Source)?; - let dest_snapshots = self.list_snapshots(dest, JobSide::Destination)?; - let matching_snapshots = self.find_matching_snapshots(source, dest)?; + let source_snapshots = self.list_snapshot_ids(source, JobSide::Source)?; + let dest_snapshots = self.list_snapshot_ids(dest, JobSide::Target)?; + let matching_snapshots = self.find_matching_snapshot_ids(source, dest)?; if matching_snapshots.is_empty() { return Err(String::from("no matching snapshots found")); } @@ -154,7 +172,8 @@ impl Job { .into_iter() .filter(|s| !retain.contains(s)) .map(|s| Snapshot { - snapshot: format!("{}@{}", source, s), + dataset: source.to_string(), + snapshot_id: s, side: JobSide::Source, }) .chain( @@ -162,22 +181,22 @@ impl Job { .into_iter() .filter(|s| !retain.contains(s)) .map(|s| Snapshot { - snapshot: format!("{}@{}", dest, s), - side: JobSide::Destination, + dataset: dest.to_string(), + snapshot_id: s, + side: JobSide::Target, }), ) .collect(); Ok(result) } - fn delete_snapshot(&self, snapshot: Snapshot) -> Result { + fn delete_snapshot(&self, snapshot: Snapshot) -> Result<(), String> { let mut cmd = self.get_side_command(snapshot.side); - cmd.extend(["destroy", &snapshot.snapshot]); - let res = command::exec_command(&cmd); - if res.is_ok() { - self.send_event(BackupEvent::SnapshotDeleted(snapshot.snapshot.clone())); - } - res + let name = snapshot.to_string(); + cmd.extend(["destroy", &name]); + command::exec_command(&cmd)?; + self.send_event(BackupEvent::SnapshotDeleted(name.clone())); + Ok(()) } pub fn run(&self, execute: bool) -> Result<(), String> { @@ -191,7 +210,7 @@ impl Job { // TODO: This will assume the destination doesn't exist if the // command fails for any reason. let dest = format!("{}/{}", self.target, source); - cmd = self.get_side_command(JobSide::Destination); + cmd = self.get_side_command(JobSide::Target); cmd.extend(["list", "-H", "-o", "name", &dest]); let dest_exists = command::exec_command(&cmd).is_ok(); @@ -206,20 +225,19 @@ impl Job { let inc_snapshot = format!( "{}@{}", source, - self.find_latest_matching_snapshot(source, &dest)? + self.find_latest_matching_snapshot_id(source, &dest)? ); let snapshot = self.create_snapshot(source, JobSide::Source)?; - let total = self.estimate(&snapshot, Some(&inc_snapshot)).ok(); + let total = self + .estimate(&snapshot.to_string(), Some(&inc_snapshot)) + .ok(); if !execute { self.send_event(BackupEvent::DryrunCompleted(source.clone())); - self.delete_snapshot(Snapshot { - snapshot: snapshot.clone(), - side: JobSide::Source, - })?; - return Ok(()); + self.delete_snapshot(snapshot)?; + continue; } - self.send_receive(&snapshot, &dest, Some(&inc_snapshot), total)?; + self.send_receive(&snapshot.to_string(), &dest, Some(&inc_snapshot), total)?; } else { self.send_event(BackupEvent::StartingFullBackup { source: source.clone(), @@ -228,16 +246,13 @@ impl Job { total: self.datasets.len(), }); let snapshot = self.create_snapshot(source, JobSide::Source)?; - let total = self.estimate(&snapshot, None).ok(); + let total = self.estimate(&snapshot.to_string(), None).ok(); if !execute { self.send_event(BackupEvent::DryrunCompleted(source.clone())); - self.delete_snapshot(Snapshot { - snapshot: snapshot.clone(), - side: JobSide::Source, - })?; - return Ok(()); + self.delete_snapshot(snapshot)?; + continue; } - self.send_receive(&snapshot, &dest, None, total)?; + self.send_receive(&snapshot.to_string(), &dest, None, total)?; } // Clean up snapshots diff --git a/src/progress/filter.rs b/src/progress/filter.rs index 51260b3..311f984 100644 --- a/src/progress/filter.rs +++ b/src/progress/filter.rs @@ -4,7 +4,7 @@ use std::io::{self, Read}; use std::sync::mpsc::Sender; use std::time::{Duration, Instant}; -struct CountingReader { +struct ByteCountReader { inner: R, sender: Sender, bytes: u64, @@ -12,7 +12,7 @@ struct CountingReader { total: Option, } -impl Read for CountingReader { +impl Read for ByteCountReader { fn read(&mut self, buf: &mut [u8]) -> io::Result { let n = self.inner.read(buf)?; self.bytes += n as u64; @@ -29,31 +29,31 @@ impl Read for CountingReader { } } -impl CountingReader { +impl ByteCountReader { fn new(inner: R, sender: Sender, total: Option) -> Self { Self { inner, sender, - total, bytes: 0, last_send: Instant::now() - Duration::from_secs(1), + total, } } } -pub struct CountingReaderBuilder { +pub(crate) struct ByteCountFilter { sender: Sender, total: Option, } -impl Filter for CountingReaderBuilder { - fn filter(&self, reader: Box) -> Box { - Box::new(CountingReader::new(reader, self.sender.clone(), self.total)) +impl ByteCountFilter { + pub(crate) fn new(sender: Sender, total: Option) -> Self { + Self { sender, total } } } -impl CountingReaderBuilder { - pub fn build(sender: Sender, total: Option) -> Box { - Box::new(Self { sender, total }) +impl Filter for ByteCountFilter { + fn filter(&self, inner: Box) -> Box { + Box::new(ByteCountReader::new(inner, self.sender.clone(), self.total)) } }