Skip to content

Commit

Permalink
[ISSUE #878] The java implementation of recalling API (#879)
Browse files Browse the repository at this point in the history
  • Loading branch information
imzs authored Dec 10, 2024
1 parent f5e2d20 commit d2f9588
Show file tree
Hide file tree
Showing 15 changed files with 267 additions and 11 deletions.
1 change: 1 addition & 0 deletions .github/workflows/java_coverage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,5 @@ jobs:
with:
flags: java
fail_ci_if_error: true
token: e7eb01be-398b-4f7f-a73e-dc35c428cb50
verbose: true
19 changes: 10 additions & 9 deletions README-CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,16 @@
* 可用 - ✅
* 进行中 - 🚧

| 特性 | Java | C/C++ | C# | Golang | Rust | Python | Node.js | PHP |
| ---------------------------------------------- | :---: | :---: | :---: | :----: | :---: | :----: | :-----: | :---: |
| Producer with standard messages |||||||| 🚧 |
| Producer with FIFO messages |||||||| 🚧 |
| Producer with timed/delay messages |||||||| 🚧 |
| Producer with transactional messages |||||||| 🚧 |
| Simple consumer |||||||| 🚧 |
| Push consumer with concurrent message listener ||| 🚧 | 🚧 || 🚧 | 🚧 | 🚧 |
| Push consumer with FIFO message listener ||| 🚧 | 🚧 || 🚧 | 🚧 | 🚧 |
| 特性 | Java | C/C++ | C# | Golang | Rust | Python | Node.js | PHP |
|------------------------------------------------| :---: |:------:|:-----:|:------:|:----:|:------:|:-------:| :---: |
| Producer with standard messages |||||||| 🚧 |
| Producer with FIFO messages |||||||| 🚧 |
| Producer with timed/delay messages |||||||| 🚧 |
| Producer with transactional messages |||||||| 🚧 |
| Producer with recalling timed/delay messages || 🚧 | 🚧 | 🚧 | 🚧 | 🚧 | 🚧 | 🚧 |
| Simple consumer |||||||| 🚧 |
| Push consumer with concurrent message listener ||| 🚧 | 🚧 || 🚧 | 🚧 | 🚧 |
| Push consumer with FIFO message listener ||| 🚧 | 🚧 || 🚧 | 🚧 | 🚧 |

## 先决条件和构建

Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ Provide cloud-native and robust solutions for Java, C++, C#, Golang, Rust and al
| Producer with FIFO messages |||||||| 🚧 |
| Producer with timed/delay messages |||||||| 🚧 |
| Producer with transactional messages |||||||| 🚧 |
| Producer with recalling timed/delay messages || 🚧 | 🚧 | 🚧 | 🚧 | 🚧 | 🚧 | 🚧 |
| Simple consumer |||||||| 🚧 |
| Push consumer with concurrent message listener ||| 🚧 | 🚧 || 🚧 | 🚧 | 🚧 |
| Push consumer with FIFO message listener ||| 🚧 | 🚧 || 🚧 | 🚧 | 🚧 |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,31 @@ public interface Producer extends Closeable {
*/
Transaction beginTransaction() throws ClientException;

/**
* Recall message synchronously, only delay message is supported for now.
*
* <pre>{@code
* SendReceipt receipt = producer.send(message);
* String recallHandle = receipt.getRecallHandle();
* }</pre>
*
* @param topic the topic of the operation
* @param recallHandle the handle to identify a message to recall
* @return the returned receipt, or throw exception if response status is not OK.
*/
RecallReceipt recallMessage(String topic, String recallHandle) throws ClientException;

/**
* Recall message asynchronously.
*
* <p>This method returns immediately, the result is included in the {@link CompletableFuture};
*
* @param topic the topic of the operation
* @param recallHandle the handle to identify a message to recall
* @return a future that indicates the receipt
*/
CompletableFuture<RecallReceipt> recallMessageAsync(String topic, String recallHandle);

/**
* Closes the producer and releases all related resources.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.client.apis.producer;

import org.apache.rocketmq.client.apis.message.MessageId;

public interface RecallReceipt {
MessageId getMessageId();
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,7 @@
*/
public interface SendReceipt {
MessageId getMessageId();

// Unique handle to identify a message to recall, only delay message is supported for now
String getRecallHandle();
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import apache.rocketmq.v2.QueryAssignmentResponse;
import apache.rocketmq.v2.QueryRouteRequest;
import apache.rocketmq.v2.QueryRouteResponse;
import apache.rocketmq.v2.RecallMessageRequest;
import apache.rocketmq.v2.RecallMessageResponse;
import apache.rocketmq.v2.ReceiveMessageRequest;
import apache.rocketmq.v2.ReceiveMessageResponse;
import apache.rocketmq.v2.SendMessageRequest;
Expand Down Expand Up @@ -170,6 +172,16 @@ public abstract RpcFuture<EndTransactionRequest, EndTransactionResponse> endTran
public abstract RpcFuture<NotifyClientTerminationRequest, NotifyClientTerminationResponse>
notifyClientTermination(Endpoints endpoints, NotifyClientTerminationRequest request, Duration duration);

/**
* recall message asynchronously, the method ensures no throwable.
* @param endpoints request endpoints.
* @param request recall message request.
* @param duration request max duration.
* @return invocation of response future.
*/
public abstract RpcFuture<RecallMessageRequest, RecallMessageResponse> recallMessage(Endpoints endpoints,
RecallMessageRequest request, Duration duration);

/**
* Establish telemetry session stream to server.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import apache.rocketmq.v2.QueryAssignmentResponse;
import apache.rocketmq.v2.QueryRouteRequest;
import apache.rocketmq.v2.QueryRouteResponse;
import apache.rocketmq.v2.RecallMessageRequest;
import apache.rocketmq.v2.RecallMessageResponse;
import apache.rocketmq.v2.ReceiveMessageRequest;
import apache.rocketmq.v2.ReceiveMessageResponse;
import apache.rocketmq.v2.SendMessageRequest;
Expand Down Expand Up @@ -344,6 +346,21 @@ public RpcFuture<EndTransactionRequest, EndTransactionResponse> endTransaction(E
}
}

@Override
public RpcFuture<RecallMessageRequest, RecallMessageResponse> recallMessage(Endpoints endpoints,
RecallMessageRequest request, Duration duration) {
try {
final Metadata metadata = client.sign();
final Context context = new Context(endpoints, metadata);
final RpcClient rpcClient = getRpcClient(endpoints);
final ListenableFuture<RecallMessageResponse> future =
rpcClient.recallMessage(metadata, request, asyncWorker, duration);
return new RpcFuture<>(context, request, future);
} catch (Throwable t) {
return new RpcFuture<>(t);
}
}

@Override
public StreamObserver<TelemetryCommand> telemetry(Endpoints endpoints, Duration duration,
StreamObserver<TelemetryCommand> responseObserver) throws ClientException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import apache.rocketmq.v2.EndTransactionResponse;
import apache.rocketmq.v2.HeartbeatRequest;
import apache.rocketmq.v2.NotifyClientTerminationRequest;
import apache.rocketmq.v2.RecallMessageRequest;
import apache.rocketmq.v2.RecallMessageResponse;
import apache.rocketmq.v2.RecoverOrphanedTransactionCommand;
import apache.rocketmq.v2.SendMessageRequest;
import apache.rocketmq.v2.SendMessageResponse;
Expand Down Expand Up @@ -50,16 +52,19 @@
import java.util.function.Function;
import java.util.stream.Collectors;
import net.javacrumbs.futureconverter.java8guava.FutureConverter;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.RecallReceipt;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.apache.rocketmq.client.apis.producer.Transaction;
import org.apache.rocketmq.client.apis.producer.TransactionChecker;
import org.apache.rocketmq.client.apis.producer.TransactionResolution;
import org.apache.rocketmq.client.java.exception.InternalErrorException;
import org.apache.rocketmq.client.java.exception.StatusChecker;
import org.apache.rocketmq.client.java.exception.TooManyRequestsException;
import org.apache.rocketmq.client.java.hook.MessageHookPoints;
import org.apache.rocketmq.client.java.hook.MessageHookPointsStatus;
Expand Down Expand Up @@ -250,6 +255,18 @@ public Transaction beginTransaction() {
return new TransactionImpl(this);
}

@Override
public RecallReceipt recallMessage(String topic, String recallHandle) throws ClientException {
final ListenableFuture<RecallReceipt> future = recallMessage0(topic, recallHandle);
return handleClientFuture(future);
}

@Override
public CompletableFuture<RecallReceipt> recallMessageAsync(String topic, String recallHandle) {
final ListenableFuture<RecallReceipt> future = recallMessage0(topic, recallHandle);
return FutureConverter.toCompletableFuture(future);
}

@Override
public void close() {
this.stopAsync().awaitTerminated();
Expand Down Expand Up @@ -561,4 +578,32 @@ private ListenableFuture<PublishingLoadBalancer> getPublishingLoadBalancer(final
return Futures.transform(getRouteData(topic), topicRouteData -> updatePublishingLoadBalancer(topic,
topicRouteData), MoreExecutors.directExecutor());
}

ListenableFuture<RecallReceipt> recallMessage0(String topic, String recallHandle) {
if (!this.isRunning()) {
final IllegalStateException e = new IllegalStateException("Producer is not running now");
log.error("Unable to recall message because producer is not running, state={}, clientId={}",
this.state(), clientId);
return Futures.immediateFailedFuture(e);
}
if (StringUtils.isEmpty(recallHandle)) {
return Futures.immediateFailedFuture(new IllegalArgumentException("recall handle is invalid"));
}
final RecallMessageRequest request = RecallMessageRequest.newBuilder()
.setTopic(apache.rocketmq.v2.Resource.newBuilder()
.setResourceNamespace(clientConfiguration.getNamespace())
.setName(topic)
.build())
.setRecallHandle(recallHandle)
.build();
final Duration requestTimeout = clientConfiguration.getRequestTimeout();
final RpcFuture<RecallMessageRequest, RecallMessageResponse> future =
this.getClientManager().recallMessage(endpoints, request, requestTimeout);

return Futures.transformAsync(future, response -> {
final Status status = response.getStatus();
StatusChecker.check(status, future);
return Futures.immediateFuture(new RecallReceiptImpl(response.getMessageId()));
}, MoreExecutors.directExecutor());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.client.java.impl.producer;

import com.google.common.base.MoreObjects;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.apis.producer.RecallReceipt;
import org.apache.rocketmq.client.java.message.MessageIdCodec;

public class RecallReceiptImpl implements RecallReceipt {
private final MessageId messageId;

public RecallReceiptImpl(String messageIdStr) {
messageId = MessageIdCodec.getInstance().decode(messageIdStr);
}

@Override
public MessageId getMessageId() {
return messageId;
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("messageId", messageId)
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,21 +39,29 @@
public class SendReceiptImpl implements SendReceipt {
private final MessageId messageId;
private final String transactionId;
private final String recallHandle;
private final MessageQueueImpl messageQueue;
private final long offset;

private SendReceiptImpl(MessageId messageId, String transactionId, MessageQueueImpl messageQueue, long offset) {
private SendReceiptImpl(MessageId messageId, String transactionId,
MessageQueueImpl messageQueue, long offset, String recallHandle) {
this.messageId = messageId;
this.transactionId = transactionId;
this.messageQueue = messageQueue;
this.offset = offset;
this.recallHandle = recallHandle;
}

@Override
public MessageId getMessageId() {
return messageId;
}

@Override
public String getRecallHandle() {
return recallHandle;
}

public MessageQueueImpl getMessageQueue() {
return messageQueue;
}
Expand Down Expand Up @@ -87,7 +95,8 @@ public static List<SendReceiptImpl> processResponseInvocation(MessageQueueImpl m
final MessageId messageId = MessageIdCodec.getInstance().decode(entry.getMessageId());
final String transactionId = entry.getTransactionId();
final long offset = entry.getOffset();
final SendReceiptImpl impl = new SendReceiptImpl(messageId, transactionId, mq, offset);
final String recallHandle = entry.getRecallHandle();
final SendReceiptImpl impl = new SendReceiptImpl(messageId, transactionId, mq, offset, recallHandle);
sendReceipts.add(impl);
}
return sendReceipts;
Expand All @@ -97,6 +106,7 @@ public static List<SendReceiptImpl> processResponseInvocation(MessageQueueImpl m
public String toString() {
return MoreObjects.toStringHelper(this)
.add("messageId", messageId)
.add("recallHandle", recallHandle)
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import apache.rocketmq.v2.QueryAssignmentResponse;
import apache.rocketmq.v2.QueryRouteRequest;
import apache.rocketmq.v2.QueryRouteResponse;
import apache.rocketmq.v2.RecallMessageRequest;
import apache.rocketmq.v2.RecallMessageResponse;
import apache.rocketmq.v2.ReceiveMessageRequest;
import apache.rocketmq.v2.ReceiveMessageResponse;
import apache.rocketmq.v2.SendMessageRequest;
Expand Down Expand Up @@ -186,6 +188,18 @@ ListenableFuture<EndTransactionResponse> endTransaction(Metadata metadata,
ListenableFuture<NotifyClientTerminationResponse> notifyClientTermination(Metadata metadata,
NotifyClientTerminationRequest request, Executor executor, Duration duration);

/**
* Recall message asynchronously.
*
* @param metadata gRPC request header metadata.
* @param request recall message request
* @param executor gRPC asynchronous executor.
* @param duration request max duration.
* @return invocation of response future.
*/
ListenableFuture<RecallMessageResponse> recallMessage(Metadata metadata,
RecallMessageRequest request, Executor executor, Duration duration);

/**
* Start a streaming request and get the request observer.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import apache.rocketmq.v2.QueryAssignmentResponse;
import apache.rocketmq.v2.QueryRouteRequest;
import apache.rocketmq.v2.QueryRouteResponse;
import apache.rocketmq.v2.RecallMessageRequest;
import apache.rocketmq.v2.RecallMessageResponse;
import apache.rocketmq.v2.ReceiveMessageRequest;
import apache.rocketmq.v2.ReceiveMessageResponse;
import apache.rocketmq.v2.SendMessageRequest;
Expand Down Expand Up @@ -212,6 +214,14 @@ public ListenableFuture<NotifyClientTerminationResponse> notifyClientTermination
.withDeadlineAfter(duration.toNanos(), TimeUnit.NANOSECONDS).notifyClientTermination(request);
}

@Override
public ListenableFuture<RecallMessageResponse> recallMessage(Metadata metadata,
RecallMessageRequest request, Executor executor, Duration duration) {
this.activityNanoTime = System.nanoTime();
return futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
.withDeadlineAfter(duration.toNanos(), TimeUnit.NANOSECONDS).recallMessage(request);
}

@Override
public StreamObserver<TelemetryCommand> telemetry(Metadata metadata, Executor executor, Duration duration,
StreamObserver<TelemetryCommand> responseObserver) {
Expand Down
Loading

0 comments on commit d2f9588

Please sign in to comment.