Skip to content

Commit

Permalink
[ISSUE apache#660] Add namespace in java client (apache#661)
Browse files Browse the repository at this point in the history
* Add namespace for java client

* Add checkNotNull
  • Loading branch information
drpmma authored Jan 10, 2024
1 parent 2d3cdf7 commit 0aaf9f9
Show file tree
Hide file tree
Showing 15 changed files with 140 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,19 @@ public class ClientConfiguration {
private final SessionCredentialsProvider sessionCredentialsProvider;
private final Duration requestTimeout;
private final boolean sslEnabled;
private final String namespace;

/**
* The caller is supposed to have validated the arguments and handled throwing exceptions or
* logging warnings already, so we avoid repeating args check here.
*/
ClientConfiguration(String endpoints, SessionCredentialsProvider sessionCredentialsProvider,
Duration requestTimeout, boolean sslEnabled) {
Duration requestTimeout, boolean sslEnabled, String namespace) {
this.endpoints = endpoints;
this.sessionCredentialsProvider = sessionCredentialsProvider;
this.requestTimeout = requestTimeout;
this.sslEnabled = sslEnabled;
this.namespace = namespace;
}

public static ClientConfigurationBuilder newBuilder() {
Expand All @@ -60,4 +62,8 @@ public Duration getRequestTimeout() {
public boolean isSslEnabled() {
return sslEnabled;
}

public String getNamespace() {
return namespace;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public class ClientConfigurationBuilder {
private SessionCredentialsProvider sessionCredentialsProvider = null;
private Duration requestTimeout = Duration.ofSeconds(3);
private boolean sslEnabled = true;
private String namespace = "";

/**
* Configure the access point with which the SDK should communicate.
Expand Down Expand Up @@ -82,6 +83,16 @@ public ClientConfigurationBuilder enableSsl(boolean sslEnabled) {
return this;
}

/**
* Configure namespace for client
* @param namespace namespace
* @return The {@link ClientConfigurationBuilder} instance, to allow for method chaining.
*/
public ClientConfigurationBuilder setNamespace(String namespace) {
this.namespace = checkNotNull(namespace, "namespace should not be null");
return this;
}

/**
* Finalize the build of {@link ClientConfiguration}.
*
Expand All @@ -90,6 +101,6 @@ public ClientConfigurationBuilder enableSsl(boolean sslEnabled) {
public ClientConfiguration build() {
checkNotNull(endpoints, "endpoints should not be null");
checkNotNull(requestTimeout, "requestTimeout should not be null");
return new ClientConfiguration(endpoints, sessionCredentialsProvider, requestTimeout, sslEnabled);
return new ClientConfiguration(endpoints, sessionCredentialsProvider, requestTimeout, sslEnabled, namespace);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,10 @@ public void onFailure(Throwable t) {
}

protected ListenableFuture<TopicRouteData> fetchTopicRoute0(final String topic) {
Resource topicResource = Resource.newBuilder().setName(topic).build();
Resource topicResource = Resource.newBuilder()
.setResourceNamespace(clientConfiguration.getNamespace())
.setName(topic)
.build();
final QueryRouteRequest request = QueryRouteRequest.newBuilder().setTopic(topicResource)
.setEndpoints(endpoints.toProtobuf()).build();
final RpcFuture<QueryRouteRequest, QueryRouteResponse> future =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,26 @@
import org.apache.rocketmq.client.java.route.Endpoints;

public abstract class Settings {
protected final String namespace;
protected final ClientId clientId;
protected final ClientType clientType;
protected final Endpoints accessPoint;
protected volatile RetryPolicy retryPolicy;
protected final Duration requestTimeout;

public Settings(ClientId clientId, ClientType clientType, Endpoints accessPoint, RetryPolicy retryPolicy,
Duration requestTimeout) {
public Settings(String namespace, ClientId clientId, ClientType clientType, Endpoints accessPoint,
RetryPolicy retryPolicy, Duration requestTimeout) {
this.namespace = namespace;
this.clientId = clientId;
this.clientType = clientType;
this.accessPoint = accessPoint;
this.retryPolicy = retryPolicy;
this.requestTimeout = requestTimeout;
}

public Settings(ClientId clientId, ClientType clientType, Endpoints accessPoint, Duration requestTimeout) {
this(clientId, clientType, accessPoint, null, requestTimeout);
public Settings(String namespace, ClientId clientId, ClientType clientType, Endpoints accessPoint,
Duration requestTimeout) {
this(namespace, clientId, clientType, accessPoint, null, requestTimeout);
}

public abstract apache.rocketmq.v2.Settings toProtobuf();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,10 @@ protected ListenableFuture<ReceiveMessageResult> receiveMessage(ReceiveMessageRe
}

private AckMessageRequest wrapAckMessageRequest(MessageViewImpl messageView) {
final Resource topicResource = Resource.newBuilder().setName(messageView.getTopic()).build();
final Resource topicResource = Resource.newBuilder()
.setResourceNamespace(clientConfiguration.getNamespace())
.setName(messageView.getTopic())
.build();
final AckMessageEntry entry = AckMessageEntry.newBuilder()
.setMessageId(messageView.getMessageId().toString())
.setReceiptHandle(messageView.getReceiptHandle())
Expand All @@ -134,7 +137,9 @@ private AckMessageRequest wrapAckMessageRequest(MessageViewImpl messageView) {

private ChangeInvisibleDurationRequest wrapChangeInvisibleDuration(MessageViewImpl messageView,
Duration invisibleDuration) {
final Resource topicResource = Resource.newBuilder().setName(messageView.getTopic()).build();
final Resource topicResource = Resource.newBuilder()
.setResourceNamespace(clientConfiguration.getNamespace())
.setName(messageView.getTopic()).build();
return ChangeInvisibleDurationRequest.newBuilder().setGroup(getProtobufGroup()).setTopic(topicResource)
.setReceiptHandle(messageView.getReceiptHandle())
.setInvisibleDuration(Durations.fromNanos(invisibleDuration.toNanos()))
Expand Down Expand Up @@ -219,7 +224,10 @@ public void onFailure(Throwable t) {
}

protected Resource getProtobufGroup() {
return Resource.newBuilder().setName(consumerGroup).build();
return Resource.newBuilder()
.setResourceNamespace(clientConfiguration.getNamespace())
.setName(consumerGroup)
.build();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,9 @@ public PushConsumerImpl(ClientConfiguration clientConfiguration, String consumer
int maxCacheMessageCount, int maxCacheMessageSizeInBytes, int consumptionThreadCount) {
super(clientConfiguration, consumerGroup, subscriptionExpressions.keySet());
this.clientConfiguration = clientConfiguration;
Resource groupResource = new Resource(consumerGroup);
this.pushSubscriptionSettings = new PushSubscriptionSettings(clientId, endpoints, groupResource,
clientConfiguration.getRequestTimeout(), subscriptionExpressions);
Resource groupResource = new Resource(clientConfiguration.getNamespace(), consumerGroup);
this.pushSubscriptionSettings = new PushSubscriptionSettings(clientConfiguration.getNamespace(), clientId,
endpoints, groupResource, clientConfiguration.getRequestTimeout(), subscriptionExpressions);
this.consumerGroup = consumerGroup;
this.subscriptionExpressions = subscriptionExpressions;
this.cacheAssignments = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -261,7 +261,10 @@ private ListenableFuture<Endpoints> pickEndpointsToQueryAssignments(String topic
}

private QueryAssignmentRequest wrapQueryAssignmentRequest(String topic) {
apache.rocketmq.v2.Resource topicResource = apache.rocketmq.v2.Resource.newBuilder().setName(topic).build();
apache.rocketmq.v2.Resource topicResource = apache.rocketmq.v2.Resource.newBuilder()
.setResourceNamespace(clientConfiguration.getNamespace())
.setName(topic)
.build();
return QueryAssignmentRequest.newBuilder().setTopic(topicResource)
.setEndpoints(endpoints.toProtobuf()).setGroup(getProtobufGroup()).build();
}
Expand Down Expand Up @@ -500,7 +503,10 @@ public void onFailure(Throwable t) {
private ForwardMessageToDeadLetterQueueRequest wrapForwardMessageToDeadLetterQueueRequest(
MessageViewImpl messageView) {
final apache.rocketmq.v2.Resource topicResource =
apache.rocketmq.v2.Resource.newBuilder().setName(messageView.getTopic()).build();
apache.rocketmq.v2.Resource.newBuilder()
.setResourceNamespace(clientConfiguration.getNamespace())
.setName(messageView.getTopic())
.build();
return ForwardMessageToDeadLetterQueueRequest.newBuilder().setGroup(getProtobufGroup()).setTopic(topicResource)
.setReceiptHandle(messageView.getReceiptHandle())
.setMessageId(messageView.getMessageId().toString())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ public class PushSubscriptionSettings extends Settings {
private volatile int receiveBatchSize = 32;
private volatile Duration longPollingTimeout = Duration.ofSeconds(30);

public PushSubscriptionSettings(ClientId clientId, Endpoints endpoints, Resource group,
public PushSubscriptionSettings(String namespace, ClientId clientId, Endpoints endpoints, Resource group,
Duration requestTimeout, Map<String, FilterExpression> subscriptionExpression) {
super(clientId, ClientType.PUSH_CONSUMER, endpoints, requestTimeout);
super(namespace, clientId, ClientType.PUSH_CONSUMER, endpoints, requestTimeout);
this.group = group;
this.subscriptionExpressions = subscriptionExpression;
}
Expand All @@ -75,7 +75,10 @@ public apache.rocketmq.v2.Settings toProtobuf() {
for (Map.Entry<String, FilterExpression> entry : subscriptionExpressions.entrySet()) {
final FilterExpression filterExpression = entry.getValue();
apache.rocketmq.v2.Resource topic =
apache.rocketmq.v2.Resource.newBuilder().setName(entry.getKey()).build();
apache.rocketmq.v2.Resource.newBuilder()
.setResourceNamespace(namespace)
.setName(entry.getKey())
.build();
final apache.rocketmq.v2.FilterExpression.Builder expressionBuilder =
apache.rocketmq.v2.FilterExpression.newBuilder().setExpression(filterExpression.getExpression());
final FilterExpressionType type = filterExpression.getFilterExpressionType();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
public SimpleConsumerImpl(ClientConfiguration clientConfiguration, String consumerGroup, Duration awaitDuration,
Map<String, FilterExpression> subscriptionExpressions) {
super(clientConfiguration, consumerGroup, subscriptionExpressions.keySet());
Resource groupResource = new Resource(consumerGroup);
this.simpleSubscriptionSettings = new SimpleSubscriptionSettings(clientId, endpoints,
groupResource, clientConfiguration.getRequestTimeout(), awaitDuration, subscriptionExpressions);
Resource groupResource = new Resource(clientConfiguration.getNamespace(), consumerGroup);
this.simpleSubscriptionSettings = new SimpleSubscriptionSettings(clientConfiguration.getNamespace(), clientId,
endpoints, groupResource, clientConfiguration.getRequestTimeout(), awaitDuration, subscriptionExpressions);
this.consumerGroup = consumerGroup;
this.awaitDuration = awaitDuration;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ public class SimpleSubscriptionSettings extends Settings {
private final Duration longPollingTimeout;
private final Map<String, FilterExpression> subscriptionExpressions;

public SimpleSubscriptionSettings(ClientId clientId, Endpoints endpoints, Resource group,
public SimpleSubscriptionSettings(String namespace, ClientId clientId, Endpoints endpoints, Resource group,
Duration requestTimeout, Duration longPollingTimeout, Map<String, FilterExpression> subscriptionExpression) {
super(clientId, ClientType.SIMPLE_CONSUMER, endpoints, requestTimeout);
super(namespace, clientId, ClientType.SIMPLE_CONSUMER, endpoints, requestTimeout);
this.group = group;
this.subscriptionExpressions = subscriptionExpression;
this.longPollingTimeout = longPollingTimeout;
Expand All @@ -59,7 +59,9 @@ public apache.rocketmq.v2.Settings toProtobuf() {
for (Map.Entry<String, FilterExpression> entry : subscriptionExpressions.entrySet()) {
final FilterExpression filterExpression = entry.getValue();
apache.rocketmq.v2.Resource topic = apache.rocketmq.v2.Resource.newBuilder()
.setName(entry.getKey()).build();
.setResourceNamespace(namespace)
.setName(entry.getKey())
.build();
final apache.rocketmq.v2.FilterExpression.Builder expressionBuilder =
apache.rocketmq.v2.FilterExpression.newBuilder().setExpression(filterExpression.getExpression());
final FilterExpressionType type = filterExpression.getFilterExpressionType();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ class ProducerImpl extends ClientImpl implements Producer {
TransactionChecker checker) {
super(clientConfiguration, topics);
ExponentialBackoffRetryPolicy retryPolicy = ExponentialBackoffRetryPolicy.immediatelyRetryPolicy(maxAttempts);
this.publishingSettings = new PublishingSettings(clientId, endpoints, retryPolicy,
clientConfiguration.getRequestTimeout(), topics);
this.publishingSettings = new PublishingSettings(clientConfiguration.getNamespace(), clientId, endpoints,
retryPolicy, clientConfiguration.getRequestTimeout(), topics);
this.checker = checker;
this.publishingRouteDataCache = new ConcurrentHashMap<>();
}
Expand Down Expand Up @@ -259,7 +259,10 @@ public void endTransaction(Endpoints endpoints, GeneralMessage generalMessage, M
String transactionId, final TransactionResolution resolution) throws ClientException {
final EndTransactionRequest.Builder builder = EndTransactionRequest.newBuilder()
.setMessageId(messageId.toString()).setTransactionId(transactionId)
.setTopic(apache.rocketmq.v2.Resource.newBuilder().setName(generalMessage.getTopic()).build());
.setTopic(apache.rocketmq.v2.Resource.newBuilder()
.setResourceNamespace(clientConfiguration.getNamespace())
.setName(generalMessage.getTopic())
.build());
switch (resolution) {
case COMMIT:
builder.setResolution(apache.rocketmq.v2.TransactionResolution.COMMIT);
Expand Down Expand Up @@ -415,7 +418,8 @@ private ListenableFuture<List<SendReceiptImpl>> send(List<Message> messages, boo
*/
private SendMessageRequest wrapSendMessageRequest(List<PublishingMessageImpl> pubMessages, MessageQueueImpl mq) {
final List<apache.rocketmq.v2.Message> messages = pubMessages.stream()
.map(publishingMessage -> publishingMessage.toProtobuf(mq)).collect(Collectors.toList());
.map(publishingMessage -> publishingMessage.toProtobuf(clientConfiguration.getNamespace(), mq))
.collect(Collectors.toList());
return SendMessageRequest.newBuilder().addAllMessages(messages).build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ public class PublishingSettings extends Settings {
private volatile int maxBodySizeBytes = 4 * 1024 * 1024;
private volatile boolean validateMessageType = true;

public PublishingSettings(ClientId clientId, Endpoints accessPoint, ExponentialBackoffRetryPolicy retryPolicy,
Duration requestTimeout, Set<String> topics) {
super(clientId, ClientType.PRODUCER, accessPoint, retryPolicy, requestTimeout);
public PublishingSettings(String namespace, ClientId clientId, Endpoints accessPoint,
ExponentialBackoffRetryPolicy retryPolicy, Duration requestTimeout, Set<String> topics) {
super(namespace, clientId, ClientType.PRODUCER, accessPoint, retryPolicy, requestTimeout);
this.topics = topics;
}

Expand All @@ -62,8 +62,13 @@ public boolean isValidateMessageType() {
@Override
public apache.rocketmq.v2.Settings toProtobuf() {
final Publishing publishing = Publishing.newBuilder()
.addAllTopics(topics.stream().map(name -> Resource.newBuilder().setName(name).build())
.collect(Collectors.toList())).setValidateMessageType(validateMessageType).build();
.addAllTopics(topics.stream().map(name -> Resource.newBuilder()
.setResourceNamespace(namespace)
.setName(name)
.build())
.collect(Collectors.toList()))
.setValidateMessageType(validateMessageType)
.build();
final apache.rocketmq.v2.Settings.Builder builder = apache.rocketmq.v2.Settings.newBuilder()
.setAccessPoint(accessPoint.toProtobuf()).setClientType(clientType.toProtobuf())
.setRequestTimeout(Durations.fromNanos(requestTimeout.toNanos())).setPublishing(publishing);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public MessageType getMessageType() {
* <p>This method should be invoked before each message sending, because the born time is reset before each
* invocation, which means that it should not be invoked ahead of time.
*/
public apache.rocketmq.v2.Message toProtobuf(MessageQueueImpl mq) {
public apache.rocketmq.v2.Message toProtobuf(String namespace, MessageQueueImpl mq) {
final apache.rocketmq.v2.SystemProperties.Builder systemPropertiesBuilder =
apache.rocketmq.v2.SystemProperties.newBuilder()
// Message keys
Expand All @@ -112,7 +112,7 @@ public apache.rocketmq.v2.Message toProtobuf(MessageQueueImpl mq) {
// Message group
this.getMessageGroup().ifPresent(systemPropertiesBuilder::setMessageGroup);
final SystemProperties systemProperties = systemPropertiesBuilder.build();
Resource topicResource = Resource.newBuilder().setName(getTopic()).build();
Resource topicResource = Resource.newBuilder().setResourceNamespace(namespace).setName(getTopic()).build();
return apache.rocketmq.v2.Message.newBuilder()
// Topic
.setTopic(topicResource)
Expand Down
Loading

0 comments on commit 0aaf9f9

Please sign in to comment.