Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove ObjectStore::append #5016

Merged
merged 1 commit into from
Nov 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions object_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,6 @@ rand = { version = "0.8", default-features = false, features = ["std", "std_rng"
reqwest = { version = "0.11", default-features = false, features = ["rustls-tls"], optional = true }
ring = { version = "0.17", default-features = false, features = ["std"], optional = true }
rustls-pemfile = { version = "1.0", default-features = false, optional = true }

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
tokio = { version = "1.25.0", features = ["sync", "macros", "rt", "time", "io-util", "fs"] }

[target.'cfg(target_arch = "wasm32")'.dependencies]
tokio = { version = "1.25.0", features = ["sync", "macros", "rt", "time", "io-util"] }

[target.'cfg(target_family="unix")'.dev-dependencies]
Expand Down
31 changes: 1 addition & 30 deletions object_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,7 @@
//!
//! This provides some compelling advantages:
//!
//! * Except where explicitly stated otherwise, operations are atomic, and readers
//! cannot observe partial and/or failed writes
//! * All operations are atomic, and readers cannot observe partial and/or failed writes
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🎉

//! * Methods map directly to object store APIs, providing both efficiency and predictability
//! * Abstracts away filesystem and operating system specific quirks, ensuring portability
//! * Allows for functionality not native to filesystems, such as operation preconditions
Expand Down Expand Up @@ -559,30 +558,6 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
/// vary by object store.
async fn abort_multipart(&self, location: &Path, multipart_id: &MultipartId) -> Result<()>;

/// Returns an [`AsyncWrite`] that can be used to append to the object at `location`.
///
/// A new object will be created if it doesn't already exist, otherwise it will be
/// opened, with subsequent writes appended to the end.
///
/// This operation cannot be supported by all stores; most use-cases should prefer
/// [`ObjectStore::put`] and [`ObjectStore::put_multipart`] for better portability
/// and stronger guarantees.
///
/// This API is not guaranteed to be atomic. In particular:
///
/// * On error, `location` may contain partial data
/// * Concurrent calls to [`ObjectStore::list`] may return partially written objects
/// * Concurrent calls to [`ObjectStore::get`] may return partially written data
/// * Concurrent calls to [`ObjectStore::put`] may result in data loss / corruption
/// * Concurrent calls to [`ObjectStore::append`] may result in data loss / corruption
///
/// Additionally some stores, such as Azure, may only support appending to objects created
/// with [`ObjectStore::append`], and not with [`ObjectStore::put`], [`ObjectStore::copy`], or
/// [`ObjectStore::put_multipart`].
///
/// The default implementation returns [`Error::NotImplemented`]; stores that support
/// appending override it.
async fn append(&self, _location: &Path) -> Result<Box<dyn AsyncWrite + Unpin + Send>> {
Err(Error::NotImplemented)
}

/// Return the bytes that are stored at the specified location.
async fn get(&self, location: &Path) -> Result<GetResult> {
self.get_opts(location, GetOptions::default()).await
Expand Down Expand Up @@ -779,10 +754,6 @@ macro_rules! as_ref_impl {
self.as_ref().abort_multipart(location, multipart_id).await
}

// Forwards the call unchanged to the store obtained through `self.as_ref()`.
async fn append(&self, location: &Path) -> Result<Box<dyn AsyncWrite + Unpin + Send>> {
self.as_ref().append(location).await
}

// Forwards the call unchanged to the store obtained through `self.as_ref()`.
async fn get(&self, location: &Path) -> Result<GetResult> {
self.as_ref().get(location).await
}
Expand Down
7 changes: 0 additions & 7 deletions object_store/src/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,6 @@ impl<T: ObjectStore> ObjectStore for LimitStore<T> {
let _permit = self.semaphore.acquire().await.unwrap();
self.inner.abort_multipart(location, multipart_id).await
}

/// Opens an append writer on the inner store while holding a semaphore permit.
///
/// The owned permit is moved into the returned [`PermitWrapper`] together with
/// the inner writer. If the inner `append` call fails, the permit is dropped
/// when this function returns.
async fn append(&self, location: &Path) -> Result<Box<dyn AsyncWrite + Unpin + Send>> {
    // Acquire an owned permit so it can be moved into the returned wrapper.
    let slot = Arc::clone(&self.semaphore).acquire_owned().await.unwrap();
    let inner_writer = self.inner.append(location).await?;
    let guarded = PermitWrapper::new(inner_writer, slot);
    Ok(Box::new(guarded))
}

async fn get(&self, location: &Path) -> Result<GetResult> {
let permit = Arc::clone(&self.semaphore).acquire_owned().await.unwrap();
let r = self.inner.get(location).await?;
Expand Down
126 changes: 0 additions & 126 deletions object_store/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,45 +350,6 @@ impl ObjectStore for LocalFileSystem {
.await
}

async fn append(&self, location: &Path) -> Result<Box<dyn AsyncWrite + Unpin + Send>> {
// Get the path to the file from the configuration.
let path = self.config.path_to_filesystem(location)?;
loop {
// Create new `OpenOptions`.
let mut options = tokio::fs::OpenOptions::new();

// Attempt to open the file with the given options.
match options
.truncate(false)
.append(true)
.create(true)
.open(&path)
.await
{
// If the file was successfully opened, return it wrapped in a boxed `AsyncWrite` trait object.
Ok(file) => return Ok(Box::new(file)),
// If the error is that the file was not found, attempt to create the file and any necessary parent directories.
Err(source) if source.kind() == ErrorKind::NotFound => {
// Get the path to the parent directory of the file.
let parent = path.parent().ok_or_else(|| Error::UnableToCreateFile {
path: path.to_path_buf(),
source,
})?;

// Create the parent directory and any necessary ancestors.
tokio::fs::create_dir_all(parent)
.await
// If creating the directory fails, return a `UnableToCreateDirSnafu` error.
.context(UnableToCreateDirSnafu { path: parent })?;
// Try again to open the file.
continue;
}
// If any other error occurs, return a `UnableToOpenFile` error.
Err(source) => return Err(Error::UnableToOpenFile { source, path }.into()),
}
}
}

async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
let location = location.clone();
let path = self.config.path_to_filesystem(&location)?;
Expand Down Expand Up @@ -1449,97 +1410,10 @@ mod tests {
mod not_wasm_tests {
use crate::local::LocalFileSystem;
use crate::{ObjectStore, Path};
use bytes::Bytes;
use std::time::Duration;
use tempfile::TempDir;
use tokio::io::AsyncWriteExt;

/// Appending to a location whose parent directories do not exist must create
/// them and store the written bytes.
#[tokio::test]
async fn creates_dir_if_not_present_append() {
    let root = TempDir::new().unwrap();
    let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
    let location = Path::from("nested/file/test_file");
    let payload = Bytes::from("arbitrary data");

    let mut writer = integration.append(&location).await.unwrap();
    writer.write_all(payload.as_ref()).await.unwrap();
    writer.flush().await.unwrap();

    let fetched = integration.get(&location).await.unwrap();
    let stored = fetched.bytes().await.unwrap();
    assert_eq!(&*stored, payload);
}

/// A single append to a fresh top-level file round-trips the written bytes.
#[tokio::test]
async fn unknown_length_append() {
    let root = TempDir::new().unwrap();
    let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
    let location = Path::from("some_file");
    let payload = Bytes::from("arbitrary data");

    let mut writer = integration.append(&location).await.unwrap();
    writer.write_all(payload.as_ref()).await.unwrap();
    writer.flush().await.unwrap();

    let fetched = integration.get(&location).await.unwrap();
    let stored = fetched.bytes().await.unwrap();
    assert_eq!(&*stored, payload);
}

/// Two separate append sessions to the same file must concatenate their data
/// in write order.
#[tokio::test]
async fn multiple_append() {
    let root = TempDir::new().unwrap();
    let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
    let location = Path::from("some_file");

    let chunks = vec![
        Bytes::from("arbitrary"),
        Bytes::from("data"),
        Bytes::from("gnz"),
    ];

    // Open a fresh writer for each round and write every chunk through it.
    for _round in 0..2 {
        let mut writer = integration.append(&location).await.unwrap();
        for chunk in &chunks {
            writer.write_all(chunk).await.unwrap();
        }
        writer.flush().await.unwrap();
    }

    let fetched = integration.get(&location).await.unwrap();
    let stored = fetched.bytes().await.unwrap();
    let expected_data = Bytes::from("arbitrarydatagnzarbitrarydatagnz");
    assert_eq!(&*stored, expected_data);
}

#[tokio::test]
async fn test_cleanup_intermediate_files() {
let root = TempDir::new().unwrap();
Expand Down
99 changes: 0 additions & 99 deletions object_store/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,14 +205,6 @@ impl ObjectStore for InMemory {
Ok(())
}

/// Returns an [`InMemoryAppend`] writer for `location`.
///
/// The writer starts with an empty buffer and shares this store's storage.
async fn append(&self, location: &Path) -> Result<Box<dyn AsyncWrite + Unpin + Send>> {
    let appender = InMemoryAppend {
        location: location.clone(),
        data: Vec::new(),
        storage: SharedStorage::clone(&self.storage),
    };
    Ok(Box::new(appender))
}

async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
let entry = self.entry(location).await?;
let e_tag = entry.e_tag.to_string();
Expand Down Expand Up @@ -443,53 +435,8 @@ impl AsyncWrite for InMemoryUpload {
}
}

/// [`AsyncWrite`] implementation that buffers written bytes and appends them to
/// the in-memory object at `location` on flush.
struct InMemoryAppend {
/// Key of the object being appended to.
location: Path,
/// Bytes written since the last flush.
data: Vec<u8>,
/// Shared handle to the storage the buffered bytes are flushed into.
storage: Arc<RwLock<Storage>>,
}

impl AsyncWrite for InMemoryAppend {
    /// Buffers `buf` locally; nothing reaches the shared storage until a flush.
    fn poll_write(
        mut self: Pin<&mut Self>,
        _cx: &mut std::task::Context<'_>,
        buf: &[u8],
    ) -> Poll<Result<usize, io::Error>> {
        let accepted = buf.len();
        self.data.extend_from_slice(buf);
        Poll::Ready(Ok(accepted))
    }

    /// Moves the buffered bytes into the shared storage.
    ///
    /// Any existing entry at `location` is removed and re-inserted with the
    /// buffered bytes appended; otherwise a new entry is inserted containing
    /// only the buffered bytes. The local buffer is left empty.
    fn poll_flush(
        mut self: Pin<&mut Self>,
        _cx: &mut std::task::Context<'_>,
    ) -> Poll<Result<(), io::Error>> {
        // Clone the handle so the write guard does not borrow `self`.
        let shared = Arc::clone(&self.storage);
        let mut guard = shared.write();

        let merged = match guard.map.remove(&self.location) {
            Some(existing) => {
                let pending = std::mem::take(&mut self.data);
                Bytes::from_iter(existing.data.into_iter().chain(pending))
            }
            None => Bytes::from(std::mem::take(&mut self.data)),
        };
        guard.insert(&self.location, merged);

        Poll::Ready(Ok(()))
    }

    /// Shutting down is equivalent to a final flush.
    fn poll_shutdown(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Result<(), io::Error>> {
        self.poll_flush(cx)
    }
}

#[cfg(test)]
mod tests {
use tokio::io::AsyncWriteExt;

use super::*;

use crate::tests::*;
Expand Down Expand Up @@ -577,50 +524,4 @@ mod tests {
panic!("unexpected error type: {err:?}");
}
}

/// Appending to a location that does not exist yet creates the object with
/// exactly the written bytes.
#[tokio::test]
async fn test_append_new() {
    let in_memory = InMemory::new();
    let location = Path::from("some_file");
    let payload = Bytes::from("arbitrary data");

    let mut writer = in_memory.append(&location).await.unwrap();
    writer.write_all(&payload).await.unwrap();
    writer.flush().await.unwrap();

    let fetched = in_memory.get(&location).await.unwrap();
    let stored = fetched.bytes().await.unwrap();
    assert_eq!(&*stored, payload);
}

/// A second write + flush through the same writer appends to the data stored
/// by the first flush.
#[tokio::test]
async fn test_append_existing() {
    let in_memory = InMemory::new();
    let location = Path::from("some_file");
    let first = Bytes::from("arbitrary");
    let second = Bytes::from(" data");

    let mut writer = in_memory.append(&location).await.unwrap();

    // Write and flush each piece separately, in order.
    writer.write_all(&first).await.unwrap();
    writer.flush().await.unwrap();

    writer.write_all(&second).await.unwrap();
    writer.flush().await.unwrap();

    let fetched = in_memory.get(&location).await.unwrap();
    let stored = fetched.bytes().await.unwrap();
    let expected_data = Bytes::from("arbitrary data");
    assert_eq!(&*stored, expected_data);
}
}
6 changes: 0 additions & 6 deletions object_store/src/prefix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,6 @@ impl<T: ObjectStore> ObjectStore for PrefixStore<T> {
let full_path = self.full_path(location);
self.inner.abort_multipart(&full_path, multipart_id).await
}

/// Appends to `location` on the inner store after mapping it through
/// `full_path`.
async fn append(&self, location: &Path) -> Result<Box<dyn AsyncWrite + Unpin + Send>> {
    let prefixed = self.full_path(location);
    let writer = self.inner.append(&prefixed).await;
    writer
}

async fn get(&self, location: &Path) -> Result<GetResult> {
let full_path = self.full_path(location);
self.inner.get(&full_path).await
Expand Down
4 changes: 0 additions & 4 deletions object_store/src/throttle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,10 +169,6 @@ impl<T: ObjectStore> ObjectStore for ThrottledStore<T> {
Err(super::Error::NotImplemented)
}

/// Always returns `Error::NotImplemented`; this implementation ignores `_location`.
async fn append(&self, _location: &Path) -> Result<Box<dyn AsyncWrite + Unpin + Send>> {
Err(super::Error::NotImplemented)
}

async fn get(&self, location: &Path) -> Result<GetResult> {
sleep(self.config().wait_get_per_call).await;

Expand Down
Loading