#[macro_use]
extern crate prettytable;
use std::fs::File;
use std::sync::mpsc;
use std::sync::mpsc::Receiver;
use std::thread::sleep;
use std::time::Duration;
use std::{env, thread};
use clap::Parser;
use indicatif::{ProgressBar, ProgressStyle};
use migration::{migrations, Migrator};
use utils::get_replibyte_version;
use crate::cli::{DumpCommand, RestoreCommand, SubCommand, TransformerCommand, CLI, SourceCommand};
use crate::config::{Config, DatabaseSubsetConfig, DatastoreConfig};
use crate::datastore::local_disk::LocalDisk;
use crate::datastore::s3::S3;
use crate::datastore::Datastore;
use crate::source::{Source, SourceOptions};
use crate::tasks::{MaxBytes, TransferredBytes};
use crate::telemetry::{ClientOptions, TelemetryClient, TELEMETRY_TOKEN};
use crate::utils::epoch_millis;
mod cli;
mod commands;
mod config;
mod connector;
mod datastore;
mod destination;
mod migration;
mod runtime;
mod source;
mod tasks;
mod telemetry;
mod transformer;
mod types;
mod utils;
fn show_progress_bar(rx_pb: Receiver<(TransferredBytes, MaxBytes)>) {
let pb = ProgressBar::new(0);
pb.set_style(ProgressStyle::default_spinner());
let mut style_is_progress_bar = false;
let mut _max_bytes = 0usize;
let mut last_transferred_bytes = 0usize;
loop {
let (transferred_bytes, max_bytes) = match rx_pb.try_recv() {
Ok(msg) => msg,
Err(_) => (last_transferred_bytes, _max_bytes),
};
if _max_bytes == 0 && style_is_progress_bar {
pb.set_style(ProgressStyle::default_spinner());
style_is_progress_bar = false;
} else if _max_bytes > 0 && !style_is_progress_bar {
pb.set_style(ProgressStyle::default_bar()
.template("{spinner:.green} [{elapsed_precise}] [{wide_bar:.green/blue}] {bytes}/{total_bytes} ({eta})")
.progress_chars("#>-"));
style_is_progress_bar = true;
}
if max_bytes != _max_bytes {
pb.set_length(max_bytes as u64);
_max_bytes = max_bytes;
}
last_transferred_bytes = transferred_bytes;
pb.set_position(transferred_bytes as u64);
sleep(Duration::from_micros(50));
}
}
fn main() {
let start_exec_time = utils::epoch_millis();
env_logger::init();
let env_args = env::args().collect::<Vec<String>>();
let args = CLI::parse();
let file = File::open(args.config).expect("missing config file");
let config: Config = serde_yaml::from_reader(file).expect("bad config file format");
let sub_commands: &SubCommand = &args.sub_commands;
let telemetry_client = match args.no_telemetry {
true => None,
false => Some(TelemetryClient::new(ClientOptions::from(TELEMETRY_TOKEN))),
};
let telemetry_config = config.clone();
if let Some(telemetry_client) = &telemetry_client {
let _ = telemetry_client.capture_command(&telemetry_config, sub_commands, &env_args, None);
}
let mut exit_code = 0;
if let Err(err) = run(config, &sub_commands) {
eprintln!("{}", err);
exit_code = 1;
}
if let Some(telemetry_client) = &telemetry_client {
let _ = telemetry_client.capture_command(
&telemetry_config,
sub_commands,
&env_args,
Some(epoch_millis() - start_exec_time),
);
}
if exit_code != 0 {
std::process::exit(exit_code);
}
}
fn run(config: Config, sub_commands: &SubCommand) -> anyhow::Result<()> {
let mut datastore: Box<dyn Datastore> = match &config.datastore {
DatastoreConfig::AWS(config) => Box::new(S3::aws(
config.bucket()?,
config.region()?,
config.profile()?,
config.credentials()?,
config.endpoint()?,
)?),
DatastoreConfig::GCP(config) => Box::new(S3::gcp(
config.bucket()?,
config.region()?,
config.access_key()?,
config.secret()?,
config.endpoint()?,
)?),
DatastoreConfig::LocalDisk(config) => Box::new(LocalDisk::new(config.dir()?)),
};
let migrator = Migrator::new(get_replibyte_version(), &datastore, migrations());
let _ = migrator.migrate()?;
let _ = datastore.init()?;
let (tx_pb, rx_pb) = mpsc::sync_channel::<(TransferredBytes, MaxBytes)>(1000);
match sub_commands {
SubCommand::Dump(dump_cmd) => match dump_cmd {
DumpCommand::Restore(cmd) => match cmd {
RestoreCommand::Local(args) => if args.output {},
RestoreCommand::Remote(args) => if args.output {},
},
_ => {
let _ = thread::spawn(move || show_progress_bar(rx_pb));
}
},
_ => {
let _ = thread::spawn(move || show_progress_bar(rx_pb));
}
};
let progress_callback = |bytes: TransferredBytes, max_bytes: MaxBytes| {
let _ = tx_pb.send((bytes, max_bytes));
};
match sub_commands {
SubCommand::Dump(cmd) => match cmd {
DumpCommand::List => {
let _ = commands::dump::list(&mut datastore)?;
Ok(())
}
DumpCommand::Create(args) => {
if let Some(name) = &args.name {
datastore.set_dump_name(name.to_string());
}
commands::dump::run(args, datastore, config, progress_callback)
}
DumpCommand::Delete(args) => commands::dump::delete(datastore, args),
DumpCommand::Restore(restore_cmd) => match restore_cmd {
RestoreCommand::Local(args) => {
commands::dump::restore_local(args, datastore, config, progress_callback)
}
RestoreCommand::Remote(args) => {
commands::dump::restore_remote(args, datastore, config, progress_callback)
}
},
},
SubCommand::Source(cmd) => match cmd {
SourceCommand::Schema => {
commands::source::schema(config)
}
},
SubCommand::Transformer(cmd) => match cmd {
TransformerCommand::List => {
let _ = commands::transformer::list();
Ok(())
}
},
}
}