use std::io::{Error, ErrorKind};
use std::sync::mpsc;
use std::thread;
use crate::datastore::Datastore;
use crate::source::SourceOptions;
use crate::tasks::{MaxBytes, Message, Task, TransferredBytes};
use crate::types::{to_bytes, Queries};
use crate::Source;
type DataMessage = (u16, Queries);
/// FullDumpTask is a wrapping struct to execute the synchronization between a *Source* and a *Datastore*
pub struct FullDumpTask<'a, S>
where
S: Source,
{
source: S,
datastore: Box<dyn Datastore>,
options: SourceOptions<'a>,
}
impl<'a, S> FullDumpTask<'a, S>
where
S: Source,
{
pub fn new(source: S, datastore: Box<dyn Datastore>, options: SourceOptions<'a>) -> Self {
FullDumpTask {
source,
datastore,
options,
}
}
}
impl<'a, S> Task for FullDumpTask<'a, S>
where
S: Source,
{
fn run<F: FnMut(TransferredBytes, MaxBytes)>(
mut self,
mut progress_callback: F,
) -> Result<(), Error> {
// initialize the source
let _ = self.source.init()?;
let (tx, rx) = mpsc::sync_channel::<Message<DataMessage>>(1);
let datastore = self.datastore;
let join_handle = thread::spawn(move || -> Result<(), Error> {
// managing Datastore (S3) upload here
let datastore = datastore;
loop {
let result = match rx.recv() {
Ok(Message::Data((chunk_part, queries))) => Ok((chunk_part, queries)),
Ok(Message::EOF) => break,
Err(err) => Err(Error::new(ErrorKind::Other, format!("{}", err))),
};
if let Ok((chunk_part, queries)) = result {
let _ = match datastore.write(chunk_part, to_bytes(queries)) {
Ok(_) => {}
Err(err) => return Err(Error::new(ErrorKind::Other, format!("{}", err))),
};
}
}
Ok(())
});
// buffer of 100MB in memory to use and re-use to upload data into datastore
let buffer_size = 100 * 1024 * 1024;
let mut queries = vec![];
let mut consumed_buffer_size = 0usize;
let mut total_transferred_bytes = 0usize;
let mut chunk_part = 0u16;
// init progress
progress_callback(
total_transferred_bytes,
buffer_size * (chunk_part as usize + 1),
);
let _ = self.source.read(self.options, |_original_query, query| {
if consumed_buffer_size + query.data().len() > buffer_size {
chunk_part += 1;
consumed_buffer_size = 0;
// TODO .clone() - look if we do not consume more mem
let message = Message::Data((chunk_part, queries.clone()));
let _ = tx.send(message); // FIXME catch SendError?
let _ = queries.clear();
}
consumed_buffer_size += query.data().len();
total_transferred_bytes += query.data().len();
progress_callback(
total_transferred_bytes,
buffer_size * (chunk_part as usize + 1),
);
queries.push(query);
})?;
progress_callback(total_transferred_bytes, total_transferred_bytes);
chunk_part += 1;
let _ = tx.send(Message::Data((chunk_part, queries)));
let _ = tx.send(Message::EOF);
// wait for end of upload execution
join_handle.join().unwrap()?;
Ok(())
}
}