Tidy, rename and refactor a little
This commit is contained in:
105
src/job.rs
105
src/job.rs
@@ -3,6 +3,7 @@ use crate::progress::BackupEvent;
|
||||
use crate::progress::filter;
|
||||
use chrono::{Local, NaiveDateTime};
|
||||
use std::collections::HashSet;
|
||||
use std::fmt;
|
||||
use std::sync::mpsc::Sender;
|
||||
|
||||
pub struct Job {
|
||||
@@ -14,16 +15,24 @@ pub struct Job {
|
||||
sender: Option<Sender<BackupEvent>>,
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone)]
|
||||
enum JobSide {
|
||||
Source,
|
||||
Destination,
|
||||
Target,
|
||||
}
|
||||
|
||||
struct Snapshot {
|
||||
snapshot: String,
|
||||
dataset: String,
|
||||
snapshot_id: String,
|
||||
side: JobSide,
|
||||
}
|
||||
|
||||
impl fmt::Display for Snapshot {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "{}@{}", self.dataset, self.snapshot_id)
|
||||
}
|
||||
}
|
||||
|
||||
impl Job {
|
||||
fn send_event(&self, event: BackupEvent) {
|
||||
if let Some(sender) = &self.sender {
|
||||
@@ -34,16 +43,21 @@ impl Job {
|
||||
fn get_side_command(&self, side: JobSide) -> Vec<&str> {
|
||||
match side {
|
||||
JobSide::Source => self.source_zfs_command.iter().map(|s| s.as_str()).collect(),
|
||||
JobSide::Destination => self.target_zfs_command.iter().map(|s| s.as_str()).collect(),
|
||||
JobSide::Target => self.target_zfs_command.iter().map(|s| s.as_str()).collect(),
|
||||
}
|
||||
}
|
||||
|
||||
fn create_snapshot(&self, dataset: &str, side: JobSide) -> Result<String, String> {
|
||||
let snapshot = format!("{}@{}", dataset, Local::now().format("%Y-%m-%dT%H:%M:%S"));
|
||||
fn create_snapshot(&self, dataset: &str, side: JobSide) -> Result<Snapshot, String> {
|
||||
let snapshot = Snapshot {
|
||||
dataset: dataset.to_string(),
|
||||
snapshot_id: Local::now().format("%Y-%m-%dT%H:%M:%S").to_string(),
|
||||
side,
|
||||
};
|
||||
let name = snapshot.to_string();
|
||||
let mut cmd = self.get_side_command(side);
|
||||
cmd.extend(["snapshot", &snapshot]);
|
||||
cmd.extend(["snapshot", &name]);
|
||||
command::exec_command(&cmd)?;
|
||||
self.send_event(BackupEvent::SnapshotCreated(snapshot.clone()));
|
||||
self.send_event(BackupEvent::SnapshotCreated(name.clone()));
|
||||
Ok(snapshot)
|
||||
}
|
||||
|
||||
@@ -68,6 +82,7 @@ impl Job {
|
||||
self.send_event(BackupEvent::Estimate(size));
|
||||
Ok(size)
|
||||
}
|
||||
|
||||
fn send_receive(
|
||||
&self,
|
||||
source: &str,
|
||||
@@ -82,10 +97,13 @@ impl Job {
|
||||
}
|
||||
send_cmd.push(source);
|
||||
|
||||
let mut receive_cmd = self.get_side_command(JobSide::Destination);
|
||||
let mut receive_cmd = self.get_side_command(JobSide::Target);
|
||||
receive_cmd.extend(["receive", "-F", dest]);
|
||||
let filters = match self.sender.as_ref() {
|
||||
Some(sender) => vec![filter::CountingReaderBuilder::build(sender.clone(), total)],
|
||||
let filters: Vec<Box<dyn command::Filter>> = match self.sender.as_ref() {
|
||||
Some(sender) => vec![Box::new(filter::ByteCountFilter::new(
|
||||
sender.clone(),
|
||||
total,
|
||||
))],
|
||||
None => vec![],
|
||||
};
|
||||
command::exec_piped_commands(&send_cmd, &receive_cmd, filters)?;
|
||||
@@ -93,7 +111,7 @@ impl Job {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn list_snapshots(&self, source: &str, side: JobSide) -> Result<Vec<String>, String> {
|
||||
fn list_snapshot_ids(&self, source: &str, side: JobSide) -> Result<Vec<String>, String> {
|
||||
let mut cmd = self.get_side_command(side);
|
||||
cmd.extend(["list", "-H", "-o", "name", "-t", "snapshot", source]);
|
||||
let output = command::exec_command(&cmd)?;
|
||||
@@ -116,16 +134,16 @@ impl Job {
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
fn find_latest_matching_snapshot(&self, source: &str, dest: &str) -> Result<String, String> {
|
||||
self.find_matching_snapshots(source, dest)?
|
||||
fn find_latest_matching_snapshot_id(&self, source: &str, dest: &str) -> Result<String, String> {
|
||||
self.find_matching_snapshot_ids(source, dest)?
|
||||
.into_iter()
|
||||
.next()
|
||||
.ok_or(String::from("no matching snapshots"))
|
||||
}
|
||||
|
||||
fn find_matching_snapshots(&self, source: &str, dest: &str) -> Result<Vec<String>, String> {
|
||||
let source_snapshots = self.list_snapshots(source, JobSide::Source)?;
|
||||
let mut dest_snapshots = self.list_snapshots(dest, JobSide::Destination)?;
|
||||
fn find_matching_snapshot_ids(&self, source: &str, dest: &str) -> Result<Vec<String>, String> {
|
||||
let source_snapshots = self.list_snapshot_ids(source, JobSide::Source)?;
|
||||
let mut dest_snapshots = self.list_snapshot_ids(dest, JobSide::Target)?;
|
||||
dest_snapshots.sort_by(|a, b| b.cmp(a));
|
||||
|
||||
let source_set: HashSet<String> = source_snapshots.into_iter().collect();
|
||||
@@ -143,9 +161,9 @@ impl Job {
|
||||
}
|
||||
|
||||
fn find_old_snapshots(&self, source: &str, dest: &str) -> Result<Vec<Snapshot>, String> {
|
||||
let source_snapshots = self.list_snapshots(source, JobSide::Source)?;
|
||||
let dest_snapshots = self.list_snapshots(dest, JobSide::Destination)?;
|
||||
let matching_snapshots = self.find_matching_snapshots(source, dest)?;
|
||||
let source_snapshots = self.list_snapshot_ids(source, JobSide::Source)?;
|
||||
let dest_snapshots = self.list_snapshot_ids(dest, JobSide::Target)?;
|
||||
let matching_snapshots = self.find_matching_snapshot_ids(source, dest)?;
|
||||
if matching_snapshots.is_empty() {
|
||||
return Err(String::from("no matching snapshots found"));
|
||||
}
|
||||
@@ -154,7 +172,8 @@ impl Job {
|
||||
.into_iter()
|
||||
.filter(|s| !retain.contains(s))
|
||||
.map(|s| Snapshot {
|
||||
snapshot: format!("{}@{}", source, s),
|
||||
dataset: source.to_string(),
|
||||
snapshot_id: s,
|
||||
side: JobSide::Source,
|
||||
})
|
||||
.chain(
|
||||
@@ -162,22 +181,22 @@ impl Job {
|
||||
.into_iter()
|
||||
.filter(|s| !retain.contains(s))
|
||||
.map(|s| Snapshot {
|
||||
snapshot: format!("{}@{}", dest, s),
|
||||
side: JobSide::Destination,
|
||||
dataset: dest.to_string(),
|
||||
snapshot_id: s,
|
||||
side: JobSide::Target,
|
||||
}),
|
||||
)
|
||||
.collect();
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
fn delete_snapshot(&self, snapshot: Snapshot) -> Result<String, String> {
|
||||
fn delete_snapshot(&self, snapshot: Snapshot) -> Result<(), String> {
|
||||
let mut cmd = self.get_side_command(snapshot.side);
|
||||
cmd.extend(["destroy", &snapshot.snapshot]);
|
||||
let res = command::exec_command(&cmd);
|
||||
if res.is_ok() {
|
||||
self.send_event(BackupEvent::SnapshotDeleted(snapshot.snapshot.clone()));
|
||||
}
|
||||
res
|
||||
let name = snapshot.to_string();
|
||||
cmd.extend(["destroy", &name]);
|
||||
command::exec_command(&cmd)?;
|
||||
self.send_event(BackupEvent::SnapshotDeleted(name.clone()));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn run(&self, execute: bool) -> Result<(), String> {
|
||||
@@ -191,7 +210,7 @@ impl Job {
|
||||
// TODO: This will assume the destination doesn't exist if the
|
||||
// command fails for any reason.
|
||||
let dest = format!("{}/{}", self.target, source);
|
||||
cmd = self.get_side_command(JobSide::Destination);
|
||||
cmd = self.get_side_command(JobSide::Target);
|
||||
cmd.extend(["list", "-H", "-o", "name", &dest]);
|
||||
let dest_exists = command::exec_command(&cmd).is_ok();
|
||||
|
||||
@@ -206,20 +225,19 @@ impl Job {
|
||||
let inc_snapshot = format!(
|
||||
"{}@{}",
|
||||
source,
|
||||
self.find_latest_matching_snapshot(source, &dest)?
|
||||
self.find_latest_matching_snapshot_id(source, &dest)?
|
||||
);
|
||||
|
||||
let snapshot = self.create_snapshot(source, JobSide::Source)?;
|
||||
let total = self.estimate(&snapshot, Some(&inc_snapshot)).ok();
|
||||
let total = self
|
||||
.estimate(&snapshot.to_string(), Some(&inc_snapshot))
|
||||
.ok();
|
||||
if !execute {
|
||||
self.send_event(BackupEvent::DryrunCompleted(source.clone()));
|
||||
self.delete_snapshot(Snapshot {
|
||||
snapshot: snapshot.clone(),
|
||||
side: JobSide::Source,
|
||||
})?;
|
||||
return Ok(());
|
||||
self.delete_snapshot(snapshot)?;
|
||||
continue;
|
||||
}
|
||||
self.send_receive(&snapshot, &dest, Some(&inc_snapshot), total)?;
|
||||
self.send_receive(&snapshot.to_string(), &dest, Some(&inc_snapshot), total)?;
|
||||
} else {
|
||||
self.send_event(BackupEvent::StartingFullBackup {
|
||||
source: source.clone(),
|
||||
@@ -228,16 +246,13 @@ impl Job {
|
||||
total: self.datasets.len(),
|
||||
});
|
||||
let snapshot = self.create_snapshot(source, JobSide::Source)?;
|
||||
let total = self.estimate(&snapshot, None).ok();
|
||||
let total = self.estimate(&snapshot.to_string(), None).ok();
|
||||
if !execute {
|
||||
self.send_event(BackupEvent::DryrunCompleted(source.clone()));
|
||||
self.delete_snapshot(Snapshot {
|
||||
snapshot: snapshot.clone(),
|
||||
side: JobSide::Source,
|
||||
})?;
|
||||
return Ok(());
|
||||
self.delete_snapshot(snapshot)?;
|
||||
continue;
|
||||
}
|
||||
self.send_receive(&snapshot, &dest, None, total)?;
|
||||
self.send_receive(&snapshot.to_string(), &dest, None, total)?;
|
||||
}
|
||||
|
||||
// Clean up snapshots
|
||||
|
||||
@@ -4,7 +4,7 @@ use std::io::{self, Read};
|
||||
use std::sync::mpsc::Sender;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
struct CountingReader<R: Read> {
|
||||
struct ByteCountReader<R: Read> {
|
||||
inner: R,
|
||||
sender: Sender<BackupEvent>,
|
||||
bytes: u64,
|
||||
@@ -12,7 +12,7 @@ struct CountingReader<R: Read> {
|
||||
total: Option<u64>,
|
||||
}
|
||||
|
||||
impl<R: Read> Read for CountingReader<R> {
|
||||
impl<R: Read> Read for ByteCountReader<R> {
|
||||
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
||||
let n = self.inner.read(buf)?;
|
||||
self.bytes += n as u64;
|
||||
@@ -29,31 +29,31 @@ impl<R: Read> Read for CountingReader<R> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<R: Read> CountingReader<R> {
|
||||
impl<R: Read> ByteCountReader<R> {
|
||||
fn new(inner: R, sender: Sender<BackupEvent>, total: Option<u64>) -> Self {
|
||||
Self {
|
||||
inner,
|
||||
sender,
|
||||
total,
|
||||
bytes: 0,
|
||||
last_send: Instant::now() - Duration::from_secs(1),
|
||||
total,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct CountingReaderBuilder {
|
||||
pub(crate) struct ByteCountFilter {
|
||||
sender: Sender<BackupEvent>,
|
||||
total: Option<u64>,
|
||||
}
|
||||
|
||||
impl Filter for CountingReaderBuilder {
|
||||
fn filter(&self, reader: Box<dyn Read>) -> Box<dyn Read> {
|
||||
Box::new(CountingReader::new(reader, self.sender.clone(), self.total))
|
||||
impl ByteCountFilter {
|
||||
pub(crate) fn new(sender: Sender<BackupEvent>, total: Option<u64>) -> Self {
|
||||
Self { sender, total }
|
||||
}
|
||||
}
|
||||
|
||||
impl CountingReaderBuilder {
|
||||
pub fn build(sender: Sender<BackupEvent>, total: Option<u64>) -> Box<dyn Filter> {
|
||||
Box::new(Self { sender, total })
|
||||
impl Filter for ByteCountFilter {
|
||||
fn filter(&self, inner: Box<dyn Read>) -> Box<dyn Read> {
|
||||
Box::new(ByteCountReader::new(inner, self.sender.clone(), self.total))
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user