Skip to content

Commit

Permalink
cleanup and fixed many clippy errors
Browse files Browse the repository at this point in the history
  • Loading branch information
olexiyb committed Jun 14, 2024
1 parent cb29fc7 commit 4891d88
Show file tree
Hide file tree
Showing 11 changed files with 169 additions and 423 deletions.
10 changes: 5 additions & 5 deletions src/command_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ where
/// process error type
fn error_type(&self) -> E;
/// wrap error
fn wrap_error<F: Error + Sync + Send + 'static>(&self, error: F, message: Option<String>) -> E;
fn wrap_error<F: Error + Sync + Send + 'static>(&self, error: F, message: String) -> E;
}

///
Expand Down Expand Up @@ -127,7 +127,7 @@ where
}

/// Handle process output
async fn handle_output<R: AsyncRead + Unpin>(data: R, sender: Sender<LogOutputData>) -> () {
async fn handle_output<R: AsyncRead + Unpin>(data: R, sender: Sender<LogOutputData>) {
let mut lines = BufReader::new(data).lines();
while let Some(line) = lines.next_line().await.expect("error handling output") {
let io_data = LogOutputData {
Expand All @@ -142,7 +142,7 @@ where
}

/// Log process output
async fn log_output(mut receiver: Receiver<LogOutputData>) -> () {
async fn log_output(mut receiver: Receiver<LogOutputData>) {
while let Some(data) = receiver.recv().await {
match data.log_type {
LogType::Info => {
Expand All @@ -161,7 +161,7 @@ where
.process
.wait()
.await
.map_err(|e| self.process_type.wrap_error(e, None))?;
.map_err(|e| self.process_type.wrap_error(e, "failed to run process".to_string()))?;
if exit_status.success() {
Ok(self.process_type.status_exit())
} else {
Expand Down Expand Up @@ -227,7 +227,7 @@ where
.await
.map_err(|e| {
self.process_type
.wrap_error(e, Some(String::from("timed out")))
.wrap_error(e, String::from("timed out"))
})?,
}
}
Expand Down
178 changes: 46 additions & 132 deletions src/pg_access.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,14 @@

use std::cell::Cell;
use std::collections::HashMap;
use std::io::Write;
use std::path::{Path, PathBuf};
use std::sync::Arc;

use futures::TryFutureExt;
use tokio::io::AsyncWriteExt;
use tokio::sync::Mutex;

use crate::pg_enums::{OperationSystem, PgAcquisitionStatus};
use crate::pg_errors::{PgEmbedError, PgEmbedErrorType};
use crate::pg_errors::PgEmbedError;
use crate::pg_fetch::PgFetchSettings;
use crate::pg_types::{PgCommandSync, PgResult};
use crate::pg_unpack;
Expand All @@ -28,8 +27,8 @@ lazy_static! {
Arc::new(Mutex::new(HashMap::with_capacity(5)));
}

const PG_EMBED_CACHE_DIR_NAME: &'static str = "pg-embed";
const PG_VERSION_FILE_NAME: &'static str = "PG_VERSION";
const PG_EMBED_CACHE_DIR_NAME: &str = "pg-embed";
const PG_VERSION_FILE_NAME: &str = "PG_VERSION";

///
/// Access to pg_ctl, initdb, database directory and cache directory
Expand Down Expand Up @@ -67,18 +66,13 @@ impl PgAccess {
) -> Result<Self, PgEmbedError> {
let cache_dir = match cache_dir {
Some(d) => {
std::fs::create_dir_all(&d)
.map_err(|e| PgEmbedError {
error_type: PgEmbedErrorType::DirCreationError,
source: Some(Box::new(e)),
message: None,
})?;
std::fs::create_dir_all(d).map_err(|e| PgEmbedError::DirCreationError { dir: d.clone(), e })?;
d.clone()
},
None => Self::create_cache_dir_structure(&fetch_settings)?,
None => Self::create_cache_dir_structure(fetch_settings)?,
};

Self::create_db_dir_structure(database_dir).await?;
Self::create_db_dir_structure(database_dir)?;
// pg_ctl executable
let pg_ctl = cache_dir.clone().join("bin").join("pg_ctl");
// initdb executable
Expand Down Expand Up @@ -113,45 +107,31 @@ impl PgAccess {
/// Returns PathBuf(cache_directory) on success, an error otherwise
///
fn create_cache_dir_structure(fetch_settings: &PgFetchSettings) -> PgResult<PathBuf> {
let cache_dir = dirs::cache_dir().ok_or_else(|| PgEmbedError {
error_type: PgEmbedErrorType::InvalidPgUrl,
source: None,
message: None,
})?;
let cache_dir = dirs::cache_dir().ok_or_else(|| PgEmbedError::NoSystemCacheDirectory)?;
let os_string = match fetch_settings.operating_system {
OperationSystem::Darwin | OperationSystem::Windows | OperationSystem::Linux => {
fetch_settings.operating_system.to_string()
}
OperationSystem::AlpineLinux => {
format!("arch_{}", fetch_settings.operating_system.to_string())
format!("arch_{}", fetch_settings.operating_system)
}
};
let pg_path = format!(
"{}/{}/{}/{}",
PG_EMBED_CACHE_DIR_NAME,
os_string,
fetch_settings.architecture.to_string(),
fetch_settings.architecture,
fetch_settings.version.0
);
let mut cache_pg_embed = cache_dir.clone();
cache_pg_embed.push(pg_path);
std::fs::create_dir_all(&cache_pg_embed)
.map_err(|e| PgEmbedError {
error_type: PgEmbedErrorType::DirCreationError,
source: Some(Box::new(e)),
message: None,
})?;
std::fs::create_dir_all(&cache_pg_embed).map_err(|e|PgEmbedError::DirCreationError { dir: cache_pg_embed.clone(), e })?;
Ok(cache_pg_embed)
}

async fn create_db_dir_structure(db_dir: &PathBuf) -> PgResult<()> {
tokio::fs::create_dir_all(db_dir)
.map_err(|e| PgEmbedError {
error_type: PgEmbedErrorType::DirCreationError,
source: Some(Box::new(e)),
message: None,
})
.await
fn create_db_dir_structure(db_dir: &PathBuf) -> PgResult<()> {
std::fs::create_dir_all(db_dir).map_err(|e| PgEmbedError::DirCreationError { dir: db_dir.clone(), e })?;
Ok(())
}

///
Expand All @@ -160,26 +140,21 @@ impl PgAccess {
pub async fn maybe_acquire_postgres(&self) -> PgResult<()> {
let mut lock = ACQUIRED_PG_BINS.lock().await;

if self.pg_executables_cached().await? {
if self.pg_executables_cached()? {
return Ok(());
}

lock.insert(self.cache_dir.clone(), PgAcquisitionStatus::InProgress);
let pg_bin_data = self.fetch_settings.fetch_postgres().await?;
self.write_pg_zip(&pg_bin_data).await?;
self.write_pg_zip(&pg_bin_data)?;
log::debug!(
"Unpacking postgres binaries {} {}",
self.zip_file_path.display(),
self.cache_dir.display()
);
pg_unpack::unpack_postgres(&self.zip_file_path, &self.cache_dir).await?;
tokio::fs::remove_file(&self.zip_file_path)
.await
.map_err(|e| PgEmbedError {
error_type: PgEmbedErrorType::PgCleanUpFailure,
source: Some(Box::new(e)),
message: None,
})?;
std::fs::remove_file(&self.zip_file_path)
.map_err(|e| PgEmbedError::PgCleanUpFailure { path: self.zip_file_path.clone(), e })?;

lock.insert(self.cache_dir.clone(), PgAcquisitionStatus::Finished);
Ok(())
Expand All @@ -188,37 +163,33 @@ impl PgAccess {
///
/// Check if postgresql executables are already cached
///
pub async fn pg_executables_cached(&self) -> PgResult<bool> {
Self::path_exists(self.init_db_exe.as_path()).await
pub fn pg_executables_cached(&self) -> PgResult<bool> {
Self::path_exists(self.init_db_exe.as_path())
}

///
/// Check if database files exist
///
pub async fn db_files_exist(&self) -> PgResult<bool> {
Ok(self.pg_executables_cached().await?
&& Self::path_exists(self.pg_version_file.as_path()).await?)
Ok(self.pg_executables_cached()?
&& Self::path_exists(self.pg_version_file.as_path())?)
}

///
/// Check if database version file exists
///
pub async fn pg_version_file_exists(db_dir: &PathBuf) -> PgResult<bool> {
let mut pg_version_file = db_dir.clone();
pub async fn pg_version_file_exists(db_dir: &Path) -> PgResult<bool> {
let mut pg_version_file = db_dir.to_path_buf();
pg_version_file.push(PG_VERSION_FILE_NAME);
let file_exists = if let Ok(_) = tokio::fs::File::open(pg_version_file.as_path()).await {
true
} else {
false
};
let file_exists = std::fs::File::open(pg_version_file.as_path()).is_ok();
Ok(file_exists)
}

///
/// Check if file path exists
///
async fn path_exists(file: &Path) -> PgResult<bool> {
if let Ok(_) = tokio::fs::File::open(file).await {
fn path_exists(file: &Path) -> PgResult<bool> {
if std::fs::File::open(file).is_ok() {
Ok(true)
} else {
Ok(false)
Expand All @@ -240,28 +211,9 @@ impl PgAccess {
///
/// Write pg binaries zip to postgresql cache directory
///
async fn write_pg_zip(&self, bytes: &[u8]) -> PgResult<()> {
let mut file: tokio::fs::File = tokio::fs::File::create(&self.zip_file_path.as_path())
.map_err(|e| PgEmbedError {
error_type: PgEmbedErrorType::WriteFileError,
source: Some(Box::new(e)),
message: None,
})
.await?;
file.write_all(&bytes)
.map_err(|e| PgEmbedError {
error_type: PgEmbedErrorType::WriteFileError,
source: Some(Box::new(e)),
message: None,
})
.await?;
file.sync_data()
.map_err(|e| PgEmbedError {
error_type: PgEmbedErrorType::WriteFileError,
source: Some(Box::new(e)),
message: None,
})
.await?;
fn write_pg_zip(&self, bytes: &[u8]) -> PgResult<()> {
let mut file = std::fs::File::create(self.zip_file_path.as_path()).map_err(|e| PgEmbedError::WriteFileError { path: self.zip_file_path.clone(), e })?;
file.write(bytes).map_err(|e| PgEmbedError::WriteFileError { path: self.zip_file_path.clone(), e })?;
Ok(())
}

Expand All @@ -272,16 +224,8 @@ impl PgAccess {
///
pub fn clean(&self) -> PgResult<()> {
// not using tokio::fs async methods because clean() is called on drop
std::fs::remove_dir_all(self.database_dir.as_path()).map_err(|e| PgEmbedError {
error_type: PgEmbedErrorType::PgCleanUpFailure,
source: Some(Box::new(e)),
message: None,
})?;
std::fs::remove_file(self.pw_file_path.as_path()).map_err(|e| PgEmbedError {
error_type: PgEmbedErrorType::PgCleanUpFailure,
source: Some(Box::new(e)),
message: None,
})?;
std::fs::remove_dir_all(self.database_dir.as_path()).map_err(|e| PgEmbedError::PgCleanUpFailure { path: self.database_dir.clone(), e})?;
std::fs::remove_file(self.pw_file_path.as_path()).map_err(|e| PgEmbedError::PgCleanUpFailure{ path: self.pw_file_path.clone(), e})?;
Ok(())
}

Expand All @@ -291,76 +235,46 @@ impl PgAccess {
/// Remove all cached postgresql executables
///
pub async fn purge() -> PgResult<()> {
let mut cache_dir = dirs::cache_dir().ok_or_else(|| PgEmbedError {
error_type: PgEmbedErrorType::ReadFileError,
source: None,
message: Some(String::from("cache dir error")),
})?;
cache_dir.push(PG_EMBED_CACHE_DIR_NAME);
let _ = tokio::fs::remove_dir_all(cache_dir.as_path())
.map_err(|e| PgEmbedError {
error_type: PgEmbedErrorType::PgPurgeFailure,
source: Some(Box::new(e)),
message: None,
})
.await;
if let Some(cache_dir) = &mut dirs::cache_dir() {
cache_dir.push(PG_EMBED_CACHE_DIR_NAME);
std::fs::remove_dir_all(&cache_dir).map_err(|e| PgEmbedError::PgCleanUpFailure { path: cache_dir.clone(), e })?;
}
Ok(())
}

///
/// Clean up database directory and password file
///
pub async fn clean_up(database_dir: PathBuf, pw_file: PathBuf) -> PgResult<()> {
tokio::fs::remove_dir_all(database_dir.as_path())
.await
.map_err(|e| PgEmbedError {
error_type: PgEmbedErrorType::PgCleanUpFailure,
source: Some(Box::new(e)),
message: None,
})?;
std::fs::remove_dir_all(&database_dir)
.map_err(|e| PgEmbedError::PgCleanUpFailure { path: database_dir, e })?;

tokio::fs::remove_file(pw_file.as_path())
.await
.map_err(|e| PgEmbedError {
error_type: PgEmbedErrorType::PgCleanUpFailure,
source: Some(Box::new(e)),
message: None,
})
std::fs::remove_file(&pw_file).map_err(|e| PgEmbedError::PgCleanUpFailure { path: pw_file, e })
}

///
/// Create a database password file
///
/// Returns `Ok(())` on success, otherwise returns an error.
///
pub async fn create_password_file(&self, password: &[u8]) -> PgResult<()> {
let mut file: tokio::fs::File = tokio::fs::File::create(self.pw_file_path.as_path())
.map_err(|e| PgEmbedError {
error_type: PgEmbedErrorType::WriteFileError,
source: Some(Box::new(e)),
message: None,
})
.await?;
let _ = file
pub fn create_password_file(&self, password: &[u8]) -> PgResult<()> {
let mut file = std::fs::File::create(self.pw_file_path.as_path())
.map_err(|e | PgEmbedError::WriteFileError { path: self.pw_file_path.clone(), e})?;
file
.write(password)
.map_err(|e| PgEmbedError {
error_type: PgEmbedErrorType::WriteFileError,
source: Some(Box::new(e)),
message: None,
})
.await?;
.map_err(|e| PgEmbedError::WriteFileError { path: self.pw_file_path.clone(), e})?;
Ok(())
}

///
/// Create synchronous pg_ctl stop command
///
pub fn stop_db_command_sync(&self, database_dir: &PathBuf) -> PgCommandSync {
pub fn stop_db_command_sync(&self, database_dir: &Path) -> PgCommandSync {
let pg_ctl_executable = self.pg_ctl_exe.to_str().unwrap();
let mut command = Box::new(Cell::new(std::process::Command::new(pg_ctl_executable)));
command
.get_mut()
.args(&["stop", "-w", "-D", database_dir.to_str().unwrap()]);
.args(["stop", "-w", "-D", database_dir.to_str().unwrap()]);
command
}
}
Loading

0 comments on commit 4891d88

Please sign in to comment.