use std::io::{Error, ErrorKind, Write};
use std::process::{Command, Stdio};
use crate::connector::Connector;
use crate::destination::Destination;
use crate::types::Bytes;
use crate::utils::{binary_exists, wait_for_command};
pub struct Postgres<'a> {
host: &'a str,
port: u16,
database: &'a str,
username: &'a str,
password: &'a str,
wipe_database: bool,
}
impl<'a> Postgres<'a> {
pub fn new(
host: &'a str,
port: u16,
database: &'a str,
username: &'a str,
password: &'a str,
wipe_database: bool,
) -> Self {
Postgres {
host,
port,
database,
username,
password,
wipe_database,
}
}
}
impl<'a> Connector for Postgres<'a> {
fn init(&mut self) -> Result<(), Error> {
let _ = binary_exists("psql")?;
if self.wipe_database {
let s_port = self.port.to_string();
let wipe_db_query = wipe_database_query(self.username);
let exit_status = Command::new("psql")
.env("PGPASSWORD", self.password)
.args([
"-h",
self.host,
"-p",
s_port.as_str(),
"-d",
self.database,
"-U",
self.username,
"-c",
wipe_db_query.as_str(),
])
.stdout(Stdio::null())
.spawn()?
.wait()?;
if !exit_status.success() {
return Err(Error::new(
ErrorKind::Other,
format!("command error: {:?}", exit_status.to_string()),
));
}
}
Ok(())
}
}
impl<'a> Destination for Postgres<'a> {
fn write(&self, data: Bytes) -> Result<(), Error> {
let s_port = self.port.to_string();
let mut process = Command::new("psql")
.env("PGPASSWORD", self.password)
.args([
"-h",
self.host,
"-p",
s_port.as_str(),
"-d",
self.database,
"-U",
self.username,
])
.stdin(Stdio::piped())
.stdout(Stdio::null())
.spawn()?;
let _ = process.stdin.take().unwrap().write_all(data.as_slice());
wait_for_command(&mut process)
}
}
fn wipe_database_query(username: &str) -> String {
format!(
"\
DROP SCHEMA public CASCADE; \
CREATE SCHEMA public; \
GRANT ALL ON SCHEMA public TO \"{}\"; \
GRANT ALL ON SCHEMA public TO public;\
",
username
)
}
#[cfg(test)]
mod tests {
use crate::connector::Connector;
use crate::destination::postgres::Postgres;
use crate::destination::Destination;
fn get_postgres() -> Postgres<'static> {
Postgres::new("localhost", 5453, "root", "root", "password", true)
}
fn get_invalid_postgres() -> Postgres<'static> {
Postgres::new("localhost", 5453, "root", "root", "wrongpassword", true)
}
#[test]
fn connect() {
let mut p = get_postgres();
let _ = p.init().expect("can't init postgres");
assert!(p.write(b"SELECT 1".to_vec()).is_ok());
let mut p = get_invalid_postgres();
assert!(p.init().is_err());
assert!(p.write(b"SELECT 1".to_vec()).is_err());
}
#[test]
fn test_inserts() {}
}