Add progress reporting
This commit is contained in:
@@ -1,3 +1,9 @@
|
||||
use crate::progress::BackupEvent;
|
||||
use std::io;
|
||||
use std::io::Read;
|
||||
use std::sync::mpsc::Sender;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
pub fn exec_command(command: &Vec<&str>) -> Result<String, String> {
|
||||
if command.is_empty() {
|
||||
return Err("Command is empty".to_string());
|
||||
@@ -19,7 +25,46 @@ pub fn exec_command(command: &Vec<&str>) -> Result<String, String> {
|
||||
Ok(output_str)
|
||||
}
|
||||
|
||||
pub fn exec_piped_commands(source: &Vec<&str>, dest: &Vec<&str>) -> Result<(), String> {
|
||||
struct CountingReader<R: Read> {
|
||||
inner: R,
|
||||
sender: Sender<BackupEvent>,
|
||||
bytes_read: u64,
|
||||
last_send: Instant,
|
||||
}
|
||||
|
||||
impl<R: Read> Read for CountingReader<R> {
|
||||
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
||||
let n = self.inner.read(buf)?;
|
||||
self.bytes_read += n as u64;
|
||||
if self.last_send.elapsed().as_millis() >= 100 {
|
||||
self.sender
|
||||
.send(BackupEvent::BytesTransferred {
|
||||
dataset: String::from(""),
|
||||
bytes: self.bytes_read,
|
||||
})
|
||||
.ok();
|
||||
self.last_send = Instant::now();
|
||||
}
|
||||
Ok(n)
|
||||
}
|
||||
}
|
||||
|
||||
impl<R: Read> CountingReader<R> {
|
||||
fn new(inner: R, sender: Sender<BackupEvent>) -> Self {
|
||||
Self {
|
||||
inner,
|
||||
sender,
|
||||
bytes_read: 0,
|
||||
last_send: Instant::now() - Duration::from_secs(1),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn exec_piped_commands(
|
||||
source: &Vec<&str>,
|
||||
dest: &Vec<&str>,
|
||||
sender: Option<Sender<BackupEvent>>,
|
||||
) -> Result<(), String> {
|
||||
if source.is_empty() || dest.is_empty() {
|
||||
return Err("Source or destination command is empty".to_string());
|
||||
}
|
||||
@@ -38,10 +83,19 @@ pub fn exec_piped_commands(source: &Vec<&str>, dest: &Vec<&str>) -> Result<(), S
|
||||
.map_err(|e| e.to_string())?;
|
||||
|
||||
let mut receive_process = receive_cmd
|
||||
.stdin(send_process.stdout.take().unwrap())
|
||||
.stdin(std::process::Stdio::piped())
|
||||
.spawn()
|
||||
.map_err(|e| e.to_string())?;
|
||||
|
||||
let send_stdout = send_process.stdout.take().unwrap();
|
||||
let mut receive_stdin = receive_process.stdin.take().unwrap();
|
||||
|
||||
let mut reader: Box<dyn Read> = match sender {
|
||||
Some(s) => Box::new(CountingReader::new(send_stdout, s)),
|
||||
None => Box::new(send_stdout),
|
||||
};
|
||||
std::io::copy(&mut reader, &mut receive_stdin).map_err(|e| e.to_string())?;
|
||||
|
||||
let receive_status = receive_process.wait().map_err(|e| e.to_string())?;
|
||||
let send_status = send_process.wait().map_err(|e| e.to_string())?;
|
||||
if !receive_status.success() {
|
||||
|
||||
338
src/job.rs
Normal file
338
src/job.rs
Normal file
@@ -0,0 +1,338 @@
|
||||
use crate::command;
|
||||
use crate::progress::BackupEvent;
|
||||
use chrono::{Local, NaiveDateTime};
|
||||
use std::collections::HashSet;
|
||||
use std::sync::mpsc::Sender;
|
||||
|
||||
pub struct JobBuilder {
|
||||
sources: Vec<String>,
|
||||
target: String,
|
||||
source_zfs_command: Vec<String>,
|
||||
target_zfs_command: Vec<String>,
|
||||
dryrun: bool,
|
||||
retain: usize,
|
||||
sender: Option<Sender<BackupEvent>>,
|
||||
}
|
||||
|
||||
fn parse_command(commandstr: &str) -> Vec<String> {
|
||||
commandstr
|
||||
.split_whitespace()
|
||||
.map(|s| s.to_string())
|
||||
.collect()
|
||||
}
|
||||
|
||||
impl JobBuilder {
|
||||
pub fn new(sources: Vec<String>, target: String) -> Self {
|
||||
JobBuilder {
|
||||
sources,
|
||||
target,
|
||||
source_zfs_command: vec!["zfs".to_string()],
|
||||
target_zfs_command: vec!["zfs".to_string()],
|
||||
dryrun: false,
|
||||
retain: 2,
|
||||
sender: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn source_zfs_command(mut self, commandstr: &str) -> Self {
|
||||
let command = parse_command(commandstr);
|
||||
self.source_zfs_command = command;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn target_zfs_command(mut self, commandstr: &str) -> Self {
|
||||
let command = parse_command(commandstr);
|
||||
self.target_zfs_command = command;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn zfs_command(mut self, commandstr: &str) -> Self {
|
||||
let command = parse_command(commandstr);
|
||||
self.source_zfs_command = command.clone();
|
||||
self.target_zfs_command = command;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn dryrun(mut self) -> Self {
|
||||
self.dryrun = true;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn retain(mut self, retain: usize) -> Self {
|
||||
self.retain = retain;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn sender(mut self, sender: Sender<BackupEvent>) -> Self {
|
||||
self.sender = Some(sender);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn build(self) -> Result<Job, String> {
|
||||
let mut job = Job {
|
||||
datasets: vec![],
|
||||
target: self.target,
|
||||
source_zfs_command: self.source_zfs_command,
|
||||
target_zfs_command: self.target_zfs_command,
|
||||
dryrun: self.dryrun,
|
||||
retain: self.retain,
|
||||
sender: self.sender,
|
||||
};
|
||||
let mut datasets: Vec<String> = vec![];
|
||||
for source in &self.sources {
|
||||
let recurse = source.ends_with("/...");
|
||||
let source = source.trim_end_matches("/...");
|
||||
|
||||
let mut args = vec!["list", "-H", "-o", "name"];
|
||||
if recurse {
|
||||
args.push("-r");
|
||||
}
|
||||
args.push(source);
|
||||
|
||||
let mut cmd = job.get_side_command(JobSide::Source);
|
||||
cmd.extend(args);
|
||||
let output = command::exec_command(&cmd)?;
|
||||
datasets.extend(output.lines().map(str::to_string));
|
||||
}
|
||||
if datasets.is_empty() {
|
||||
return Err(String::from("no matching source datasets found"));
|
||||
}
|
||||
job.datasets = datasets;
|
||||
Ok(job)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Job {
|
||||
datasets: Vec<String>,
|
||||
target: String,
|
||||
source_zfs_command: Vec<String>,
|
||||
target_zfs_command: Vec<String>,
|
||||
dryrun: bool,
|
||||
retain: usize,
|
||||
sender: Option<Sender<BackupEvent>>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
enum JobSide {
|
||||
Source,
|
||||
Destination,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct Snapshot {
|
||||
snapshot: String,
|
||||
side: JobSide,
|
||||
}
|
||||
|
||||
impl Job {
|
||||
pub fn dump(&self) {
|
||||
println!("Datasets: {:?}", self.datasets);
|
||||
println!("Target: {}", self.target);
|
||||
println!("Source ZFS Command: {:?}", self.source_zfs_command);
|
||||
println!("Target ZFS Command: {:?}", self.target_zfs_command);
|
||||
println!("Dryrun: {}", self.dryrun);
|
||||
}
|
||||
|
||||
fn send_event(&self, event: BackupEvent) {
|
||||
if let Some(sender) = &self.sender {
|
||||
sender.send(event).ok();
|
||||
}
|
||||
}
|
||||
|
||||
fn get_side_command(&self, side: JobSide) -> Vec<&str> {
|
||||
match side {
|
||||
JobSide::Source => self.source_zfs_command.iter().map(|s| s.as_str()).collect(),
|
||||
JobSide::Destination => self.target_zfs_command.iter().map(|s| s.as_str()).collect(),
|
||||
}
|
||||
}
|
||||
|
||||
fn create_snapshot(&self, dataset: &str, side: JobSide) -> Result<String, String> {
|
||||
let snapshot = format!("{}@{}", dataset, Local::now().format("%Y-%m-%dT%H:%M:%S"));
|
||||
let mut cmd = self.get_side_command(side);
|
||||
cmd.extend(["snapshot", &snapshot]);
|
||||
command::exec_command(&cmd)?;
|
||||
self.send_event(BackupEvent::SnapshotCreated(snapshot.clone()));
|
||||
Ok(snapshot)
|
||||
}
|
||||
|
||||
fn estimate(&self, source: &str, inc_snapshot: Option<&str>) -> Result<u64, String> {
|
||||
let mut cmd = self.get_side_command(JobSide::Source);
|
||||
cmd.push("send");
|
||||
if let Some(inc) = inc_snapshot {
|
||||
cmd.extend(["-i", inc]);
|
||||
}
|
||||
cmd.extend(["-n", "-P"]);
|
||||
cmd.push(source);
|
||||
let output = command::exec_command(&cmd)?;
|
||||
let size = output
|
||||
.lines()
|
||||
.last()
|
||||
.ok_or("estimate parse error")?
|
||||
.split_whitespace()
|
||||
.last()
|
||||
.ok_or("estimate parse error")?
|
||||
.parse::<u64>()
|
||||
.map_err(|e| format!("estimate parse error: {}", e))?;
|
||||
self.send_event(BackupEvent::Estimate(size));
|
||||
Ok(size)
|
||||
}
|
||||
fn send_receive(
|
||||
&self,
|
||||
source: &str,
|
||||
dest: &str,
|
||||
inc_snapshot: Option<&str>,
|
||||
) -> Result<(), String> {
|
||||
let mut send_cmd = self.get_side_command(JobSide::Source);
|
||||
send_cmd.push("send");
|
||||
if let Some(inc) = inc_snapshot {
|
||||
send_cmd.extend(["-i", inc]);
|
||||
}
|
||||
send_cmd.push(source);
|
||||
|
||||
let mut receive_cmd = self.get_side_command(JobSide::Destination);
|
||||
receive_cmd.extend(["receive", "-F", dest]);
|
||||
command::exec_piped_commands(&send_cmd, &receive_cmd, self.sender.clone())?;
|
||||
self.send_event(BackupEvent::DatasetCompleted(source.to_string()));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn list_snapshots(&self, source: &str, side: JobSide) -> Result<Vec<String>, String> {
|
||||
let mut cmd = self.get_side_command(side);
|
||||
cmd.extend(["list", "-H", "-o", "name", "-t", "snapshot", source]);
|
||||
let output = command::exec_command(&cmd)?;
|
||||
let snapshots: Vec<&str> = output
|
||||
.split_whitespace()
|
||||
.map(|s| {
|
||||
let parts: Vec<&str> = s.split("@").collect();
|
||||
if parts.len() == 2 {
|
||||
Ok(parts[1])
|
||||
} else {
|
||||
Err(format!("invalid snapshot name: {}", s))
|
||||
}
|
||||
})
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
let result: Vec<String> = snapshots
|
||||
.iter()
|
||||
.filter(|s| NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S").is_ok())
|
||||
.map(|s| s.to_string())
|
||||
.collect();
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
fn find_latest_matching_snapshot(&self, source: &str, dest: &str) -> Result<String, String> {
|
||||
self.find_matching_snapshots(source, dest)?
|
||||
.into_iter()
|
||||
.next()
|
||||
.ok_or(String::from("no matching snapshots"))
|
||||
}
|
||||
|
||||
fn find_matching_snapshots(&self, source: &str, dest: &str) -> Result<Vec<String>, String> {
|
||||
let source_snapshots = self.list_snapshots(source, JobSide::Source)?;
|
||||
let mut dest_snapshots = self.list_snapshots(dest, JobSide::Destination)?;
|
||||
dest_snapshots.sort_by(|a, b| b.cmp(a));
|
||||
|
||||
let source_set: HashSet<String> = source_snapshots.into_iter().collect();
|
||||
|
||||
let matching_snapshots: Vec<String> = dest_snapshots
|
||||
.into_iter()
|
||||
.filter(|s| source_set.contains(s))
|
||||
.collect();
|
||||
|
||||
if matching_snapshots.is_empty() {
|
||||
Err(String::from("no matching snapshots"))
|
||||
} else {
|
||||
Ok(matching_snapshots)
|
||||
}
|
||||
}
|
||||
|
||||
fn find_old_snapshots(&self, source: &str, dest: &str) -> Result<Vec<Snapshot>, String> {
|
||||
let source_snapshots = self.list_snapshots(source, JobSide::Source)?;
|
||||
let dest_snapshots = self.list_snapshots(dest, JobSide::Destination)?;
|
||||
let matching_snapshots = self.find_matching_snapshots(source, dest)?;
|
||||
if matching_snapshots.is_empty() {
|
||||
return Err(String::from("no matching snapshots found"));
|
||||
}
|
||||
let retain: HashSet<String> = matching_snapshots.into_iter().take(self.retain).collect();
|
||||
let result = source_snapshots
|
||||
.into_iter()
|
||||
.filter(|s| !retain.contains(s))
|
||||
.map(|s| Snapshot {
|
||||
snapshot: format!("{}@{}", source, s),
|
||||
side: JobSide::Source,
|
||||
})
|
||||
.chain(
|
||||
dest_snapshots
|
||||
.into_iter()
|
||||
.filter(|s| !retain.contains(s))
|
||||
.map(|s| Snapshot {
|
||||
snapshot: format!("{}@{}", dest, s),
|
||||
side: JobSide::Destination,
|
||||
}),
|
||||
)
|
||||
.collect();
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
fn delete_snapshot(&self, snapshot: Snapshot) -> Result<String, String> {
|
||||
let mut cmd = self.get_side_command(snapshot.side);
|
||||
cmd.extend(["destroy", &snapshot.snapshot]);
|
||||
let res = command::exec_command(&cmd);
|
||||
if res.is_ok() {
|
||||
self.send_event(BackupEvent::SnapshotDeleted(snapshot.snapshot.clone()));
|
||||
}
|
||||
res
|
||||
}
|
||||
|
||||
pub fn run(&self) -> Result<(), String> {
|
||||
for (index, source) in self.datasets.iter().enumerate() {
|
||||
// Check the source exists
|
||||
let mut cmd = self.get_side_command(JobSide::Source);
|
||||
cmd.extend(["list", "-H", "-o", "name", source]);
|
||||
let _ = command::exec_command(&cmd)?;
|
||||
|
||||
// Check whether the destination exists
|
||||
// TODO: This will assume the destination doesn't exist if the
|
||||
// command fails for any reason.
|
||||
let dest = format!("{}/{}", self.target, source);
|
||||
cmd = self.get_side_command(JobSide::Destination);
|
||||
cmd.extend(["list", "-H", "-o", "name", &dest]);
|
||||
let dest_exists = command::exec_command(&cmd).is_ok();
|
||||
|
||||
// Run backup
|
||||
if dest_exists {
|
||||
self.send_event(BackupEvent::StartingIncrementalBackup {
|
||||
source: source.clone(),
|
||||
dest: dest.clone(),
|
||||
index: index + 1,
|
||||
total: self.datasets.len(),
|
||||
});
|
||||
let inc_snapshot = format!(
|
||||
"{}@{}",
|
||||
source,
|
||||
self.find_latest_matching_snapshot(source, &dest)?
|
||||
);
|
||||
|
||||
let snapshot = self.create_snapshot(source, JobSide::Source)?;
|
||||
self.estimate(&snapshot, Some(&inc_snapshot)).ok();
|
||||
self.send_receive(&snapshot, &dest, Some(&inc_snapshot))?;
|
||||
} else {
|
||||
self.send_event(BackupEvent::StartingFullBackup {
|
||||
source: source.clone(),
|
||||
dest: dest.clone(),
|
||||
index: index + 1,
|
||||
total: self.datasets.len(),
|
||||
});
|
||||
let snapshot = self.create_snapshot(source, JobSide::Source)?;
|
||||
self.estimate(&snapshot, None).ok();
|
||||
self.send_receive(&snapshot, &dest, None)?;
|
||||
}
|
||||
|
||||
// Clean up snapshots
|
||||
self.find_old_snapshots(source, &dest)?
|
||||
.into_iter()
|
||||
.map(|s| self.delete_snapshot(s))
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
283
src/lib.rs
283
src/lib.rs
@@ -1,282 +1,3 @@
|
||||
mod command;
|
||||
use chrono::{Local, NaiveDateTime};
|
||||
use std::collections::HashSet;
|
||||
|
||||
pub struct JobBuilder {
|
||||
sources: Vec<String>,
|
||||
target: String,
|
||||
source_zfs_command: Vec<String>,
|
||||
target_zfs_command: Vec<String>,
|
||||
dryrun: bool,
|
||||
retain: usize,
|
||||
}
|
||||
|
||||
fn parse_command(commandstr: &str) -> Vec<String> {
|
||||
commandstr
|
||||
.split_whitespace()
|
||||
.map(|s| s.to_string())
|
||||
.collect()
|
||||
}
|
||||
|
||||
impl JobBuilder {
|
||||
pub fn new(sources: Vec<String>, target: String) -> Self {
|
||||
JobBuilder {
|
||||
sources,
|
||||
target,
|
||||
source_zfs_command: vec!["zfs".to_string()],
|
||||
target_zfs_command: vec!["zfs".to_string()],
|
||||
dryrun: false,
|
||||
retain: 2,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn source_zfs_command(mut self, commandstr: &str) -> Self {
|
||||
let command = parse_command(commandstr);
|
||||
self.source_zfs_command = command;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn target_zfs_command(mut self, commandstr: &str) -> Self {
|
||||
let command = parse_command(commandstr);
|
||||
self.target_zfs_command = command;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn zfs_command(mut self, commandstr: &str) -> Self {
|
||||
let command = parse_command(commandstr);
|
||||
self.source_zfs_command = command.clone();
|
||||
self.target_zfs_command = command;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn dryrun(mut self) -> Self {
|
||||
self.dryrun = true;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn retain(mut self, retain: usize) -> Self {
|
||||
self.retain = retain;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn build(self) -> Result<Job, String> {
|
||||
let mut job = Job {
|
||||
datasets: vec![],
|
||||
target: self.target,
|
||||
source_zfs_command: self.source_zfs_command,
|
||||
target_zfs_command: self.target_zfs_command,
|
||||
dryrun: self.dryrun,
|
||||
retain: self.retain,
|
||||
};
|
||||
let mut datasets: Vec<String> = vec![];
|
||||
for source in &self.sources {
|
||||
let recurse = source.ends_with("/...");
|
||||
let source = source.trim_end_matches("/...");
|
||||
|
||||
let mut args = vec!["list", "-H", "-o", "name"];
|
||||
if recurse {
|
||||
args.push("-r");
|
||||
}
|
||||
args.push(source);
|
||||
|
||||
let mut cmd = job.get_side_command(JobSide::Source);
|
||||
cmd.extend(args);
|
||||
let output = command::exec_command(&cmd)?;
|
||||
datasets.extend(output.lines().map(str::to_string));
|
||||
}
|
||||
job.datasets = datasets;
|
||||
Ok(job)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Job {
|
||||
datasets: Vec<String>,
|
||||
target: String,
|
||||
source_zfs_command: Vec<String>,
|
||||
target_zfs_command: Vec<String>,
|
||||
dryrun: bool,
|
||||
retain: usize,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
enum JobSide {
|
||||
Source,
|
||||
Destination,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct Snapshot {
|
||||
snapshot: String,
|
||||
side: JobSide,
|
||||
}
|
||||
|
||||
impl Job {
|
||||
pub fn dump(&self) {
|
||||
println!("Datasets: {:?}", self.datasets);
|
||||
println!("Target: {}", self.target);
|
||||
println!("Source ZFS Command: {:?}", self.source_zfs_command);
|
||||
println!("Target ZFS Command: {:?}", self.target_zfs_command);
|
||||
println!("Dryrun: {}", self.dryrun);
|
||||
}
|
||||
|
||||
fn get_side_command(&self, side: JobSide) -> Vec<&str> {
|
||||
match side {
|
||||
JobSide::Source => self.source_zfs_command.iter().map(|s| s.as_str()).collect(),
|
||||
JobSide::Destination => self.target_zfs_command.iter().map(|s| s.as_str()).collect(),
|
||||
}
|
||||
}
|
||||
|
||||
fn create_snapshot(&self, dataset: &str, side: JobSide) -> Result<String, String> {
|
||||
let snapshot = format!("{}@{}", dataset, Local::now().format("%Y-%m-%dT%H:%M:%S"));
|
||||
let mut cmd = self.get_side_command(side);
|
||||
cmd.extend(["snapshot", &snapshot]);
|
||||
command::exec_command(&cmd)?;
|
||||
Ok(snapshot)
|
||||
}
|
||||
|
||||
fn send_receive(
|
||||
&self,
|
||||
source: &str,
|
||||
dest: &str,
|
||||
inc_snapshot: Option<&str>,
|
||||
) -> Result<(), String> {
|
||||
let mut send_cmd = self.get_side_command(JobSide::Source);
|
||||
send_cmd.push("send");
|
||||
if let Some(inc) = inc_snapshot {
|
||||
send_cmd.extend(["-i", inc]);
|
||||
}
|
||||
send_cmd.push(source);
|
||||
|
||||
let mut receive_cmd = self.get_side_command(JobSide::Destination);
|
||||
receive_cmd.extend(["receive", "-F", dest]);
|
||||
command::exec_piped_commands(&send_cmd, &receive_cmd)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn list_snapshots(&self, source: &str, side: JobSide) -> Result<Vec<String>, String> {
|
||||
let mut cmd = self.get_side_command(side);
|
||||
cmd.extend(["list", "-H", "-o", "name", "-t", "snapshot", source]);
|
||||
let output = command::exec_command(&cmd)?;
|
||||
let snapshots: Vec<&str> = output
|
||||
.split_whitespace()
|
||||
.map(|s| {
|
||||
let parts: Vec<&str> = s.split("@").collect();
|
||||
if parts.len() == 2 {
|
||||
Ok(parts[1])
|
||||
} else {
|
||||
Err(format!("invalid snapshot name: {}", s))
|
||||
}
|
||||
})
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
let result: Vec<String> = snapshots
|
||||
.iter()
|
||||
.filter(|s| NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S").is_ok())
|
||||
.map(|s| s.to_string())
|
||||
.collect();
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
fn find_latest_matching_snapshot(&self, source: &str, dest: &str) -> Result<String, String> {
|
||||
self.find_matching_snapshots(source, dest)?
|
||||
.into_iter()
|
||||
.next()
|
||||
.ok_or(String::from("no matching snapshots"))
|
||||
}
|
||||
|
||||
fn find_matching_snapshots(&self, source: &str, dest: &str) -> Result<Vec<String>, String> {
|
||||
let source_snapshots = self.list_snapshots(source, JobSide::Source)?;
|
||||
let mut dest_snapshots = self.list_snapshots(dest, JobSide::Destination)?;
|
||||
dest_snapshots.sort_by(|a, b| b.cmp(a));
|
||||
|
||||
let source_set: HashSet<String> = source_snapshots.into_iter().collect();
|
||||
|
||||
let matching_snapshots: Vec<String> = dest_snapshots
|
||||
.into_iter()
|
||||
.filter(|s| source_set.contains(s))
|
||||
.collect();
|
||||
|
||||
if matching_snapshots.is_empty() {
|
||||
Err(String::from("no matching snapshots"))
|
||||
} else {
|
||||
Ok(matching_snapshots)
|
||||
}
|
||||
}
|
||||
|
||||
fn find_old_snapshots(&self, source: &str, dest: &str) -> Result<Vec<Snapshot>, String> {
|
||||
let source_snapshots = self.list_snapshots(source, JobSide::Source)?;
|
||||
let dest_snapshots = self.list_snapshots(dest, JobSide::Destination)?;
|
||||
let matching_snapshots = self.find_matching_snapshots(source, dest)?;
|
||||
if matching_snapshots.is_empty() {
|
||||
return Err(String::from("no matching snapshots found"));
|
||||
}
|
||||
let retain: HashSet<String> = matching_snapshots.into_iter().take(self.retain).collect();
|
||||
let result = source_snapshots
|
||||
.into_iter()
|
||||
.filter(|s| !retain.contains(s))
|
||||
.map(|s| Snapshot {
|
||||
snapshot: format!("{}@{}", source, s),
|
||||
side: JobSide::Source,
|
||||
})
|
||||
.chain(
|
||||
dest_snapshots
|
||||
.into_iter()
|
||||
.filter(|s| !retain.contains(s))
|
||||
.map(|s| Snapshot {
|
||||
snapshot: format!("{}@{}", dest, s),
|
||||
side: JobSide::Destination,
|
||||
}),
|
||||
)
|
||||
.collect();
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
fn delete_snapshot(&self, snapshot: Snapshot) -> Result<String, String> {
|
||||
let mut cmd = self.get_side_command(snapshot.side);
|
||||
cmd.extend(["destroy", &snapshot.snapshot]);
|
||||
command::exec_command(&cmd)
|
||||
}
|
||||
|
||||
pub fn run(&self) -> Result<(), String> {
|
||||
for source in &self.datasets {
|
||||
// Check the source exists
|
||||
let mut cmd = self.get_side_command(JobSide::Source);
|
||||
cmd.extend(["list", "-H", "-o", "name", source]);
|
||||
let _ = command::exec_command(&cmd)?;
|
||||
|
||||
// Check whether the destination exists
|
||||
// TODO: This will assume the destination doesn't exist if the
|
||||
// command fails for any reason.
|
||||
let dest = format!("{}/{}", self.target, source);
|
||||
cmd = self.get_side_command(JobSide::Destination);
|
||||
cmd.extend(["list", "-H", "-o", "name", &dest]);
|
||||
let dest_exists = command::exec_command(&cmd).is_ok();
|
||||
|
||||
// Run backup
|
||||
if dest_exists {
|
||||
println!("Incremental backup {} -> {}", source, dest);
|
||||
let inc_snapshot = format!(
|
||||
"{}@{}",
|
||||
source,
|
||||
self.find_latest_matching_snapshot(source, &dest)?
|
||||
);
|
||||
println!("Found matching snapshot: {}", inc_snapshot);
|
||||
|
||||
let snapshot = self.create_snapshot(source, JobSide::Source)?;
|
||||
self.send_receive(&snapshot, &dest, Some(&inc_snapshot))?;
|
||||
} else {
|
||||
println!("Full backup {} -> {}", source, dest);
|
||||
let snapshot = self.create_snapshot(source, JobSide::Source)?;
|
||||
self.send_receive(&snapshot, &dest, None)?;
|
||||
}
|
||||
|
||||
// Clean up snapshots
|
||||
let old_snapshots = self.find_old_snapshots(source, &dest)?;
|
||||
println!("Old snapshots: {:?}", old_snapshots);
|
||||
old_snapshots
|
||||
.into_iter()
|
||||
.map(|s| self.delete_snapshot(s))
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
pub mod job;
|
||||
pub mod progress;
|
||||
|
||||
21
src/main.rs
21
src/main.rs
@@ -1,7 +1,10 @@
|
||||
use clap::Parser;
|
||||
use std::env::args;
|
||||
|
||||
use zfsbackup::JobBuilder;
|
||||
use std::error::Error;
|
||||
use std::sync::mpsc::channel;
|
||||
use std::thread;
|
||||
use zfsbackup::job::JobBuilder;
|
||||
use zfsbackup::progress::ProgressReporter;
|
||||
|
||||
#[derive(Parser)]
|
||||
#[command(version, about, long_about = None)]
|
||||
@@ -24,7 +27,7 @@ struct Args {
|
||||
datasets: Vec<String>,
|
||||
}
|
||||
|
||||
fn main() {
|
||||
fn main() -> Result<(), Box<dyn Error>> {
|
||||
let args = Args::parse();
|
||||
let mut builder = JobBuilder::new(args.datasets, args.target);
|
||||
if args.dry_run {
|
||||
@@ -39,6 +42,14 @@ fn main() {
|
||||
if let Some(cmd) = args.target_zfs_command {
|
||||
builder = builder.target_zfs_command(&cmd);
|
||||
}
|
||||
let job = builder.build().expect("asplode");
|
||||
job.run().expect("boom");
|
||||
|
||||
let (tx, rx) = channel();
|
||||
let mut pr = ProgressReporter::new(rx);
|
||||
thread::spawn(move || pr.run());
|
||||
|
||||
builder = builder.sender(tx);
|
||||
|
||||
let job = builder.build()?;
|
||||
job.run()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
95
src/progress.rs
Normal file
95
src/progress.rs
Normal file
@@ -0,0 +1,95 @@
|
||||
use std::{io::Write, sync::mpsc::Receiver};
|
||||
|
||||
pub enum BackupEvent {
|
||||
Estimate(u64),
|
||||
StartingFullBackup {
|
||||
source: String,
|
||||
dest: String,
|
||||
index: usize,
|
||||
total: usize,
|
||||
},
|
||||
StartingIncrementalBackup {
|
||||
source: String,
|
||||
dest: String,
|
||||
index: usize,
|
||||
total: usize,
|
||||
},
|
||||
SnapshotCreated(String),
|
||||
SnapshotDeleted(String),
|
||||
BytesTransferred {
|
||||
dataset: String,
|
||||
bytes: u64,
|
||||
},
|
||||
DatasetCompleted(String),
|
||||
}
|
||||
|
||||
pub struct ProgressReporter {
|
||||
receiver: Receiver<BackupEvent>,
|
||||
estimated_size: u64,
|
||||
}
|
||||
|
||||
impl ProgressReporter {
|
||||
pub fn new(receiver: Receiver<BackupEvent>) -> Self {
|
||||
Self {
|
||||
receiver,
|
||||
estimated_size: 0,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn run(&mut self) {
|
||||
while let Ok(event) = self.receiver.recv() {
|
||||
match event {
|
||||
BackupEvent::Estimate(size) => {
|
||||
println!("Estimated total backup size: {} bytes", size);
|
||||
self.estimated_size = size;
|
||||
}
|
||||
BackupEvent::StartingFullBackup {
|
||||
source,
|
||||
dest,
|
||||
index,
|
||||
total,
|
||||
} => {
|
||||
println!(
|
||||
"Starting full backup of {} to {} ({} of {})",
|
||||
source, dest, index, total
|
||||
);
|
||||
}
|
||||
BackupEvent::StartingIncrementalBackup {
|
||||
source,
|
||||
dest,
|
||||
index,
|
||||
total,
|
||||
} => {
|
||||
println!(
|
||||
"Starting incremental backup of {} to {} ({} of {})",
|
||||
source, dest, index, total
|
||||
);
|
||||
}
|
||||
BackupEvent::SnapshotCreated(name) => {
|
||||
println!("Created snapshot: {}", name);
|
||||
}
|
||||
BackupEvent::SnapshotDeleted(name) => {
|
||||
println!("Deleted snapshot: {}", name);
|
||||
}
|
||||
BackupEvent::BytesTransferred { dataset, bytes } => {
|
||||
let percent: f64 = if self.estimated_size > 0 {
|
||||
bytes as f64 / self.estimated_size as f64
|
||||
} else {
|
||||
0.0
|
||||
};
|
||||
print!(
|
||||
"{:>3.0}% {}/{} bytes transferred\r",
|
||||
percent * 100.0,
|
||||
bytes,
|
||||
self.estimated_size
|
||||
);
|
||||
std::io::stdout().flush().ok();
|
||||
}
|
||||
BackupEvent::DatasetCompleted(name) => {
|
||||
println!("Completed backup of dataset: {}", name);
|
||||
self.estimated_size = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user