Skip to content

Commit

Permalink
feat(java): add WriteOptions for write methods
Browse files Browse the repository at this point in the history
  • Loading branch information
geruh committed Feb 23, 2025
1 parent df47505 commit 7032caf
Show file tree
Hide file tree
Showing 10 changed files with 382 additions and 15 deletions.
50 changes: 44 additions & 6 deletions bindings/java/src/async_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use std::future::Future;
use std::str::FromStr;
use std::time::Duration;

Expand All @@ -30,10 +31,11 @@ use jni::sys::jsize;
use jni::JNIEnv;
use opendal::layers::BlockingLayer;
use opendal::raw::PresignedRequest;
use opendal::Operator;
use opendal::{Metadata, Operator};
use opendal::operator_futures::FutureWrite;
use opendal::Scheme;

use crate::convert::jmap_to_hashmap;
use crate::convert::{get_optional_map_from_object, get_optional_string_from_object, jmap_to_hashmap};
use crate::convert::jstring_to_string;
use crate::executor::executor_or_default;
use crate::executor::get_current_env;
Expand Down Expand Up @@ -110,8 +112,9 @@ pub unsafe extern "system" fn Java_org_apache_opendal_AsyncOperator_write(
executor: *const Executor,
path: JString,
content: JByteArray,
write_options: JObject
) -> jlong {
intern_write(&mut env, op, executor, path, content).unwrap_or_else(|e| {
intern_write(&mut env, op, executor, path, content, write_options).unwrap_or_else(|e| {
e.throw(&mut env);
0
})
Expand All @@ -123,23 +126,58 @@ fn intern_write(
executor: *const Executor,
path: JString,
content: JByteArray,
options: JObject,
) -> Result<jlong> {
let op = unsafe { &mut *op };
let id = request_id(env)?;

let path = jstring_to_string(env, &path)?;
let content = env.convert_byte_array(content)?;
let content_type = get_optional_string_from_object(env, &options, "getContentType")?;
let content_disposition = get_optional_string_from_object(env, &options, "getContentDisposition")?;
let content_encoding = get_optional_string_from_object(env, &options, "getContentEncoding")?;
let cache_control = get_optional_string_from_object(env, &options, "getCacheControl")?;
let if_match = get_optional_string_from_object(env, &options, "getIfMatch")?;
let if_none_match = get_optional_string_from_object(env, &options, "getIfNoneMatch")?;
let append = env.call_method(&options, "isAppend", "()Z", &[])?.z()?;
let if_not_exists = env.call_method(&options, "isIfNotExists", "()Z", &[])?.z()?;
let user_metadata = get_optional_map_from_object(env, &options, "getUserMetadata");

let mut write_op = op.write_with(&path, content);
if let Some(ct) = content_type {
write_op = write_op.content_type(&ct);
}
if let Some(cd) = content_disposition {
write_op = write_op.content_disposition(&cd);
}
if let Some(ce) = content_encoding {
write_op = write_op.content_encoding(&ce);
}
if let Some(cc) = cache_control {
write_op = write_op.cache_control(&cc);
}
if let Some(im) = if_match {
write_op = write_op.if_match(&im);
}
if let Some(inm) = if_none_match {
write_op = write_op.if_none_match(&inm);
}
if let Ok(Some(um)) = user_metadata {
write_op = write_op.user_metadata(um);
}
write_op = write_op.if_not_exists(if_not_exists);
write_op = write_op.append(append);

executor_or_default(env, executor)?.spawn(async move {
let result = do_write(op, path, content).await;
let result = do_write(write_op).await;
complete_future(id, result.map(|_| JValueOwned::Void))
});

Ok(id)
}

async fn do_write(op: &mut Operator, path: String, content: Vec<u8>) -> Result<()> {
Ok(op.write(&path, content).await.map(|_| ())?)
async fn do_write(writer: FutureWrite<impl Future<Output =opendal::Result<Metadata>>>) -> Result<()> {
Ok(writer.await.map(|_| ())?)
}

/// # Safety
Expand Down
18 changes: 18 additions & 0 deletions bindings/java/src/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,24 @@ pub(crate) fn string_to_jstring<'a>(
)
}

pub(crate) fn get_optional_string_from_object<'a>(env: &mut JNIEnv<'a>, obj: &JObject, method: &str) -> crate::Result<Option<String>> {
let result = env.call_method(obj, method, "()Ljava/lang/String;", &[])?.l()?;
if result.is_null() {
Ok(None)
} else {
Ok(Some(jstring_to_string(env, &JString::from(result))?))
}
}

pub(crate) fn get_optional_map_from_object<'a>(env: &mut JNIEnv<'a>, obj: &JObject, method: &str) -> crate::Result<Option<HashMap<String, String>>> {
let result = env.call_method(obj, method, "()Ljava/util/Map;", &[])?.l()?;
if result.is_null() {
Ok(None)
} else {
Ok(Some(jmap_to_hashmap(env, &result)?))
}
}

