Compare commits
9 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 05abd729a8 | |||
| 3251a97e1a | |||
| 0e52fd8fde | |||
| 201a46c322 | |||
| d18a828195 | |||
| a0585764cf | |||
| e8e4e3636c | |||
| 2660d7d2f5 | |||
| 9ef27716c6 |
Generated
+1
-1
@@ -435,7 +435,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "zfsbackup"
|
name = "zfsbackup"
|
||||||
version = "0.3.0"
|
version = "0.5.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"chrono",
|
"chrono",
|
||||||
"clap",
|
"clap",
|
||||||
|
|||||||
+1
-1
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "zfsbackup"
|
name = "zfsbackup"
|
||||||
version = "0.3.0"
|
version = "0.5.0"
|
||||||
edition = "2024"
|
edition = "2024"
|
||||||
publish = false
|
publish = false
|
||||||
|
|
||||||
|
|||||||
+32
-17
@@ -6,7 +6,12 @@ pub trait Filter {
|
|||||||
fn filter(&self, reader: Box<dyn Read>) -> Box<dyn Read>;
|
fn filter(&self, reader: Box<dyn Read>) -> Box<dyn Read>;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn exec_command(command: &Vec<&str>) -> Result<String, String> {
|
// TODO: Make this support quoting for args with spaces (and escape nested quotes)
|
||||||
|
fn display_command(command: &Vec<&str>) -> String {
|
||||||
|
command.join(" ")
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn exec(command: &Vec<&str>) -> Result<String, String> {
|
||||||
if command.is_empty() {
|
if command.is_empty() {
|
||||||
return Err("Command is empty".to_string());
|
return Err("Command is empty".to_string());
|
||||||
}
|
}
|
||||||
@@ -14,11 +19,13 @@ pub fn exec_command(command: &Vec<&str>) -> Result<String, String> {
|
|||||||
if command.len() > 1 {
|
if command.len() > 1 {
|
||||||
cmd.args(&command[1..]);
|
cmd.args(&command[1..]);
|
||||||
}
|
}
|
||||||
let output = cmd.output().map_err(|e| e.to_string())?;
|
let output = cmd
|
||||||
|
.output()
|
||||||
|
.map_err(|e| format!("Failed to run command {}: {}", command[0], e))?;
|
||||||
if !output.status.success() {
|
if !output.status.success() {
|
||||||
return Err(format!(
|
return Err(format!(
|
||||||
"Command {:?} failed with status {}: {}",
|
"Command '{}' failed with status {}: {}",
|
||||||
command,
|
display_command(command),
|
||||||
output.status,
|
output.status,
|
||||||
String::from_utf8_lossy(&output.stderr)
|
String::from_utf8_lossy(&output.stderr)
|
||||||
));
|
));
|
||||||
@@ -27,7 +34,10 @@ pub fn exec_command(command: &Vec<&str>) -> Result<String, String> {
|
|||||||
Ok(output_str)
|
Ok(output_str)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn exec_piped_commands(
|
/// Executes a pipeline of commands, where the output of the source command is passed through a
|
||||||
|
/// series of filters before being piped into the destination command. The filters are applied in
|
||||||
|
/// the order they are provided.
|
||||||
|
pub fn exec_pipe(
|
||||||
source: &Vec<&str>,
|
source: &Vec<&str>,
|
||||||
dest: &Vec<&str>,
|
dest: &Vec<&str>,
|
||||||
filters: Vec<Box<dyn Filter>>,
|
filters: Vec<Box<dyn Filter>>,
|
||||||
@@ -57,15 +67,26 @@ pub fn exec_piped_commands(
|
|||||||
let send_stdout = send_process.stdout.take().unwrap();
|
let send_stdout = send_process.stdout.take().unwrap();
|
||||||
let mut receive_stdin = receive_process.stdin.take().unwrap();
|
let mut receive_stdin = receive_process.stdin.take().unwrap();
|
||||||
|
|
||||||
let mut reader: Box<dyn Read> = Box::new(send_stdout);
|
{
|
||||||
for filter in filters {
|
let mut reader: Box<dyn Read> = Box::new(send_stdout);
|
||||||
reader = filter.filter(reader);
|
for filter in filters {
|
||||||
|
reader = filter.filter(reader);
|
||||||
|
}
|
||||||
|
|
||||||
|
std::io::copy(&mut reader, &mut receive_stdin).map_err(|e| e.to_string())?;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::io::copy(&mut reader, &mut receive_stdin).map_err(|e| e.to_string())?;
|
// The send process will typically finish first. If it failed, terminate the
|
||||||
|
// receiver.
|
||||||
let receive_status = receive_process.wait().map_err(|e| e.to_string())?;
|
|
||||||
let send_status = send_process.wait().map_err(|e| e.to_string())?;
|
let send_status = send_process.wait().map_err(|e| e.to_string())?;
|
||||||
|
if !send_status.success() {
|
||||||
|
receive_process.kill().ok();
|
||||||
|
return Err(format!(
|
||||||
|
"Send command {:?} failed with status {}",
|
||||||
|
source, send_status
|
||||||
|
));
|
||||||
|
}
|
||||||
|
let receive_status = receive_process.wait().map_err(|e| e.to_string())?;
|
||||||
if !receive_status.success() {
|
if !receive_status.success() {
|
||||||
send_process.kill().ok();
|
send_process.kill().ok();
|
||||||
return Err(format!(
|
return Err(format!(
|
||||||
@@ -73,11 +94,5 @@ pub fn exec_piped_commands(
|
|||||||
dest, receive_status
|
dest, receive_status
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
if !send_status.success() {
|
|
||||||
return Err(format!(
|
|
||||||
"Send command {:?} failed with status {}",
|
|
||||||
source, send_status
|
|
||||||
));
|
|
||||||
}
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
+15
-15
@@ -57,8 +57,8 @@ impl Job {
|
|||||||
let name = snapshot.to_string();
|
let name = snapshot.to_string();
|
||||||
let mut cmd = self.get_side_command(side);
|
let mut cmd = self.get_side_command(side);
|
||||||
cmd.extend(["snapshot", &name]);
|
cmd.extend(["snapshot", &name]);
|
||||||
command::exec_command(&cmd)?;
|
command::exec(&cmd)?;
|
||||||
self.send_event(BackupEvent::SnapshotCreated(name.clone()));
|
self.send_event(BackupEvent::SnapshotCreate(name.clone()));
|
||||||
Ok(snapshot)
|
Ok(snapshot)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -70,7 +70,7 @@ impl Job {
|
|||||||
}
|
}
|
||||||
cmd.extend(["-n", "-P"]);
|
cmd.extend(["-n", "-P"]);
|
||||||
cmd.push(source);
|
cmd.push(source);
|
||||||
let output = command::exec_command(&cmd)?;
|
let output = command::exec(&cmd)?;
|
||||||
let size = output
|
let size = output
|
||||||
.lines()
|
.lines()
|
||||||
.last()
|
.last()
|
||||||
@@ -107,15 +107,15 @@ impl Job {
|
|||||||
))],
|
))],
|
||||||
None => vec![],
|
None => vec![],
|
||||||
};
|
};
|
||||||
command::exec_piped_commands(&send_cmd, &receive_cmd, filters)?;
|
command::exec_pipe(&send_cmd, &receive_cmd, filters)?;
|
||||||
self.send_event(BackupEvent::DatasetCompleted(source.to_string()));
|
self.send_event(BackupEvent::DatasetComplete(source.to_string()));
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn list_snapshot_ids(&self, source: &str, side: JobSide) -> Result<Vec<String>, String> {
|
fn list_snapshot_ids(&self, source: &str, side: JobSide) -> Result<Vec<String>, String> {
|
||||||
let mut cmd = self.get_side_command(side);
|
let mut cmd = self.get_side_command(side);
|
||||||
cmd.extend(["list", "-H", "-o", "name", "-t", "snapshot", source]);
|
cmd.extend(["list", "-H", "-o", "name", "-t", "snapshot", source]);
|
||||||
let output = command::exec_command(&cmd)?;
|
let output = command::exec(&cmd)?;
|
||||||
let snapshots: Vec<&str> = output
|
let snapshots: Vec<&str> = output
|
||||||
.split_whitespace()
|
.split_whitespace()
|
||||||
.map(|s| {
|
.map(|s| {
|
||||||
@@ -195,8 +195,8 @@ impl Job {
|
|||||||
let mut cmd = self.get_side_command(snapshot.side);
|
let mut cmd = self.get_side_command(snapshot.side);
|
||||||
let name = snapshot.to_string();
|
let name = snapshot.to_string();
|
||||||
cmd.extend(["destroy", &name]);
|
cmd.extend(["destroy", &name]);
|
||||||
command::exec_command(&cmd)?;
|
command::exec(&cmd)?;
|
||||||
self.send_event(BackupEvent::SnapshotDeleted(name.clone()));
|
self.send_event(BackupEvent::SnapshotDelete(name.clone()));
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -205,7 +205,7 @@ impl Job {
|
|||||||
// Check the source exists
|
// Check the source exists
|
||||||
let mut cmd = self.get_side_command(JobSide::Source);
|
let mut cmd = self.get_side_command(JobSide::Source);
|
||||||
cmd.extend(["list", "-H", "-o", "name", source]);
|
cmd.extend(["list", "-H", "-o", "name", source]);
|
||||||
let _ = command::exec_command(&cmd)?;
|
let _ = command::exec(&cmd)?;
|
||||||
|
|
||||||
// Check whether the destination exists
|
// Check whether the destination exists
|
||||||
// TODO: This will assume the destination doesn't exist if the
|
// TODO: This will assume the destination doesn't exist if the
|
||||||
@@ -213,11 +213,11 @@ impl Job {
|
|||||||
let dest = format!("{}/{}", self.target, source);
|
let dest = format!("{}/{}", self.target, source);
|
||||||
cmd = self.get_side_command(JobSide::Target);
|
cmd = self.get_side_command(JobSide::Target);
|
||||||
cmd.extend(["list", "-H", "-o", "name", &dest]);
|
cmd.extend(["list", "-H", "-o", "name", &dest]);
|
||||||
let dest_exists = command::exec_command(&cmd).is_ok();
|
let dest_exists = command::exec(&cmd).is_ok();
|
||||||
|
|
||||||
// Run backup
|
// Run backup
|
||||||
if dest_exists {
|
if dest_exists {
|
||||||
self.send_event(BackupEvent::StartingIncrementalBackup {
|
self.send_event(BackupEvent::IncrementalBackupStart {
|
||||||
source: source.clone(),
|
source: source.clone(),
|
||||||
dest: dest.clone(),
|
dest: dest.clone(),
|
||||||
index: index + 1,
|
index: index + 1,
|
||||||
@@ -234,13 +234,13 @@ impl Job {
|
|||||||
.estimate(&snapshot.to_string(), Some(&inc_snapshot))
|
.estimate(&snapshot.to_string(), Some(&inc_snapshot))
|
||||||
.ok();
|
.ok();
|
||||||
if !execute {
|
if !execute {
|
||||||
self.send_event(BackupEvent::DryrunCompleted(source.clone()));
|
self.send_event(BackupEvent::DryrunComplete(source.clone()));
|
||||||
self.delete_snapshot(snapshot)?;
|
self.delete_snapshot(snapshot)?;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
self.send_receive(&snapshot.to_string(), &dest, Some(&inc_snapshot), total)?;
|
self.send_receive(&snapshot.to_string(), &dest, Some(&inc_snapshot), total)?;
|
||||||
} else {
|
} else {
|
||||||
self.send_event(BackupEvent::StartingFullBackup {
|
self.send_event(BackupEvent::FullBackupStart {
|
||||||
source: source.clone(),
|
source: source.clone(),
|
||||||
dest: dest.clone(),
|
dest: dest.clone(),
|
||||||
index: index + 1,
|
index: index + 1,
|
||||||
@@ -249,7 +249,7 @@ impl Job {
|
|||||||
let snapshot = self.create_snapshot(source, JobSide::Source)?;
|
let snapshot = self.create_snapshot(source, JobSide::Source)?;
|
||||||
let total = self.estimate(&snapshot.to_string(), None).ok();
|
let total = self.estimate(&snapshot.to_string(), None).ok();
|
||||||
if !execute {
|
if !execute {
|
||||||
self.send_event(BackupEvent::DryrunCompleted(source.clone()));
|
self.send_event(BackupEvent::DryrunComplete(source.clone()));
|
||||||
self.delete_snapshot(snapshot)?;
|
self.delete_snapshot(snapshot)?;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
@@ -375,7 +375,7 @@ impl JobBuilder {
|
|||||||
|
|
||||||
let mut cmd: Vec<&str> = self.source_zfs_command.iter().map(|s| s.as_str()).collect();
|
let mut cmd: Vec<&str> = self.source_zfs_command.iter().map(|s| s.as_str()).collect();
|
||||||
cmd.extend(args);
|
cmd.extend(args);
|
||||||
let output = command::exec_command(&cmd)?;
|
let output = command::exec(&cmd)?;
|
||||||
datasets.extend(output.lines().map(str::to_string));
|
datasets.extend(output.lines().map(str::to_string));
|
||||||
}
|
}
|
||||||
if datasets.is_empty() {
|
if datasets.is_empty() {
|
||||||
|
|||||||
+21
-5
@@ -5,7 +5,7 @@ use std::io::IsTerminal;
|
|||||||
use std::sync::mpsc::channel;
|
use std::sync::mpsc::channel;
|
||||||
use std::thread;
|
use std::thread;
|
||||||
use zfsbackup::job::JobBuilder;
|
use zfsbackup::job::JobBuilder;
|
||||||
use zfsbackup::progress::{Progressor, log, terminal};
|
use zfsbackup::progress::{Progressor, curt, log, terminal};
|
||||||
|
|
||||||
#[derive(Parser)]
|
#[derive(Parser)]
|
||||||
#[command(version, about, long_about = None)]
|
#[command(version, about, long_about = None)]
|
||||||
@@ -29,6 +29,9 @@ struct Args {
|
|||||||
#[arg(short, long)]
|
#[arg(short, long)]
|
||||||
retain: Option<usize>,
|
retain: Option<usize>,
|
||||||
|
|
||||||
|
#[arg(short, long)]
|
||||||
|
progressor: Option<String>,
|
||||||
|
|
||||||
datasets: Vec<String>,
|
datasets: Vec<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -49,10 +52,23 @@ fn main() -> Result<(), Box<dyn Error>> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
let (tx, rx) = channel();
|
let (tx, rx) = channel();
|
||||||
let mut pr: Box<dyn Progressor> = if std::io::stdout().is_terminal() {
|
let mut pr: Box<dyn Progressor> = match args.progressor {
|
||||||
Box::new(terminal::Progressor::new(rx))
|
Some(name) => match name.as_str() {
|
||||||
} else {
|
"curt" => Box::new(curt::Progressor::new(rx)),
|
||||||
Box::new(log::Progressor::new(rx))
|
"terminal" => Box::new(terminal::Progressor::new(rx)),
|
||||||
|
"log" => Box::new(log::Progressor::new(rx)),
|
||||||
|
_ => {
|
||||||
|
eprintln!("progressor {} not found, defaulting to 'log'", name);
|
||||||
|
Box::new(log::Progressor::new(rx))
|
||||||
|
}
|
||||||
|
},
|
||||||
|
None => {
|
||||||
|
if std::io::stdout().is_terminal() {
|
||||||
|
Box::new(terminal::Progressor::new(rx))
|
||||||
|
} else {
|
||||||
|
Box::new(log::Progressor::new(rx))
|
||||||
|
}
|
||||||
|
}
|
||||||
};
|
};
|
||||||
let handle = thread::spawn(move || pr.run());
|
let handle = thread::spawn(move || pr.run());
|
||||||
builder = builder.sender(tx);
|
builder = builder.sender(tx);
|
||||||
|
|||||||
@@ -0,0 +1,84 @@
|
|||||||
|
use super::{BackupEvent, human_bytes};
|
||||||
|
use std::io::Write;
|
||||||
|
use std::sync::mpsc::Receiver;
|
||||||
|
|
||||||
|
pub struct Progressor {
|
||||||
|
receiver: Receiver<BackupEvent>,
|
||||||
|
source: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl super::Progressor for Progressor {
|
||||||
|
fn run(&mut self) {
|
||||||
|
while let Ok(event) = self.receiver.recv() {
|
||||||
|
match event {
|
||||||
|
BackupEvent::Estimate(_) => {}
|
||||||
|
BackupEvent::FullBackupStart {
|
||||||
|
source,
|
||||||
|
dest: _,
|
||||||
|
index: _,
|
||||||
|
total: _,
|
||||||
|
} => {
|
||||||
|
self.source = source;
|
||||||
|
print!("{}\r", self.source);
|
||||||
|
}
|
||||||
|
BackupEvent::IncrementalBackupStart {
|
||||||
|
source,
|
||||||
|
dest: _,
|
||||||
|
index: _,
|
||||||
|
total: _,
|
||||||
|
} => {
|
||||||
|
self.source = source;
|
||||||
|
print!("{}\r", self.source);
|
||||||
|
}
|
||||||
|
BackupEvent::SnapshotCreate(_) => {}
|
||||||
|
BackupEvent::SnapshotDelete(_) => {}
|
||||||
|
BackupEvent::BytesTransferred {
|
||||||
|
bytes,
|
||||||
|
estimated_total,
|
||||||
|
} => {
|
||||||
|
match estimated_total {
|
||||||
|
Some(total) => {
|
||||||
|
let percent: f64 = if total > 0 {
|
||||||
|
bytes as f64 / total as f64
|
||||||
|
} else {
|
||||||
|
0.0
|
||||||
|
};
|
||||||
|
|
||||||
|
print!("\x1b[2K{}: {:>3.0}%\r", self.source, percent * 100.0,);
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
print!(
|
||||||
|
"\x1b[2K{}: {} transferred\r",
|
||||||
|
self.source,
|
||||||
|
human_bytes(bytes as f64)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
std::io::stdout().flush().ok();
|
||||||
|
}
|
||||||
|
BackupEvent::SendComplete(bytes) => {
|
||||||
|
print!(
|
||||||
|
"\x1b[2K{}: send complete ({})\r",
|
||||||
|
self.source,
|
||||||
|
human_bytes(bytes as f64)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
BackupEvent::DatasetComplete(_) => {
|
||||||
|
println!("\x1b[2K{}: complete", self.source);
|
||||||
|
}
|
||||||
|
BackupEvent::DryrunComplete(_) => {
|
||||||
|
println!("\x1b[2K{}: complete", self.source);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Progressor {
|
||||||
|
pub fn new(receiver: Receiver<BackupEvent>) -> Self {
|
||||||
|
Self {
|
||||||
|
receiver,
|
||||||
|
source: String::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -29,6 +29,12 @@ impl<R: Read> Read for ByteCountReader<R> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<R: Read> Drop for ByteCountReader<R> {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
self.sender.send(BackupEvent::SendComplete(self.bytes)).ok();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl<R: Read> ByteCountReader<R> {
|
impl<R: Read> ByteCountReader<R> {
|
||||||
fn new(inner: R, sender: Sender<BackupEvent>, total: Option<u64>) -> Self {
|
fn new(inner: R, sender: Sender<BackupEvent>, total: Option<u64>) -> Self {
|
||||||
Self {
|
Self {
|
||||||
|
|||||||
+9
-6
@@ -14,7 +14,7 @@ impl super::Progressor for Progressor {
|
|||||||
println!("Estimated total backup size: {} bytes", size);
|
println!("Estimated total backup size: {} bytes", size);
|
||||||
self.estimated_size = size;
|
self.estimated_size = size;
|
||||||
}
|
}
|
||||||
BackupEvent::StartingFullBackup {
|
BackupEvent::FullBackupStart {
|
||||||
source,
|
source,
|
||||||
dest,
|
dest,
|
||||||
index,
|
index,
|
||||||
@@ -25,7 +25,7 @@ impl super::Progressor for Progressor {
|
|||||||
source, dest, index, total
|
source, dest, index, total
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
BackupEvent::StartingIncrementalBackup {
|
BackupEvent::IncrementalBackupStart {
|
||||||
source,
|
source,
|
||||||
dest,
|
dest,
|
||||||
index,
|
index,
|
||||||
@@ -36,18 +36,21 @@ impl super::Progressor for Progressor {
|
|||||||
source, dest, index, total
|
source, dest, index, total
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
BackupEvent::SnapshotCreated(name) => {
|
BackupEvent::SnapshotCreate(name) => {
|
||||||
println!("Created snapshot: {}", name);
|
println!("Created snapshot: {}", name);
|
||||||
}
|
}
|
||||||
BackupEvent::SnapshotDeleted(name) => {
|
BackupEvent::SnapshotDelete(name) => {
|
||||||
println!("Deleted snapshot: {}", name);
|
println!("Deleted snapshot: {}", name);
|
||||||
}
|
}
|
||||||
BackupEvent::BytesTransferred { .. } => {}
|
BackupEvent::BytesTransferred { .. } => {}
|
||||||
BackupEvent::DatasetCompleted(name) => {
|
BackupEvent::SendComplete(bytes) => {
|
||||||
|
println!("Send completed: {} bytes", bytes);
|
||||||
|
}
|
||||||
|
BackupEvent::DatasetComplete(name) => {
|
||||||
println!("Completed backup of dataset: {}", name);
|
println!("Completed backup of dataset: {}", name);
|
||||||
self.estimated_size = 0;
|
self.estimated_size = 0;
|
||||||
}
|
}
|
||||||
BackupEvent::DryrunCompleted(name) => {
|
BackupEvent::DryrunComplete(name) => {
|
||||||
println!("Completed dry run backup of dataset: {}", name);
|
println!("Completed dry run backup of dataset: {}", name);
|
||||||
self.estimated_size = 0;
|
self.estimated_size = 0;
|
||||||
}
|
}
|
||||||
|
|||||||
+24
-13
@@ -1,43 +1,54 @@
|
|||||||
|
pub mod curt;
|
||||||
pub mod filter;
|
pub mod filter;
|
||||||
pub mod log;
|
pub mod log;
|
||||||
|
mod rate;
|
||||||
pub mod terminal;
|
pub mod terminal;
|
||||||
|
|
||||||
pub trait Progressor: Send {
|
pub trait Progressor: Send {
|
||||||
fn run(&mut self);
|
fn run(&mut self);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn human_bytes(bytes: u64) -> String {
|
fn human_bytes(bytes: f64) -> String {
|
||||||
let units = ["B", "kB", "MB", "GB", "TB"];
|
let units = ["B", "kB", "MB", "GB", "TB"];
|
||||||
|
|
||||||
let mut value = bytes as f64;
|
let mut a = bytes;
|
||||||
let mut index = 0;
|
let mut i = 0;
|
||||||
while value >= 1024.0 && index < units.len() - 1 {
|
while a >= 1024.0 && i < units.len() - 1 {
|
||||||
value /= 1024.0;
|
a /= 1024.0;
|
||||||
index += 1;
|
i += 1;
|
||||||
}
|
}
|
||||||
format!("{:.2} {:2}", value, units[index])
|
format!("{:.2} {}", a, units[i])
|
||||||
|
}
|
||||||
|
|
||||||
|
fn format_eta(seconds: f64) -> String {
|
||||||
|
let secs = seconds as u64;
|
||||||
|
let h = secs / 3600;
|
||||||
|
let m = (secs % 3600) / 60;
|
||||||
|
let s = secs % 60;
|
||||||
|
format!("{:02}:{:02}:{:02}", h, m, s)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub enum BackupEvent {
|
pub enum BackupEvent {
|
||||||
Estimate(u64),
|
Estimate(u64),
|
||||||
StartingFullBackup {
|
FullBackupStart {
|
||||||
source: String,
|
source: String,
|
||||||
dest: String,
|
dest: String,
|
||||||
index: usize,
|
index: usize,
|
||||||
total: usize,
|
total: usize,
|
||||||
},
|
},
|
||||||
StartingIncrementalBackup {
|
IncrementalBackupStart {
|
||||||
source: String,
|
source: String,
|
||||||
dest: String,
|
dest: String,
|
||||||
index: usize,
|
index: usize,
|
||||||
total: usize,
|
total: usize,
|
||||||
},
|
},
|
||||||
SnapshotCreated(String),
|
SnapshotCreate(String),
|
||||||
SnapshotDeleted(String),
|
SnapshotDelete(String),
|
||||||
BytesTransferred {
|
BytesTransferred {
|
||||||
bytes: u64,
|
bytes: u64,
|
||||||
estimated_total: Option<u64>,
|
estimated_total: Option<u64>,
|
||||||
},
|
},
|
||||||
DatasetCompleted(String),
|
SendComplete(u64),
|
||||||
DryrunCompleted(String),
|
DatasetComplete(String),
|
||||||
|
DryrunComplete(String),
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,74 @@
|
|||||||
|
use std::time::Instant;
|
||||||
|
|
||||||
|
#[derive(Clone, Copy)]
|
||||||
|
struct Sample {
|
||||||
|
at: Instant,
|
||||||
|
bytes: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) struct RateTracker {
|
||||||
|
samples: Vec<Sample>,
|
||||||
|
head: usize,
|
||||||
|
count: usize,
|
||||||
|
smoothed: f64,
|
||||||
|
alpha: f64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl RateTracker {
|
||||||
|
pub(crate) fn new(capacity: usize) -> Self {
|
||||||
|
RateTracker {
|
||||||
|
samples: vec![
|
||||||
|
Sample {
|
||||||
|
at: Instant::now(),
|
||||||
|
bytes: 0,
|
||||||
|
};
|
||||||
|
capacity
|
||||||
|
],
|
||||||
|
head: 0,
|
||||||
|
count: 0,
|
||||||
|
smoothed: 0.0,
|
||||||
|
alpha: 0.05,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn push(&mut self, bytes: u64) {
|
||||||
|
let sample = Sample {
|
||||||
|
at: Instant::now(),
|
||||||
|
bytes,
|
||||||
|
};
|
||||||
|
self.samples[self.head] = sample;
|
||||||
|
self.head = (self.head + 1) % self.samples.capacity();
|
||||||
|
self.count = (self.count + 1).min(self.samples.capacity())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn rate(&mut self) -> Option<f64> {
|
||||||
|
if self.count < 2 {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
|
let cap = self.samples.capacity();
|
||||||
|
|
||||||
|
let oldest_idx = (cap + self.head - self.count) % cap;
|
||||||
|
let newest_idx = (cap + self.head - 1) % cap;
|
||||||
|
|
||||||
|
let oldest = self.samples[oldest_idx];
|
||||||
|
let newest = self.samples[newest_idx];
|
||||||
|
|
||||||
|
let elapsed = newest.at.duration_since(oldest.at).as_secs_f64();
|
||||||
|
if elapsed == 0.0 {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
|
// If for any reason oldest < newest, the subtraction will overflow
|
||||||
|
if newest.bytes < oldest.bytes {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
|
let rate = (newest.bytes - oldest.bytes) as f64 / elapsed;
|
||||||
|
|
||||||
|
// exponentially weighted moving average to smooth those spikes
|
||||||
|
self.smoothed = self.alpha * rate + (1.0 - self.alpha) * self.smoothed;
|
||||||
|
|
||||||
|
Some(self.smoothed)
|
||||||
|
}
|
||||||
|
}
|
||||||
+43
-13
@@ -1,9 +1,13 @@
|
|||||||
|
use crate::progress::format_eta;
|
||||||
|
|
||||||
|
use super::rate::RateTracker;
|
||||||
use super::{BackupEvent, human_bytes};
|
use super::{BackupEvent, human_bytes};
|
||||||
use std::{io::Write, sync::mpsc::Receiver};
|
use std::{io::Write, sync::mpsc::Receiver};
|
||||||
|
|
||||||
pub struct Progressor {
|
pub struct Progressor {
|
||||||
receiver: Receiver<BackupEvent>,
|
receiver: Receiver<BackupEvent>,
|
||||||
estimated_size: u64,
|
estimated_size: u64,
|
||||||
|
rate_tracker: RateTracker,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl super::Progressor for Progressor {
|
impl super::Progressor for Progressor {
|
||||||
@@ -11,10 +15,10 @@ impl super::Progressor for Progressor {
|
|||||||
while let Ok(event) = self.receiver.recv() {
|
while let Ok(event) = self.receiver.recv() {
|
||||||
match event {
|
match event {
|
||||||
BackupEvent::Estimate(size) => {
|
BackupEvent::Estimate(size) => {
|
||||||
println!("Estimated total backup size: {}", human_bytes(size));
|
println!("Estimated total backup size: {}", human_bytes(size as f64));
|
||||||
self.estimated_size = size;
|
self.estimated_size = size;
|
||||||
}
|
}
|
||||||
BackupEvent::StartingFullBackup {
|
BackupEvent::FullBackupStart {
|
||||||
source,
|
source,
|
||||||
dest,
|
dest,
|
||||||
index,
|
index,
|
||||||
@@ -25,7 +29,7 @@ impl super::Progressor for Progressor {
|
|||||||
source, dest, index, total
|
source, dest, index, total
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
BackupEvent::StartingIncrementalBackup {
|
BackupEvent::IncrementalBackupStart {
|
||||||
source,
|
source,
|
||||||
dest,
|
dest,
|
||||||
index,
|
index,
|
||||||
@@ -36,10 +40,10 @@ impl super::Progressor for Progressor {
|
|||||||
source, dest, index, total
|
source, dest, index, total
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
BackupEvent::SnapshotCreated(name) => {
|
BackupEvent::SnapshotCreate(name) => {
|
||||||
println!("Created snapshot: {}", name);
|
println!("Created snapshot: {}", name);
|
||||||
}
|
}
|
||||||
BackupEvent::SnapshotDeleted(name) => {
|
BackupEvent::SnapshotDelete(name) => {
|
||||||
println!("Deleted snapshot: {}", name);
|
println!("Deleted snapshot: {}", name);
|
||||||
}
|
}
|
||||||
BackupEvent::BytesTransferred {
|
BackupEvent::BytesTransferred {
|
||||||
@@ -53,24 +57,49 @@ impl super::Progressor for Progressor {
|
|||||||
} else {
|
} else {
|
||||||
0.0
|
0.0
|
||||||
};
|
};
|
||||||
|
self.rate_tracker.push(bytes);
|
||||||
|
let rate = self.rate_tracker.rate();
|
||||||
|
let rate_display = match rate {
|
||||||
|
None => String::from("[unknown]"),
|
||||||
|
Some(0.0) => {
|
||||||
|
String::from("[stalled] ETA heat death of the universe")
|
||||||
|
}
|
||||||
|
Some(rate) => {
|
||||||
|
let eta = if bytes > total {
|
||||||
|
String::from("any moment now")
|
||||||
|
} else {
|
||||||
|
let eta = (total - bytes) as f64 / rate;
|
||||||
|
format_eta(eta)
|
||||||
|
};
|
||||||
|
format!("{}/s ETA {}", human_bytes(rate), eta)
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
print!(
|
print!(
|
||||||
"{:>3.0}% {}/{} transferred\r",
|
"\x1b[2K{:>3.0}% {}/{} @ {}\r",
|
||||||
percent * 100.0,
|
percent * 100.0,
|
||||||
human_bytes(bytes),
|
human_bytes(bytes as f64),
|
||||||
human_bytes(total)
|
human_bytes(total as f64),
|
||||||
|
rate_display,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
None => {
|
None => {
|
||||||
print!("{} transferred\r", human_bytes(bytes));
|
print!("\x1b[2K{} transferred\r", human_bytes(bytes as f64));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
std::io::stdout().flush().ok();
|
std::io::stdout().flush().ok();
|
||||||
}
|
}
|
||||||
BackupEvent::DatasetCompleted(name) => {
|
BackupEvent::SendComplete(bytes) => {
|
||||||
println!("Completed backup of dataset: {}", name);
|
println!(
|
||||||
|
"\x1b[2KSend completed at {}, awaiting receive completion",
|
||||||
|
human_bytes(bytes as f64)
|
||||||
|
);
|
||||||
}
|
}
|
||||||
BackupEvent::DryrunCompleted(name) => {
|
BackupEvent::DatasetComplete(name) => {
|
||||||
println!("Completed dry run backup of dataset: {}", name);
|
println!("\x1b[2KCompleted backup of dataset: {}", name);
|
||||||
|
}
|
||||||
|
BackupEvent::DryrunComplete(name) => {
|
||||||
|
println!("\x1b[2KCompleted dry run backup of dataset: {}", name);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -82,6 +111,7 @@ impl Progressor {
|
|||||||
Self {
|
Self {
|
||||||
receiver,
|
receiver,
|
||||||
estimated_size: 0,
|
estimated_size: 0,
|
||||||
|
rate_tracker: RateTracker::new(100),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user