/// # Safety
///
/// The caller must guarantee that the Object passed in is an instance
Expand Down
6 changes: 5 additions & 1 deletion bindings/java/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ fn make_operator_info<'a>(env: &mut JNIEnv<'a>, info: OperatorInfo) -> Result<JO
fn make_capability<'a>(env: &mut JNIEnv<'a>, cap: Capability) -> Result<JObject<'a>> {
let capability = env.new_object(
"org/apache/opendal/Capability",
"(ZZZZZZZZZZZZZZZJJZZZZZZZZZZZZZZ)V",
"(ZZZZZZZZZZZZZZZZZZZJJZZZZZZZZZZZZZZ)V",
&[
JValue::Bool(cap.stat as jboolean),
JValue::Bool(cap.stat_with_if_match as jboolean),
Expand All @@ -111,6 +111,10 @@ fn make_capability<'a>(env: &mut JNIEnv<'a>, cap: Capability) -> Result<JObject<
JValue::Bool(cap.write_with_content_type as jboolean),
JValue::Bool(cap.write_with_content_disposition as jboolean),
JValue::Bool(cap.write_with_cache_control as jboolean),
JValue::Bool(cap.write_with_if_match as jboolean),
JValue::Bool(cap.write_with_if_none_match as jboolean),
JValue::Bool(cap.write_with_if_not_exists as jboolean),
JValue::Bool(cap.write_with_user_metadata as jboolean),
JValue::Long(convert::usize_to_jlong(cap.write_multi_max_size)),
JValue::Long(convert::usize_to_jlong(cap.write_multi_min_size)),
JValue::Bool(cap.create_dir as jboolean),
Expand Down
18 changes: 15 additions & 3 deletions bindings/java/src/main/java/org/apache/opendal/AsyncOperator.java
Original file line number Diff line number Diff line change
Expand Up @@ -190,11 +190,22 @@ public Operator blocking() {
}

public CompletableFuture<Void> write(String path, String content) {
return write(path, content.getBytes(StandardCharsets.UTF_8));
return write(
path,
content.getBytes(StandardCharsets.UTF_8),
WriteOptions.builder().build());
}

public CompletableFuture<Void> write(String path, byte[] content) {
final long requestId = write(nativeHandle, executorHandle, path, content);
return write(path, content, WriteOptions.builder().build());
}

public CompletableFuture<Void> write(String path, String content, WriteOptions options) {
return write(path, content.getBytes(StandardCharsets.UTF_8), options);
}

public CompletableFuture<Void> write(String path, byte[] content, WriteOptions options) {
final long requestId = write(nativeHandle, executorHandle, path, content, options);
return AsyncRegistry.take(requestId);
}

Expand Down Expand Up @@ -272,7 +283,8 @@ public CompletableFuture<List<Entry>> list(String path) {

private static native long read(long nativeHandle, long executorHandle, String path);

private static native long write(long nativeHandle, long executorHandle, String path, byte[] content);
private static native long write(
long nativeHandle, long executorHandle, String path, byte[] content, WriteOptions options);

private static native long append(long nativeHandle, long executorHandle, String path, byte[] content);

Expand Down
29 changes: 29 additions & 0 deletions bindings/java/src/main/java/org/apache/opendal/Capability.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,27 @@ public class Capability {
*/
public final boolean writeWithCacheControl;

/**
* If operator supports write with if match.
*/
public final boolean writeWithIfMatch;

/**
* If operator supports write with if none match.
*
*/
public final boolean writeWithIfNoneMatch;

/**
* If operator supports write with if not exists.
*/
public final boolean writeWithIfNotExists;

/**
* If operator supports write with user metadata.
*/
public final boolean writeWithUserMetadata;

/**
* write_multi_max_size is the max size that services support in write_multi.
* For example, AWS S3 supports 5GiB as max in write_multi.
Expand Down Expand Up @@ -196,6 +217,10 @@ public Capability(
boolean writeWithContentType,
boolean writeWithContentDisposition,
boolean writeWithCacheControl,
boolean writeWithIfMatch,
boolean writeWithIfNoneMatch,
boolean writeWithIfNotExists,
boolean writeWithUserMetadata,
long writeMultiMaxSize,
long writeMultiMinSize,
boolean createDir,
Expand Down Expand Up @@ -227,6 +252,10 @@ public Capability(
this.writeWithContentType = writeWithContentType;
this.writeWithContentDisposition = writeWithContentDisposition;
this.writeWithCacheControl = writeWithCacheControl;
this.writeWithIfMatch = writeWithIfMatch;
this.writeWithIfNoneMatch = writeWithIfNoneMatch;
this.writeWithIfNotExists = writeWithIfNotExists;
this.writeWithUserMetadata = writeWithUserMetadata;
this.writeMultiMaxSize = writeMultiMaxSize;
this.writeMultiMinSize = writeMultiMinSize;
this.createDir = createDir;
Expand Down
12 changes: 10 additions & 2 deletions bindings/java/src/main/java/org/apache/opendal/Operator.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,15 @@ public void write(String path, String content) {
}

public void write(String path, byte[] content) {
write(nativeHandle, path, content);
write(nativeHandle, path, content, WriteOptions.builder().build());
}

public void write(String path, String content, WriteOptions options) {
write(path, content.getBytes(StandardCharsets.UTF_8), options);
}

public void write(String path, byte[] content, WriteOptions options) {
write(nativeHandle, path, content, options);
}

public OperatorOutputStream createOutputStream(String path) {
Expand Down Expand Up @@ -128,7 +136,7 @@ public List<Entry> list(String path) {

private static native long duplicate(long op);

private static native void write(long op, String path, byte[] content);
private static native void write(long op, String path, byte[] content, WriteOptions options);

private static native byte[] read(long op, String path);

Expand Down
69 changes: 69 additions & 0 deletions bindings/java/src/main/java/org/apache/opendal/WriteOptions.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package org.apache.opendal;

import java.util.Map;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;

@Getter
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class WriteOptions {

/**
* Sets the Content-Type header for the object.
* Requires capability: writeWithContentType
*/
private String contentType;

/**
* Sets the Content-Disposition header for the object
* Requires capability: writeWithContentDisposition
*/
private String contentDisposition;

/**
* Sets the Cache-Control header for the object
* Requires capability: writeWithCacheControl
*/
private String cacheControl;

/**
* Sets the Content-Encoding header for the object
*/
private String contentEncoding;

/**
* Sets the If-Match header for conditional writes
* Requires capability: writeWithIfMatch
*/
private String ifMatch;

/**
* Sets the If-None-Match header for conditional writes
* Requires capability: writeWithIfNoneMatch
*/
private String ifNoneMatch;

/**
* Sets custom metadata for the file.
* Requires capability: writeWithUserMetadata
*/
private Map<String, String> userMetadata;

/**
* Enables append mode for writing.
* When true, data will be appended to the end of existing file.
* Requires capability: writeCanAppend
*/
private boolean append;

/**
* Write only if the file does not exist.
* Operation will fail if the file at the designated path already exists.
* Requires capability: writeWithIfNotExists
*/
private boolean ifNotExists;
}
25 changes: 22 additions & 3 deletions bindings/java/src/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use jni::sys::jsize;
use jni::JNIEnv;
use opendal::BlockingOperator;

use crate::convert::jstring_to_string;
use crate::convert::{get_optional_string_from_object, jstring_to_string};
use crate::make_entry;
use crate::make_metadata;
use crate::Result;
Expand Down Expand Up @@ -90,8 +90,9 @@ pub unsafe extern "system" fn Java_org_apache_opendal_Operator_write(
op: *mut BlockingOperator,
path: JString,
content: JByteArray,
write_options: JObject
) {
intern_write(&mut env, &mut *op, path, content).unwrap_or_else(|e| {
intern_write(&mut env, &mut *op, path, content, write_options).unwrap_or_else(|e| {
e.throw(&mut env);
})
}
Expand All @@ -101,10 +102,28 @@ fn intern_write(
op: &mut BlockingOperator,
path: JString,
content: JByteArray,
write_options: JObject
) -> Result<()> {
let path = jstring_to_string(env, &path)?;
let content = env.convert_byte_array(content)?;
Ok(op.write(&path, content).map(|_| ())?)

let content_type = get_optional_string_from_object(env, &write_options, "getContentType")?;
let content_disposition = get_optional_string_from_object(env, &write_options, "getContentDisposition")?;
let cache_control = get_optional_string_from_object(env, &write_options, "getCacheControl")?;
let append = env.call_method(&write_options, "isAppend", "()Z", &[])?.z()?;

let mut write_op = op.write_with(&path, content);
if let Some(ct) = content_type {
write_op = write_op.content_type(&ct);
}
if let Some(cd) = content_disposition {
write_op = write_op.content_disposition(&cd);
}
if let Some(cc) = cache_control {
write_op = write_op.cache_control(&cc);
}
write_op = write_op.append(append);
Ok(write_op.call().map(|_| ())?)
}

/// # Safety
Expand Down
Loading

0 comments on commit 7032caf

Please sign in to comment.