From e9f60b4cb0969034d707dc4ed46667ec44f7a4ad Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Sun, 13 Oct 2024 21:24:13 +0300 Subject: [PATCH] [improve][build] Drop pulsar-client-1x and pulsar-client-2x-shaded modules --- pom.xml | 1 - pulsar-bom/pom.xml | 15 - pulsar-client-1x-base/pom.xml | 80 --- .../pulsar-client-1x/pom.xml | 93 ---- .../client/api/ClientConfiguration.java | 388 -------------- .../apache/pulsar/client/api/Consumer.java | 331 ------------ .../client/api/ConsumerConfiguration.java | 411 --------------- .../pulsar/client/api/MessageBuilder.java | 139 ----- .../pulsar/client/api/MessageListener.java | 56 --- .../apache/pulsar/client/api/Producer.java | 199 -------- .../client/api/ProducerConfiguration.java | 474 ------------------ .../pulsar/client/api/PulsarClient.java | 273 ---------- .../org/apache/pulsar/client/api/Reader.java | 81 --- .../client/api/ReaderConfiguration.java | 175 ------- .../pulsar/client/api/ReaderListener.java | 52 -- .../pulsar/client/api/package-info.java | 22 - .../client/impl/MessageBuilderImpl.java | 115 ----- .../pulsar/client/impl/package-info.java | 22 - .../pulsar/client/impl/v1/ConsumerV1Impl.java | 176 ------- .../pulsar/client/impl/v1/ProducerV1Impl.java | 90 ---- .../client/impl/v1/PulsarClientV1Impl.java | 172 ------- .../pulsar/client/impl/v1/ReaderV1Impl.java | 85 ---- .../pulsar/client/impl/v1/package-info.java | 22 - .../src/main/resources/findbugsExclude.xml | 48 -- .../pulsar-client-2x-shaded/pom.xml | 97 ---- 25 files changed, 3617 deletions(-) delete mode 100644 pulsar-client-1x-base/pom.xml delete mode 100644 pulsar-client-1x-base/pulsar-client-1x/pom.xml delete mode 100644 pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java delete mode 100644 pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/Consumer.java delete mode 100644 pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java delete mode 100644 pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/MessageBuilder.java delete mode 100644 pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/MessageListener.java delete mode 100644 pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/Producer.java delete mode 100644 pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java delete mode 100644 pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/PulsarClient.java delete mode 100644 pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/Reader.java delete mode 100644 pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ReaderConfiguration.java delete mode 100644 pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ReaderListener.java delete mode 100644 pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/package-info.java delete mode 100644 pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/MessageBuilderImpl.java delete mode 100644 pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/package-info.java delete mode 100644 pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/ConsumerV1Impl.java delete mode 100644 pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/ProducerV1Impl.java delete mode 100644 pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/PulsarClientV1Impl.java delete mode 100644 pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/ReaderV1Impl.java delete mode 100644 pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/package-info.java delete mode 100644 pulsar-client-1x-base/pulsar-client-1x/src/main/resources/findbugsExclude.xml delete mode 100644 pulsar-client-1x-base/pulsar-client-2x-shaded/pom.xml diff --git a/pom.xml b/pom.xml index b89dd1597cc84..5734c6fddac92 100644 --- a/pom.xml +++ b/pom.xml @@ -2475,7 +2475,6 @@ flexible messaging model and an intuitive client API. pulsar-client-api pulsar-client pulsar-client-shaded - pulsar-client-1x-base pulsar-client-admin-api pulsar-client-admin pulsar-client-admin-shaded diff --git a/pulsar-bom/pom.xml b/pulsar-bom/pom.xml index e674301f18a3a..b4c271ff8912c 100644 --- a/pulsar-bom/pom.xml +++ b/pulsar-bom/pom.xml @@ -215,21 +215,6 @@ pulsar-cli-utils ${project.version} - - org.apache.pulsar - pulsar-client-1x-base - ${project.version} - - - org.apache.pulsar - pulsar-client-1x - ${project.version} - - - org.apache.pulsar - pulsar-client-2x-shaded - ${project.version} - org.apache.pulsar pulsar-client-admin-api diff --git a/pulsar-client-1x-base/pom.xml b/pulsar-client-1x-base/pom.xml deleted file mode 100644 index fedbe80cc6261..0000000000000 --- a/pulsar-client-1x-base/pom.xml +++ /dev/null @@ -1,80 +0,0 @@ - - - 4.0.0 - - - org.apache.pulsar - pulsar - 4.0.0-SNAPSHOT - - - pulsar-client-1x-base - Pulsar Client 1.x Compatibility Base - pom - - - pulsar-client-2x-shaded - pulsar-client-1x - - - - - - org.apache.maven.plugins - maven-compiler-plugin - - ${pulsar.client.compiler.release} - - - - com.github.spotbugs - spotbugs-maven-plugin - ${spotbugs-maven-plugin.version} - - - spotbugs - verify - - check - - - - - - - org.apache.maven.plugins - maven-checkstyle-plugin - - - checkstyle - verify - - check - - - - - - - - diff --git a/pulsar-client-1x-base/pulsar-client-1x/pom.xml b/pulsar-client-1x-base/pulsar-client-1x/pom.xml deleted file mode 100644 index b9c8fa7d3eb04..0000000000000 --- a/pulsar-client-1x-base/pulsar-client-1x/pom.xml +++ /dev/null @@ -1,93 +0,0 @@ - - - 4.0.0 - - - org.apache.pulsar - pulsar-client-1x-base - 4.0.0-SNAPSHOT - - - pulsar-client-1x - Pulsar Client 1.x Compatibility API - - - - ${project.groupId} - pulsar-client-2x-shaded - ${project.version} - - - - com.google.guava - guava - - - - org.apache.commons - commons-lang3 - - - - - - - - org.gaul - modernizer-maven-plugin - - true - 8 - - - - modernizer - verify - - modernizer - - - - - - - com.github.spotbugs - spotbugs-maven-plugin - ${spotbugs-maven-plugin.version} - - ${basedir}/src/main/resources/findbugsExclude.xml - - - - spotbugs - verify - - check - - - - - - - - diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java deleted file mode 100644 index 3b0efe64cf588..0000000000000 --- a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java +++ /dev/null @@ -1,388 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.client.api; - -import static com.google.common.base.Preconditions.checkArgument; -import java.io.Serializable; -import java.util.Map; -import java.util.concurrent.TimeUnit; -import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; -import org.apache.pulsar.client.impl.conf.ClientConfigurationData; - -/** - * Class used to specify client side configuration like authentication, etc.. - * - * @deprecated Use {@link PulsarClient#builder()} to construct and configure a new {@link PulsarClient} instance - */ -@Deprecated -public class ClientConfiguration implements Serializable { - - private static final long serialVersionUID = 1L; - - private final ClientConfigurationData confData = new ClientConfigurationData(); - - /** - * @return the authentication provider to be used - */ - public Authentication getAuthentication() { - return confData.getAuthentication(); - } - - /** - * Set the authentication provider to use in the Pulsar client instance. - *

- * Example: - *

- * - *

-     * 
-     * ClientConfiguration confData = new ClientConfiguration();
-     * String authPluginClassName = "org.apache.pulsar.client.impl.auth.MyAuthentication";
-     * String authParamsString = "key1:val1,key2:val2";
-     * Authentication auth = AuthenticationFactory.create(authPluginClassName, authParamsString);
-     * confData.setAuthentication(auth);
-     * PulsarClient client = PulsarClient.create(serviceUrl, confData);
-     * ....
-     * 
-     * 
- * - * @param authentication - */ - public void setAuthentication(Authentication authentication) { - confData.setAuthentication(authentication); - } - - /** - * Set the authentication provider to use in the Pulsar client instance. - *

- * Example: - *

- * - *

-     * 
-     * ClientConfiguration confData = new ClientConfiguration();
-     * String authPluginClassName = "org.apache.pulsar.client.impl.auth.MyAuthentication";
-     * String authParamsString = "key1:val1,key2:val2";
-     * confData.setAuthentication(authPluginClassName, authParamsString);
-     * PulsarClient client = PulsarClient.create(serviceUrl, confData);
-     * ....
-     * 
-     * 
- * - * @param authPluginClassName - * name of the Authentication-Plugin you want to use - * @param authParamsString - * string which represents parameters for the Authentication-Plugin, e.g., "key1:val1,key2:val2" - * @throws UnsupportedAuthenticationException - * failed to instantiate specified Authentication-Plugin - */ - public void setAuthentication(String authPluginClassName, String authParamsString) - throws UnsupportedAuthenticationException { - confData.setAuthentication(AuthenticationFactory.create(authPluginClassName, authParamsString)); - } - - /** - * Set the authentication provider to use in the Pulsar client instance. - *

- * Example: - *

- * - *

-     * 
-     * ClientConfiguration confData = new ClientConfiguration();
-     * String authPluginClassName = "org.apache.pulsar.client.impl.auth.MyAuthentication";
-     * Map authParams = new HashMap();
-     * authParams.put("key1", "val1");
-     * confData.setAuthentication(authPluginClassName, authParams);
-     * PulsarClient client = PulsarClient.create(serviceUrl, confData);
-     * ....
-     * 
-     * 
- * - * @param authPluginClassName - * name of the Authentication-Plugin you want to use - * @param authParams - * map which represents parameters for the Authentication-Plugin - * @throws UnsupportedAuthenticationException - * failed to instantiate specified Authentication-Plugin - */ - public void setAuthentication(String authPluginClassName, Map authParams) - throws UnsupportedAuthenticationException { - confData.setAuthentication(AuthenticationFactory.create(authPluginClassName, authParams)); - } - - /** - * @return the operation timeout in ms - */ - public long getOperationTimeoutMs() { - return confData.getOperationTimeoutMs(); - } - - /** - * Set the operation timeout (default: 30 seconds). - *

- * Producer-create, subscribe and unsubscribe operations will be retried until this interval, after which the - * operation will be marked as failed - * - * @param operationTimeout - * operation timeout - * @param unit - * time unit for {@code operationTimeout} - */ - public void setOperationTimeout(int operationTimeout, TimeUnit unit) { - checkArgument(operationTimeout >= 0); - confData.setOperationTimeoutMs(unit.toMillis(operationTimeout)); - } - - /** - * @return the number of threads to use for handling connections - */ - public int getIoThreads() { - return confData.getNumIoThreads(); - } - - /** - * Set the number of threads to be used for handling connections to brokers (default: 1 thread). - * - * @param numIoThreads - */ - public void setIoThreads(int numIoThreads) { - checkArgument(numIoThreads > 0); - confData.setNumIoThreads(numIoThreads); - } - - /** - * @return the number of threads to use for message listeners - */ - public int getListenerThreads() { - return confData.getNumListenerThreads(); - } - - /** - * Set the number of threads to be used for message listeners (default: 1 thread). - * - * @param numListenerThreads - */ - public void setListenerThreads(int numListenerThreads) { - checkArgument(numListenerThreads > 0); - confData.setNumListenerThreads(numListenerThreads); - } - - /** - * @return the max number of connections per single broker - */ - public int getConnectionsPerBroker() { - return confData.getConnectionsPerBroker(); - } - - /** - * Sets the max number of connection that the client library will open to a single broker. - *

- * By default, the connection pool will use a single connection for all the producers and consumers. Increasing this - * parameter may improve throughput when using many producers over a high latency connection. - *

- * - * @param connectionsPerBroker - * max number of connections per broker (needs to be greater than 0) - */ - public void setConnectionsPerBroker(int connectionsPerBroker) { - checkArgument(connectionsPerBroker > 0, "Connections per broker need to be greater than 0"); - confData.setConnectionsPerBroker(connectionsPerBroker); - } - - /** - * @return whether TCP no-delay should be set on the connections - */ - public boolean isUseTcpNoDelay() { - return confData.isUseTcpNoDelay(); - } - - /** - * Configure whether to use TCP no-delay flag on the connection, to disable Nagle algorithm. - *

- * No-delay features make sure packets are sent out on the network as soon as possible, and it's critical to achieve - * low latency publishes. On the other hand, sending out a huge number of small packets might limit the overall - * throughput, so if latency is not a concern, it's advisable to set the useTcpNoDelay flag to false. - *

- * Default value is true - * - * @param useTcpNoDelay - */ - public void setUseTcpNoDelay(boolean useTcpNoDelay) { - confData.setUseTcpNoDelay(useTcpNoDelay); - } - - /** - * @return whether TLS encryption is used on the connection - */ - public boolean isUseTls() { - return confData.isUseTls(); - } - - /** - * Configure whether to use TLS encryption on the connection (default: false). - * - * @param useTls - */ - public void setUseTls(boolean useTls) { - confData.setUseTls(useTls); - } - - /** - * @return path to the trusted TLS certificate file - */ - public String getTlsTrustCertsFilePath() { - return confData.getTlsTrustCertsFilePath(); - } - - /** - * Set the path to the trusted TLS certificate file. - * - * @param tlsTrustCertsFilePath - */ - public void setTlsTrustCertsFilePath(String tlsTrustCertsFilePath) { - confData.setTlsTrustCertsFilePath(tlsTrustCertsFilePath); - } - - /** - * @return whether the Pulsar client accept untrusted TLS certificate from broker - */ - public boolean isTlsAllowInsecureConnection() { - return confData.isTlsAllowInsecureConnection(); - } - - /** - * Configure whether the Pulsar client accept untrusted TLS certificate from broker (default: false). - * - * @param tlsAllowInsecureConnection - */ - public void setTlsAllowInsecureConnection(boolean tlsAllowInsecureConnection) { - confData.setTlsAllowInsecureConnection(tlsAllowInsecureConnection); - } - - /** - * Stats will be activated with positive statsIntervalSeconds. - * - * @return the interval between each stat info (default: 60 seconds) - */ - public long getStatsIntervalSeconds() { - return confData.getStatsIntervalSeconds(); - } - - /** - * Set the interval between each stat info (default: 60 seconds) Stats will be activated with positive. - * statsIntervalSeconds It should be set to at least 1 second - * - * @param statsInterval - * the interval between each stat info - * @param unit - * time unit for {@code statsInterval} - */ - public void setStatsInterval(long statsInterval, TimeUnit unit) { - confData.setStatsIntervalSeconds(unit.toSeconds(statsInterval)); - } - - /** - * Get configured total allowed concurrent lookup-request. - * - * @return - */ - public int getConcurrentLookupRequest() { - return confData.getConcurrentLookupRequest(); - } - - /** - * Number of concurrent lookup-requests allowed on each broker-connection to prevent overload on broker. - * (default: 50000) It should be configured with higher value only in case of it requires to - * produce/subscribe on thousands of topic using created {@link PulsarClient} - * - * @param concurrentLookupRequest - */ - public void setConcurrentLookupRequest(int concurrentLookupRequest) { - confData.setConcurrentLookupRequest(concurrentLookupRequest); - } - - /** - * Get configured max number of reject-request in a time-frame (60 seconds) after which connection will be closed. - * - * @return - */ - public int getMaxNumberOfRejectedRequestPerConnection() { - return confData.getMaxNumberOfRejectedRequestPerConnection(); - } - - /** - * Set max number of broker-rejected requests in a certain time-frame (60 seconds) after which current connection. - * will be closed and client creates a new connection that give chance to connect a different broker (default: - * 50) - * - * @param maxNumberOfRejectedRequestPerConnection - */ - public void setMaxNumberOfRejectedRequestPerConnection(int maxNumberOfRejectedRequestPerConnection) { - confData.setMaxNumberOfRejectedRequestPerConnection(maxNumberOfRejectedRequestPerConnection); - } - - public boolean isTlsHostnameVerificationEnable() { - return confData.isTlsHostnameVerificationEnable(); - } - - /** - * It allows to validate hostname verification when client connects to broker over tls. It validates incoming x509 - * certificate and matches provided hostname(CN/SAN) with expected broker's host name. It follows RFC 2818, 3.1. - * Server Identity hostname verification. - * - * @see rfc2818 - * - * @param tlsHostnameVerificationEnable - */ - public void setTlsHostnameVerificationEnable(boolean tlsHostnameVerificationEnable) { - confData.setTlsHostnameVerificationEnable(tlsHostnameVerificationEnable); - } - - public ClientConfiguration setServiceUrl(String serviceUrl) { - confData.setServiceUrl(serviceUrl); - return this; - } - - /** - * Set the duration of time to wait for a connection to a broker to be established. If the duration - * passes without a response from the broker, the connection attempt is dropped. - * - * @param duration the duration to wait - * @param unit the time unit in which the duration is defined - */ - public void setConnectionTimeout(int duration, TimeUnit unit) { - confData.setConnectionTimeoutMs((int) unit.toMillis(duration)); - } - - /** - * Get the duration of time for which the client will wait for a connection to a broker to be - * established before giving up. - * - * @return the duration, in milliseconds - */ - public long getConnectionTimeoutMs() { - return confData.getConnectionTimeoutMs(); - } - - public ClientConfigurationData getConfigurationData() { - return confData; - } - -} diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/Consumer.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/Consumer.java deleted file mode 100644 index d84b9981b0320..0000000000000 --- a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/Consumer.java +++ /dev/null @@ -1,331 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.client.api; - -import java.io.Closeable; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; - -/** - * An interface that abstracts behavior of Pulsar's consumer. - */ -public interface Consumer extends Closeable { - - /** - * Get a topic for the consumer. - * - * @return topic for the consumer - */ - String getTopic(); - - /** - * Get a subscription for the consumer. - * - * @return subscription for the consumer - */ - String getSubscription(); - - /** - * Unsubscribe the consumer - *

- * This call blocks until the consumer is unsubscribed. - * - * @throws PulsarClientException - */ - void unsubscribe() throws PulsarClientException; - - /** - * Asynchronously unsubscribe the consumer. - * - * @return {@link CompletableFuture} for this operation - */ - CompletableFuture unsubscribeAsync(); - - /** - * Receives a single message. - *

- * This calls blocks until a message is available. - * - * @return the received message - * @throws PulsarClientException.AlreadyClosedException - * if the consumer was already closed - * @throws PulsarClientException.InvalidConfigurationException - * if a message listener was defined in the configuration - */ - Message receive() throws PulsarClientException; - - /** - * Receive a single message - *

- * Retrieves a message when it will be available and completes {@link CompletableFuture} with received message. - *

- *

- * {@code receiveAsync()} should be called subsequently once returned {@code CompletableFuture} gets complete with - * received message. Else it creates backlog of receive requests in the application. - *

- * - * @return {@link CompletableFuture}<{@link Message}> will be completed when message is available - */ - CompletableFuture> receiveAsync(); - - /** - * Receive a single message - *

- * Retrieves a message, waiting up to the specified wait time if necessary. - * - * @param timeout - * 0 or less means immediate rather than infinite - * @param unit - * @return the received {@link Message} or null if no message available before timeout - * @throws PulsarClientException.AlreadyClosedException - * if the consumer was already closed - * @throws PulsarClientException.InvalidConfigurationException - * if a message listener was defined in the configuration - */ - Message receive(int timeout, TimeUnit unit) throws PulsarClientException; - - /** - * Acknowledge the consumption of a single message. - * - * @param message - * The {@code Message} to be acknowledged - * @throws PulsarClientException.AlreadyClosedException - * if the consumer was already closed - */ - void acknowledge(Message message) throws PulsarClientException; - - /** - * Acknowledge the consumption of a single message, identified by its MessageId. - * - * @param messageId - * The {@code MessageId} to be acknowledged - * @throws PulsarClientException.AlreadyClosedException - * if the consumer was already closed - */ - void acknowledge(MessageId messageId) throws PulsarClientException; - - /** - * Acknowledge the reception of all the messages in the stream up to (and including) the provided message. - * - * This method will block until the acknowledge has been sent to the broker. After that, the messages will not be - * re-delivered to this consumer. - * - * Cumulative acknowledge cannot be used when the consumer type is set to ConsumerShared. - * - * It's equivalent to calling asyncAcknowledgeCumulative(Message) and waiting for the callback to be triggered. - * - * @param message - * The {@code Message} to be cumulatively acknowledged - * @throws PulsarClientException.AlreadyClosedException - * if the consumer was already closed - */ - void acknowledgeCumulative(Message message) throws PulsarClientException; - - /** - * Acknowledge the reception of all the messages in the stream up to (and including) the provided message. - * - * This method will block until the acknowledge has been sent to the broker. After that, the messages will not be - * re-delivered to this consumer. - * - * Cumulative acknowledge cannot be used when the consumer type is set to ConsumerShared. - * - * It's equivalent to calling asyncAcknowledgeCumulative(MessageId) and waiting for the callback to be triggered. - * - * @param messageId - * The {@code MessageId} to be cumulatively acknowledged - * @throws PulsarClientException.AlreadyClosedException - * if the consumer was already closed - */ - void acknowledgeCumulative(MessageId messageId) throws PulsarClientException; - - /** - * Asynchronously acknowledge the consumption of a single message. - * - * @param message - * The {@code Message} to be acknowledged - * @return a future that can be used to track the completion of the operation - */ - CompletableFuture acknowledgeAsync(Message message); - - /** - * Asynchronously acknowledge the consumption of a single message. - * - * @param messageId - * The {@code MessageId} to be acknowledged - * @return a future that can be used to track the completion of the operation - */ - CompletableFuture acknowledgeAsync(MessageId messageId); - - /** - * Asynchronously Acknowledge the reception of all the messages in the stream up to (and including) the provided - * message. - * - * Cumulative acknowledge cannot be used when the consumer type is set to ConsumerShared. - * - * @param message - * The {@code Message} to be cumulatively acknowledged - * @return a future that can be used to track the completion of the operation - */ - CompletableFuture acknowledgeCumulativeAsync(Message message); - - /** - * Asynchronously Acknowledge the reception of all the messages in the stream up to (and including) the provided - * message. - * - * Cumulative acknowledge cannot be used when the consumer type is set to ConsumerShared. - * - * @param messageId - * The {@code MessageId} to be cumulatively acknowledged - * @return a future that can be used to track the completion of the operation - */ - CompletableFuture acknowledgeCumulativeAsync(MessageId messageId); - - /** - * Get statistics for the consumer. - * - *

    - *
  • numMsgsReceived : Number of messages received in the current interval - *
  • numBytesReceived : Number of bytes received in the current interval - *
  • numReceiveFailed : Number of messages failed to receive in the current interval - *
  • numAcksSent : Number of acks sent in the current interval - *
  • numAcksFailed : Number of acks failed to send in the current interval - *
  • totalMsgsReceived : Total number of messages received - *
  • totalBytesReceived : Total number of bytes received - *
  • totalReceiveFailed : Total number of messages failed to receive - *
  • totalAcksSent : Total number of acks sent - *
  • totalAcksFailed : Total number of acks failed to sent - *
- * - * @return statistic for the consumer - */ - ConsumerStats getStats(); - - /** - * Close the consumer and stop the broker to push more messages. - */ - @Override - void close() throws PulsarClientException; - - /** - * Asynchronously close the consumer and stop the broker to push more messages. - * - * @return a future that can be used to track the completion of the operation - */ - CompletableFuture closeAsync(); - - /** - * Return true if the topic was terminated and this consumer has already consumed all the messages in the topic. - * - * Please note that this does not simply mean that the consumer is caught up with the last message published by - * producers, rather the topic needs to be explicitly "terminated". - */ - boolean hasReachedEndOfTopic(); - - /** - * Redelivers all the unacknowledged messages. In Failover mode, the request is ignored if the consumer is not - * active for the given topic. In Shared mode, the consumers messages to be redelivered are distributed across all - * the connected consumers. This is a non blocking call and doesn't throw an exception. In case the connection - * breaks, the messages are redelivered after reconnect. - */ - void redeliverUnacknowledgedMessages(); - - /** - * Reset the subscription associated with this consumer to a specific message id. - *

- * - * The message id can either be a specific message or represent the first or last messages in the topic. - *

- *

    - *
  • MessageId.earliest : Reset the subscription on the earliest message available in the topic - *
  • MessageId.latest : Reset the subscription on the latest message in the topic - *
- * - * Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the seek() on - * the individual partitions. - * - * @param messageId - * the message id where to reposition the subscription - */ - void seek(MessageId messageId) throws PulsarClientException; - - /** - * Reset the subscription associated with this consumer to a specific message publish time. - * - * Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the seek() on - * the individual partitions. - * - * @param timestamp - * the message publish time where to reposition the subscription - */ - void seek(long timestamp) throws PulsarClientException; - - /** - * Reset the subscription associated with this consumer to a specific message id. - *

- * - * The message id can either be a specific message or represent the first or last messages in the topic. - *

- *

    - *
  • MessageId.earliest : Reset the subscription on the earliest message available in the topic - *
  • MessageId.latest : Reset the subscription on the latest message in the topic - *
- * - * Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the seek() on - * the individual partitions. - * - * @param messageId - * the message id where to reposition the subscription - * @return a future to track the completion of the seek operation - */ - CompletableFuture seekAsync(MessageId messageId); - - /** - * Reset the subscription associated with this consumer to a specific message publish time. - * - * Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the seek() on - * the individual partitions. - * - * @param timestamp - * the message publish time where to reposition the subscription - * @return a future to track the completion of the seek operation - */ - CompletableFuture seekAsync(long timestamp); - - /** - * @return Whether the consumer is connected to the broker - */ - boolean isConnected(); - - /** - * Get the name of consumer. - * @return consumer name. - */ - String getConsumerName(); - - /** - * Stop requesting new messages from the broker until {@link #resume()} is called. Note that this might cause - * {@link #receive()} to block until {@link #resume()} is called and new messages are pushed by the broker. - */ - void pause(); - - /** - * Resume requesting messages from the broker. - */ - void resume(); -} diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java deleted file mode 100644 index 81956db56f774..0000000000000 --- a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java +++ /dev/null @@ -1,411 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.client.api; - -import static com.google.common.base.Preconditions.checkArgument; -import java.io.Serializable; -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.TimeUnit; -import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; -import org.apache.pulsar.client.impl.v1.ConsumerV1Impl; -/** - * Class specifying the configuration of a consumer. In Exclusive subscription, only a single consumer is allowed to - * attach to the subscription. Other consumers will get an error message. In Shared subscription, multiple consumers - * will be able to use the same subscription name and the messages will be dispatched in a round robin fashion. - * - * @deprecated Use {@link PulsarClient#newConsumer} to build and configure a {@link Consumer} instance - */ -@Deprecated -public class ConsumerConfiguration implements Serializable { - - /** - * Resend shouldn't be requested before minAckTimeoutMillis. - */ - static long minAckTimeoutMillis = 1000; - - private static final long serialVersionUID = 1L; - - private final ConsumerConfigurationData conf = new ConsumerConfigurationData<>(); - - private MessageListener messageListener; - - public ConsumerConfiguration() { - // Disable acknowledgment grouping when using v1 API - conf.setAcknowledgementsGroupTimeMicros(0); - } - - /** - * @return the configured timeout in milliseconds for unacked messages. - */ - public long getAckTimeoutMillis() { - return conf.getAckTimeoutMillis(); - } - - /** - * Set the timeout for unacked messages, truncated to the nearest millisecond. The timeout needs to be greater than - * 10 seconds. - * - * @param ackTimeout - * for unacked messages. - * @param timeUnit - * unit in which the timeout is provided. - * @return {@link ConsumerConfiguration} - */ - public ConsumerConfiguration setAckTimeout(long ackTimeout, TimeUnit timeUnit) { - long ackTimeoutMillis = timeUnit.toMillis(ackTimeout); - checkArgument(ackTimeoutMillis >= minAckTimeoutMillis, - "Ack timeout should be should be greater than " + minAckTimeoutMillis + " ms"); - conf.setAckTimeoutMillis(timeUnit.toMillis(ackTimeout)); - return this; - } - - /** - * @return the configured subscription type - */ - public SubscriptionType getSubscriptionType() { - return conf.getSubscriptionType(); - } - - /** - * Select the subscription type to be used when subscribing to the topic. - *

- * Default is {@link SubscriptionType#Exclusive} - * - * @param subscriptionType - * the subscription type value - */ - public ConsumerConfiguration setSubscriptionType(SubscriptionType subscriptionType) { - Objects.requireNonNull(subscriptionType); - conf.setSubscriptionType(subscriptionType); - return this; - } - - /** - * @return the configured {@link MessageListener} for the consumer - */ - public MessageListener getMessageListener() { - return messageListener; - } - - /** - * Sets a {@link MessageListener} for the consumer - *

- * When a {@link MessageListener} is set, application will receive messages through it. Calls to - * {@link Consumer#receive()} will not be allowed. - * - * @param messageListener - * the listener object - */ - public ConsumerConfiguration setMessageListener(MessageListener messageListener) { - Objects.requireNonNull(messageListener); - this.messageListener = messageListener; - conf.setMessageListener(new org.apache.pulsar.shade.client.api.v2.MessageListener() { - - @Override - public void received(org.apache.pulsar.shade.client.api.v2.Consumer consumer, Message msg) { - messageListener.received(new ConsumerV1Impl(consumer), msg); - } - - @Override - public void reachedEndOfTopic(org.apache.pulsar.shade.client.api.v2.Consumer consumer) { - messageListener.reachedEndOfTopic(new ConsumerV1Impl(consumer)); - } - }); - return this; - } - - /** - * @return this configured {@link ConsumerEventListener} for the consumer. - * @see #setConsumerEventListener(ConsumerEventListener) - * @since 2.0 - */ - public ConsumerEventListener getConsumerEventListener() { - return conf.getConsumerEventListener(); - } - - /** - * Sets a {@link ConsumerEventListener} for the consumer. - * - *

- * The consumer group listener is used for receiving consumer state change in a consumer group for failover - * subscription. Application can then react to the consumer state changes. - * - *

- * This change is experimental. It is subject to changes coming in release 2.0. - * - * @param listener - * the consumer group listener object - * @return consumer configuration - * @since 2.0 - */ - public ConsumerConfiguration setConsumerEventListener(ConsumerEventListener listener) { - Objects.requireNonNull(listener); - conf.setConsumerEventListener(listener); - return this; - } - - /** - * @return the configure receiver queue size value - */ - public int getReceiverQueueSize() { - return conf.getReceiverQueueSize(); - } - - /** - * @return the configured max total receiver queue size across partitions - */ - public int getMaxTotalReceiverQueueSizeAcrossPartitions() { - return conf.getMaxTotalReceiverQueueSizeAcrossPartitions(); - } - - /** - * Set the max total receiver queue size across partitons. - *

- * This setting will be used to reduce the receiver queue size for individual partitions - * {@link #setReceiverQueueSize(int)} if the total exceeds this value (default: 50000). - * - * @param maxTotalReceiverQueueSizeAcrossPartitions - */ - public void setMaxTotalReceiverQueueSizeAcrossPartitions(int maxTotalReceiverQueueSizeAcrossPartitions) { - checkArgument(maxTotalReceiverQueueSizeAcrossPartitions >= conf.getReceiverQueueSize()); - conf.setMaxTotalReceiverQueueSizeAcrossPartitions(maxTotalReceiverQueueSizeAcrossPartitions); - } - - /** - * @return the CryptoKeyReader - */ - public CryptoKeyReader getCryptoKeyReader() { - return conf.getCryptoKeyReader(); - } - - /** - * Sets a {@link CryptoKeyReader}. - * - * @param cryptoKeyReader - * CryptoKeyReader object - */ - public ConsumerConfiguration setCryptoKeyReader(CryptoKeyReader cryptoKeyReader) { - Objects.requireNonNull(cryptoKeyReader); - conf.setCryptoKeyReader(cryptoKeyReader); - return this; - } - - /** - * Sets the ConsumerCryptoFailureAction to the value specified. - * - * @param action - * consumer action - */ - public void setCryptoFailureAction(ConsumerCryptoFailureAction action) { - conf.setCryptoFailureAction(action); - } - - /** - * @return The ConsumerCryptoFailureAction - */ - public ConsumerCryptoFailureAction getCryptoFailureAction() { - return conf.getCryptoFailureAction(); - } - - /** - * Sets the size of the consumer receive queue. - *

- * The consumer receive queue controls how many messages can be accumulated by the {@link Consumer} before the - * application calls {@link Consumer#receive()}. Using a higher value could potentially increase the consumer - * throughput at the expense of bigger memory utilization. - *

- *

- * Setting the consumer queue size as zero - *

    - *
  • Decreases the throughput of the consumer, by disabling pre-fetching of messages. This approach improves the - * message distribution on shared subscription, by pushing messages only to the consumers that are ready to process - * them. Neither {@link Consumer#receive(int, TimeUnit)} nor Partitioned Topics can be used if the consumer queue - * size is zero. {@link Consumer#receive()} function call should not be interrupted when the consumer queue size is - * zero.
  • - *
  • Doesn't support Batch-Message: if consumer receives any batch-message then it closes consumer connection with - * broker and {@link Consumer#receive()} call will remain blocked while {@link Consumer#receiveAsync()} receives - * exception in callback. consumer will not be able receive any further message unless batch-message in pipeline - * is removed
  • - *
- *

- * Default value is {@code 1000} messages and should be good for most use cases. - * - * @param receiverQueueSize - * the new receiver queue size value - */ - public ConsumerConfiguration setReceiverQueueSize(int receiverQueueSize) { - checkArgument(receiverQueueSize >= 0, "Receiver queue size cannot be negative"); - conf.setReceiverQueueSize(receiverQueueSize); - return this; - } - - /** - * @return the consumer name - */ - public String getConsumerName() { - return conf.getConsumerName(); - } - - /** - * Set the consumer name. - * - * @param consumerName - */ - public ConsumerConfiguration setConsumerName(String consumerName) { - checkArgument(consumerName != null && !consumerName.equals("")); - conf.setConsumerName(consumerName); - return this; - } - - public int getPriorityLevel() { - return conf.getPriorityLevel(); - } - - /** - * Sets priority level for the shared subscription consumers to which broker gives more priority while dispatching - * messages. Here, broker follows descending priorities. (eg: 0=max-priority, 1, 2,..)
- * In Shared subscription mode, broker will first dispatch messages to max priority-level consumers if they have - * permits, else broker will consider next priority level consumers.
- * If subscription has consumer-A with priorityLevel 0 and Consumer-B with priorityLevel 1 then broker will dispatch - * messages to only consumer-A until it runs out permit and then broker starts dispatching messages to Consumer-B. - * - *
-     * Consumer PriorityLevel Permits
-     * C1       0             2
-     * C2       0             1
-     * C3       0             1
-     * C4       1             2
-     * C5       1             1
-     * Order in which broker dispatches messages to consumers: C1, C2, C3, C1, C4, C5, C4
-     * 
- * - * @param priorityLevel - */ - public void setPriorityLevel(int priorityLevel) { - conf.setPriorityLevel(priorityLevel); - } - - public boolean getReadCompacted() { - return conf.isReadCompacted(); - } - - /** - * If enabled, the consumer will read messages from the compacted topic rather than reading the full message backlog - * of the topic. This means that, if the topic has been compacted, the consumer will only see the latest value for - * each key in the topic, up until the point in the topic message backlog that has been compacted. Beyond that - * point, the messages will be sent as normal. - * - * readCompacted can only be enabled subscriptions to persistent topics, which have a single active consumer (i.e. - * failure or exclusive subscriptions). Attempting to enable it on subscriptions to a non-persistent topics or on a - * shared subscription, will lead to the subscription call throwing a PulsarClientException. - * - * @param readCompacted - * whether to read from the compacted topic - */ - public ConsumerConfiguration setReadCompacted(boolean readCompacted) { - conf.setReadCompacted(readCompacted); - return this; - } - - /** - * Set a name/value property with this consumer. - * - * @param key - * @param value - * @return - */ - public ConsumerConfiguration setProperty(String key, String value) { - checkArgument(key != null); - checkArgument(value != null); - conf.getProperties().put(key, value); - return this; - } - - /** - * Add all the properties in the provided map. - * - * @param properties - * @return - */ - public ConsumerConfiguration setProperties(Map properties) { - conf.getProperties().putAll(properties); - return this; - } - - public Map getProperties() { - return conf.getProperties(); - } - - public ConsumerConfigurationData getConfigurationData() { - return conf; - } - - /** - * @param subscriptionInitialPosition the initial position at which to set - * set cursor when subscribing to the topic first time - * Default is {@value InitialPosition.Latest} - */ - public ConsumerConfiguration setSubscriptionInitialPosition( - SubscriptionInitialPosition subscriptionInitialPosition) { - conf.setSubscriptionInitialPosition(subscriptionInitialPosition); - return this; - } - - /** - * @return the configured {@link subscriptionInitialPosition} for the consumer - */ - public SubscriptionInitialPosition getSubscriptionInitialPosition(){ - return conf.getSubscriptionInitialPosition(); - } - - /** - * @return the configured {@link RedeliveryBackoff} for the consumer - */ - public RedeliveryBackoff getNegativeAckRedeliveryBackoff() { - return conf.getNegativeAckRedeliveryBackoff(); - } - - /** - * @param negativeAckRedeliveryBackoff the negative ack redelivery backoff policy. - * Default value is: MultiplierRedeliveryBackoff - * @return the {@link ConsumerConfiguration} - */ - public ConsumerConfiguration setNegativeAckRedeliveryBackoff(RedeliveryBackoff negativeAckRedeliveryBackoff) { - conf.setNegativeAckRedeliveryBackoff(negativeAckRedeliveryBackoff); - return this; - } - - /** - * @return the configured {@link RedeliveryBackoff} for the consumer - */ - public RedeliveryBackoff getAckTimeoutRedeliveryBackoff() { - return conf.getAckTimeoutRedeliveryBackoff(); - } - - /** - * @param ackTimeoutRedeliveryBackoff redelivery backoff policy for ack timeout. - * Default value is: MultiplierRedeliveryBackoff - * @return the {@link ConsumerConfiguration} - */ - public ConsumerConfiguration setAckTimeoutRedeliveryBackoff(RedeliveryBackoff ackTimeoutRedeliveryBackoff) { - conf.setAckTimeoutRedeliveryBackoff(ackTimeoutRedeliveryBackoff); - return this; - } -} diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/MessageBuilder.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/MessageBuilder.java deleted file mode 100644 index 084312ed28c07..0000000000000 --- a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/MessageBuilder.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.client.api; - -import java.nio.ByteBuffer; -import java.util.List; -import java.util.Map; -import org.apache.pulsar.client.impl.MessageBuilderImpl; - -/** - * Message builder factory. Use this class to create messages to be send to the Pulsar producer - * - * @deprecated since 2.0. Use {@link TypedMessageBuilder} as returned by {@link Producer#newMessage()} to create a new - * message builder. - */ -@Deprecated -public interface MessageBuilder { - - static MessageBuilder create() { - return new MessageBuilderImpl(); - } - - /** - * Finalize the immutable message. - * - * @return a {@link Message} ready to be sent through a {@link Producer} - */ - Message build(); - - /** - * Set the content of the message. - * - * @param data - * array containing the payload - */ - MessageBuilder setContent(byte[] data); - - /** - * Set the content of the message. - * - * @param data - * array containing the payload - * @param offset - * offset into the data array - * @param length - * length of the payload starting from the above offset - */ - MessageBuilder setContent(byte[] data, int offset, int length); - - /** - * Set the content of the message. - * - * @param buf - * a {@link ByteBuffer} with the payload of the message - */ - MessageBuilder setContent(ByteBuffer buf); - - /** - * Sets a new property on a message. - * - * @param name - * the name of the property - * @param value - * the associated value - */ - MessageBuilder setProperty(String name, String value); - - /** - * Add all the properties in the provided map. - */ - MessageBuilder setProperties(Map properties); - - /** - * Sets the key of the message for routing policy. - * - * @param key - */ - MessageBuilder setKey(String key); - - /** - * Set the event time for a given message. - * - *

- * Applications can retrieve the event time by calling {@link Message#getEventTime()}. - * - *

- * Note: currently pulsar doesn't support event-time based index. so the subscribers can't seek the messages by - * event time. - * - * @since 1.20.0 - */ - MessageBuilder setEventTime(long timestamp); - - /** - * Specify a custom sequence id for the message being published. - *

- * The sequence id can be used for deduplication purposes and it needs to follow these rules: - *

    - *
  1. sequenceId >= 0 - *
  2. Sequence id for a message needs to be greater than sequence id for earlier messages: - * sequenceId(N+1) > sequenceId(N) - *
  3. It's not necessary for sequence ids to be consecutive. There can be holes between messages. Eg. the - * sequenceId could represent an offset or a cumulative size. - *
- * - * @param sequenceId - * the sequence id to assign to the current message - * @since 1.20.0 - */ - MessageBuilder setSequenceId(long sequenceId); - - /** - * Override the replication clusters for this message. - * - * @param clusters - */ - MessageBuilder setReplicationClusters(List clusters); - - /** - * Disable replication for this message. - */ - MessageBuilder disableReplication(); -} diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/MessageListener.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/MessageListener.java deleted file mode 100644 index 301740be398c0..0000000000000 --- a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/MessageListener.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.client.api; - -import java.io.Serializable; - -/** - * A listener that will be called in order for every message received. - * - * - */ -public interface MessageListener extends Serializable { - /** - * This method is called whenever a new message is received. - * - * Messages are guaranteed to be delivered in order and from the same thread for a single consumer - * - * This method will only be called once for each message, unless either application or broker crashes. - * - * Implementation should acknowledge messages by calling consumer.acknowledge(msg). - * - * Application is responsible of handling any exception that could be thrown while processing the message. - * - * @param consumer - * the consumer that received the message - * @param msg - * the message object - */ - void received(Consumer consumer, Message msg); - - /** - * Get the notification when a topic is terminated. - * - * @param consumer - * the Consumer object associated with the terminated topic - */ - default void reachedEndOfTopic(Consumer consumer) { - // By default ignore the notification - } -} diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/Producer.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/Producer.java deleted file mode 100644 index 0b431050377c3..0000000000000 --- a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/Producer.java +++ /dev/null @@ -1,199 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.client.api; - -import java.io.Closeable; -import java.util.concurrent.CompletableFuture; - -/** - * Producer object. - * - * The producer is used to publish messages on a topic - * - * - */ -public interface Producer extends Closeable { - - /** - * @return the topic which producer is publishing to - */ - String getTopic(); - - /** - * @return the producer name which could have been assigned by the system or specified by the client - */ - String getProducerName(); - - /** - * Sends a message. - *

- * This call will be blocking until is successfully acknowledged by the Pulsar broker. - *

- * Use {@link #newMessage()} to specify more properties than just the value on the message to be sent. - * - * @param message - * a message - * @return the message id assigned to the published message - * @throws PulsarClientException.TimeoutException - * if the message was not correctly received by the system within the timeout period - * @throws PulsarClientException.AlreadyClosedException - * if the producer was already closed - */ - MessageId send(byte[] message) throws PulsarClientException; - - /** - * Send a message asynchronously - *

- * When the producer queue is full, by default this method will complete the future with an exception - * {@link PulsarClientException.ProducerQueueIsFullError} - *

- * See {@link ProducerBuilder#maxPendingMessages(int)} to configure the producer queue size and - * {@link ProducerBuilder#blockIfQueueFull(boolean)} to change the blocking behavior. - *

- * Use {@link #newMessage()} to specify more properties than just the value on the message to be sent. - * - * @param message - * a byte array with the payload of the message - * @return a future that can be used to track when the message will have been safely persisted - */ - CompletableFuture sendAsync(byte[] message); - - /** - * Flush all the messages buffered in the client and wait until all messages have been successfully persisted. - * - * @throws PulsarClientException - * @since 2.1.0 - * @see #flushAsync() - */ - void flush() throws PulsarClientException; - - /** - * Flush all the messages buffered in the client asynchronously. - * - * @return a future that can be used to track when all the messages have been safely persisted. - * @since 2.1.0 - * @see #flush() - */ - CompletableFuture flushAsync(); - - /** - * Send a message. - * - * @param message - * a message - * @return the message id assigned to the published message - * @throws PulsarClientException.TimeoutException - * if the message was not correctly received by the system within the timeout period - * - * @deprecated since 2.0. Use {@link TypedMessageBuilder} as returned by {@link Producer.newMessage()} to create a - * new message builder. - */ - @Deprecated - MessageId send(Message message) throws PulsarClientException; - - /** - * Send a message asynchronously. - *

- * When the returned {@link CompletableFuture} is marked as completed successfully, the provided message will - * contain the {@link MessageId} assigned by the broker to the published message. - *

- * Example: - * - *

-     * Message msg = MessageBuilder.create().setContent(myContent).build();
-     * producer.sendAsync(msg).thenRun(v -> {
-     *    System.out.println("Published message: " + msg.getMessageId());
-     * }).exceptionally(e -> {
-     *    // Failed to publish
-     * });
-     * 
- *

- * When the producer queue is full, by default this method will complete the future with an exception - * {@link PulsarClientException.ProducerQueueIsFullError} - *

- * See {@link ProducerBuilder#maxPendingMessages(int)} to configure the producer queue size and - * {@link ProducerBuilder#blockIfQueueFull(boolean)} to change the blocking behavior. - * - * @param message - * a message - * @return a future that can be used to track when the message will have been safely persisted - * @deprecated since 2.0. Use {@link TypedMessageBuilder} as returned by {@link Producer#newMessage()} to create a - * new message builder. - */ - @Deprecated - CompletableFuture sendAsync(Message message); - - /** - * Get the last sequence id that was published by this producer. - *

- * This represent either the automatically assigned or custom sequence id (set on the {@link MessageBuilder}) that - * was published and acknowledged by the broker. - *

- * After recreating a producer with the same producer name, this will return the last message that was published in - * the previous producer session, or -1 if there no message was ever published. - * - * @return the last sequence id published by this producer - */ - long getLastSequenceId(); - - /** - * Get statistics for the producer. - * - *

    - *
  • numMsgsSent : Number of messages sent in the current interval - *
  • numBytesSent : Number of bytes sent in the current interval - *
  • numSendFailed : Number of messages failed to send in the current interval - *
  • numAcksReceived : Number of acks received in the current interval - *
  • totalMsgsSent : Total number of messages sent - *
  • totalBytesSent : Total number of bytes sent - *
  • totalSendFailed : Total number of messages failed to send - *
  • totalAcksReceived: Total number of acks received - *
- * - * @return statistic for the producer or null if ProducerStatsRecorderImpl is disabled. - */ - ProducerStats getStats(); - - /** - * Close the producer and releases resources allocated. - * - * No more writes will be accepted from this producer. Waits until all pending write request are persisted. In case - * of errors, pending writes will not be retried. - * - * @throws PulsarClientException.AlreadyClosedException - * if the producer was already closed - */ - @Override - void close() throws PulsarClientException; - - /** - * Close the producer and releases resources allocated. - * - * No more writes will be accepted from this producer. Waits until all pending write request are persisted. In case - * of errors, pending writes will not be retried. - * - * @return a future that can used to track when the producer has been closed - */ - CompletableFuture closeAsync(); - - /** - * @return Whether the producer is connected to the broker - */ - boolean isConnected(); -} diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java deleted file mode 100644 index 761c49ec24221..0000000000000 --- a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java +++ /dev/null @@ -1,474 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.client.api; - -import static com.google.common.base.Preconditions.checkArgument; -import java.io.Serializable; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import lombok.EqualsAndHashCode; -import org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException; -import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; - -/** - * Producer's configuration. - * - * @deprecated use {@link PulsarClient#newProducer()} to construct and configure a {@link Producer} instance - */ -@Deprecated -@EqualsAndHashCode -public class ProducerConfiguration implements Serializable { - - private static final long serialVersionUID = 1L; - - private final ProducerConfigurationData conf = new ProducerConfigurationData(); - - @Deprecated - public enum MessageRoutingMode { - SinglePartition, RoundRobinPartition, CustomPartition - } - - @Deprecated - public enum HashingScheme { - JavaStringHash, Murmur3_32Hash - } - - /** - * @return the configured custom producer name or null if no custom name was specified - * @since 1.20.0 - */ - public String getProducerName() { - return conf.getProducerName(); - } - - /** - * Specify a name for the producer - *

- * If not assigned, the system will generate a globally unique name which can be access with - * {@link Producer#getProducerName()}. - *

- * When specifying a name, it is app to the user to ensure that, for a given topic, the producer name is unique - * across all Pulsar's clusters. - *

- * If a producer with the same name is already connected to a particular topic, the - * {@link PulsarClient#createProducer(String)} operation will fail with {@link ProducerBusyException}. - * - * @param producerName - * the custom name to use for the producer - * @since 1.20.0 - */ - public void setProducerName(String producerName) { - conf.setProducerName(producerName); - } - - /** - * @return the message send timeout in ms - */ - public long getSendTimeoutMs() { - return conf.getSendTimeoutMs(); - } - - /** - * Set the send timeout (default: 30 seconds) - *

- * If a message is not acknowledged by the server before the sendTimeout expires, an error will be reported. - * - * @param sendTimeout - * the send timeout - * @param unit - * the time unit of the {@code sendTimeout} - */ - public ProducerConfiguration setSendTimeout(int sendTimeout, TimeUnit unit) { - conf.setSendTimeoutMs(sendTimeout, unit); - return this; - } - - /** - * @return the maximum number of messages allowed in the outstanding messages queue for the producer - */ - public int getMaxPendingMessages() { - return conf.getMaxPendingMessages(); - } - - /** - * Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker. - *

- * When the queue is full, by default, all calls to {@link Producer#send} and {@link Producer#sendAsync} will fail - * unless blockIfQueueFull is set to true. Use {@link #setBlockIfQueueFull} to change the blocking behavior. - * - * @param maxPendingMessages - * @return - */ - public ProducerConfiguration setMaxPendingMessages(int maxPendingMessages) { - conf.setMaxPendingMessages(maxPendingMessages); - return this; - } - - public HashingScheme getHashingScheme() { - return HashingScheme.valueOf(conf.getHashingScheme().toString()); - } - - public ProducerConfiguration setHashingScheme(HashingScheme hashingScheme) { - conf.setHashingScheme(org.apache.pulsar.client.api.HashingScheme.valueOf(hashingScheme.toString())); - return this; - } - - /** - * - * @return the maximum number of pending messages allowed across all the partitions - */ - public int getMaxPendingMessagesAcrossPartitions() { - return conf.getMaxPendingMessagesAcrossPartitions(); - } - - /** - * Set the number of max pending messages across all the partitions - *

- * This setting will be used to lower the max pending messages for each partition - * ({@link #setMaxPendingMessages(int)}), if the total exceeds the configured value. - * - * @param maxPendingMessagesAcrossPartitions - */ - public void setMaxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions) { - conf.setMaxPendingMessagesAcrossPartitions(maxPendingMessagesAcrossPartitions); - } - - /** - * - * @return whether the producer will block {@link Producer#send} and {@link Producer#sendAsync} operations when the - * pending queue is full - */ - public boolean getBlockIfQueueFull() { - return conf.isBlockIfQueueFull(); - } - - /** - * Set whether the {@link Producer#send} and {@link Producer#sendAsync} operations should block when the outgoing - * message queue is full. - *

- * Default is false. If set to false, send operations will immediately fail with - * {@link PulsarClientException.ProducerQueueIsFullError} when there is no space left in pending queue. - * - * @param blockIfQueueFull - * whether to block {@link Producer#send} and {@link Producer#sendAsync} operations on queue full - * @return - */ - public ProducerConfiguration setBlockIfQueueFull(boolean blockIfQueueFull) { - conf.setBlockIfQueueFull(blockIfQueueFull); - return this; - } - - /** - * Set the message routing mode for the partitioned producer. - * - * @param messageRouteMode message routing mode. - * @return producer configuration - * @see MessageRoutingMode - */ - public ProducerConfiguration setMessageRoutingMode(MessageRoutingMode messageRouteMode) { - Objects.requireNonNull(messageRouteMode); - conf.setMessageRoutingMode( - org.apache.pulsar.client.api.MessageRoutingMode.valueOf(messageRouteMode.toString())); - return this; - } - - /** - * Get the message routing mode for the partitioned producer. - * - * @return message routing mode, default is round-robin routing. - * @see MessageRoutingMode#RoundRobinPartition - */ - public MessageRoutingMode getMessageRoutingMode() { - return MessageRoutingMode.valueOf(conf.getMessageRoutingMode().toString()); - } - - /** - * Set the compression type for the producer. - *

- * By default, message payloads are not compressed. Supported compression types are: - *

    - *
  • CompressionType.LZ4
  • - *
  • CompressionType.ZLIB
  • - *
- * - * @param compressionType - * @return - * - * @since 1.0.28
- * Make sure all the consumer applications have been updated to use this client version, before starting to - * compress messages. - */ - public ProducerConfiguration setCompressionType(CompressionType compressionType) { - conf.setCompressionType(compressionType); - return this; - } - - /** - * @return the configured compression type for this producer - */ - public CompressionType getCompressionType() { - return conf.getCompressionType(); - } - - /** - * Set a custom message routing policy by passing an implementation of MessageRouter. - * - * - * @param messageRouter - */ - public ProducerConfiguration setMessageRouter(MessageRouter messageRouter) { - Objects.requireNonNull(messageRouter); - setMessageRoutingMode(MessageRoutingMode.CustomPartition); - conf.setCustomMessageRouter(messageRouter); - return this; - } - - /** - * Get the message router set by {@link #setMessageRouter(MessageRouter)}. - * - * @return message router. - * @deprecated since 1.22.0-incubating. numPartitions is already passed as parameter in - * {@link MessageRouter#choosePartition(Message, TopicMetadata)}. - * @see MessageRouter - */ - @Deprecated - public MessageRouter getMessageRouter(int numPartitions) { - return conf.getCustomMessageRouter(); - } - - /** - * Get the message router set by {@link #setMessageRouter(MessageRouter)}. - * - * @return message router set by {@link #setMessageRouter(MessageRouter)}. - */ - public MessageRouter getMessageRouter() { - return conf.getCustomMessageRouter(); - } - - /** - * Return the flag whether automatic message batching is enabled or not. - * - * @return true if batch messages are enabled. otherwise false. - * @since 2.0.0
- * It is enabled by default. - */ - public boolean getBatchingEnabled() { - return conf.isBatchingEnabled(); - } - - /** - * Control whether automatic batching of messages is enabled for the producer. default: false [No batching] - * - * When batching is enabled, multiple calls to Producer.sendAsync can result in a single batch to be sent to the - * broker, leading to better throughput, especially when publishing small messages. If compression is enabled, - * messages will be compressed at the batch level, leading to a much better compression ratio for similar headers or - * contents. - * - * When enabled default batch delay is set to 1 ms and default batch size is 1000 messages - * - * @see ProducerConfiguration#setBatchingMaxPublishDelay(long, TimeUnit) - * @since 1.0.36
- * Make sure all the consumer applications have been updated to use this client version, before starting to - * batch messages. - * - */ - public ProducerConfiguration setBatchingEnabled(boolean batchMessagesEnabled) { - conf.setBatchingEnabled(batchMessagesEnabled); - return this; - } - - /** - * @return the CryptoKeyReader - */ - public CryptoKeyReader getCryptoKeyReader() { - return conf.getCryptoKeyReader(); - } - - /** - * Sets a {@link CryptoKeyReader}. - * - * @param cryptoKeyReader - * CryptoKeyReader object - */ - public ProducerConfiguration setCryptoKeyReader(CryptoKeyReader cryptoKeyReader) { - Objects.requireNonNull(cryptoKeyReader); - conf.setCryptoKeyReader(cryptoKeyReader); - return this; - } - - /** - * - * @return encryptionKeys - * - */ - public Set getEncryptionKeys() { - return conf.getEncryptionKeys(); - } - - /** - * - * Returns true if encryption keys are added. - * - */ - public boolean isEncryptionEnabled() { - return conf.isEncryptionEnabled(); - } - - /** - * Add public encryption key, used by producer to encrypt the data key. - * - * At the time of producer creation, Pulsar client checks if there are keys added to encryptionKeys. If keys are - * found, a callback getKey(String keyName) is invoked against each key to load the values of the key. Application - * should implement this callback to return the key in pkcs8 format. If compression is enabled, message is encrypted - * after compression. If batch messaging is enabled, the batched message is encrypted. - * - */ - public void addEncryptionKey(String key) { - conf.getEncryptionKeys().add(key); - } - - public void removeEncryptionKey(String key) { - conf.getEncryptionKeys().remove(key); - } - - /** - * Sets the ProducerCryptoFailureAction to the value specified. - * - * @param action - * The producer action - */ - public void setCryptoFailureAction(ProducerCryptoFailureAction action) { - conf.setCryptoFailureAction(action); - } - - /** - * @return The ProducerCryptoFailureAction - */ - public ProducerCryptoFailureAction getCryptoFailureAction() { - return conf.getCryptoFailureAction(); - } - - /** - * - * @return the batch time period in ms. - * @see ProducerConfiguration#setBatchingMaxPublishDelay(long, TimeUnit) - */ - public long getBatchingMaxPublishDelayMs() { - return TimeUnit.MICROSECONDS.toMillis(conf.getBatchingMaxPublishDelayMicros()); - } - - /** - * Set the time period within which the messages sent will be batched default: 1ms if batch messages are - * enabled. If set to a non zero value, messages will be queued until this time interval or until - * - * @see ProducerConfiguration#batchingMaxMessages threshold is reached; all messages will be published as a single - * batch message. The consumer will be delivered individual messages in the batch in the same order they were - * enqueued - * @since 1.0.36
- * Make sure all the consumer applications have been updated to use this client version, before starting to - * batch messages. - * @param batchDelay - * the batch delay - * @param timeUnit - * the time unit of the {@code batchDelay} - * @return - */ - public ProducerConfiguration setBatchingMaxPublishDelay(long batchDelay, TimeUnit timeUnit) { - conf.setBatchingMaxPublishDelayMicros(batchDelay, timeUnit); - return this; - } - - /** - * - * @return the maximum number of messages permitted in a batch. - */ - public int getBatchingMaxMessages() { - return conf.getBatchingMaxMessages(); - } - - /** - * Set the maximum number of messages permitted in a batch. default: 1000 If set to a value greater than 1, - * messages will be queued until this threshold is reached or batch interval has elapsed - * - * @see ProducerConfiguration#setBatchingMaxPublishDelay(long, TimeUnit) All messages in batch will be published as - * a single batch message. The consumer will be delivered individual messages in the batch in the same order - * they were enqueued - * @param batchMessagesMaxMessagesPerBatch - * maximum number of messages in a batch - * @return - */ - public ProducerConfiguration setBatchingMaxMessages(int batchMessagesMaxMessagesPerBatch) { - conf.setBatchingMaxMessages(batchMessagesMaxMessagesPerBatch); - return this; - } - - public Optional getInitialSequenceId() { - return Optional.ofNullable(conf.getInitialSequenceId()); - } - - /** - * Set the baseline for the sequence ids for messages published by the producer. - *

- * First message will be using (initialSequenceId + 1) as its sequence id and subsequent messages will be assigned - * incremental sequence ids, if not otherwise specified. - * - * @param initialSequenceId - * @return - */ - public ProducerConfiguration setInitialSequenceId(long initialSequenceId) { - conf.setInitialSequenceId(initialSequenceId); - return this; - } - - /** - * Set a name/value property with this producer. - * - * @param key - * @param value - * @return - */ - public ProducerConfiguration setProperty(String key, String value) { - checkArgument(key != null); - checkArgument(value != null); - conf.getProperties().put(key, value); - return this; - } - - /** - * Add all the properties in the provided map. - * - * @param properties - * @return - */ - public ProducerConfiguration setProperties(Map properties) { - conf.getProperties().putAll(properties); - return this; - } - - public Map getProperties() { - return conf.getProperties(); - } - - public ProducerConfigurationData getProducerConfigurationData() { - return conf; - } -} diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/PulsarClient.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/PulsarClient.java deleted file mode 100644 index 8ac1dfb71e092..0000000000000 --- a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/PulsarClient.java +++ /dev/null @@ -1,273 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.client.api; - -import java.io.Closeable; -import java.util.concurrent.CompletableFuture; -import org.apache.pulsar.client.impl.v1.PulsarClientV1Impl; - -/** - * Class that provides a client interface to Pulsar. - *

- * Client instances are thread-safe and can be reused for managing multiple {@link Producer}, {@link Consumer} and - * {@link Reader} instances. - */ -public interface PulsarClient extends Closeable { - - /** - * Create a new PulsarClient object using default client configuration. - * - * @param serviceUrl - * the url of the Pulsar endpoint to be used - * @return a new pulsar client object - * @throws PulsarClientException.InvalidServiceURL - * if the serviceUrl is invalid - * @deprecated use {@link #builder()} to construct a client instance - */ - @Deprecated - static PulsarClient create(String serviceUrl) throws PulsarClientException { - return create(serviceUrl, new ClientConfiguration()); - } - - /** - * Create a new PulsarClient object. - * - * @param serviceUrl - * the url of the Pulsar endpoint to be used - * @param conf - * the client configuration - * @return a new pulsar client object - * @throws PulsarClientException.InvalidServiceURL - * if the serviceUrl is invalid - * @deprecated use {@link #builder()} to construct a client instance - */ - @Deprecated - static PulsarClient create(String serviceUrl, ClientConfiguration conf) throws PulsarClientException { - return new PulsarClientV1Impl(serviceUrl, conf); - } - - /** - * Create a producer with default {@link ProducerConfiguration} for publishing on a specific topic. - * - * @param topic - * The name of the topic where to produce - * @return The producer object - * @throws PulsarClientException.AlreadyClosedException - * if the client was already closed - * @throws PulsarClientException.InvalidTopicNameException - * if the topic name is not valid - * @throws PulsarClientException.AuthenticationException - * if there was an error with the supplied credentials - * @throws PulsarClientException.AuthorizationException - * if the authorization to publish on topic was denied - * @deprecated use {@link #newProducer()} to build a new producer - */ - @Deprecated - Producer createProducer(String topic) throws PulsarClientException; - - /** - * Asynchronously create a producer with default {@link ProducerConfiguration} for publishing on a specific topic. - * - * @param topic - * The name of the topic where to produce - * @return Future of the asynchronously created producer object - * @deprecated use {@link #newProducer()} to build a new producer - */ - @Deprecated - CompletableFuture createProducerAsync(String topic); - - /** - * Create a producer with given {@code ProducerConfiguration} for publishing on a specific topic. - * - * @param topic - * The name of the topic where to produce - * @param conf - * The {@code ProducerConfiguration} object - * @return The producer object - * @throws PulsarClientException - * if it was not possible to create the producer - * @throws InterruptedException - * @deprecated use {@link #newProducer()} to build a new producer - */ - @Deprecated - Producer createProducer(String topic, ProducerConfiguration conf) throws PulsarClientException; - - /** - * Asynchronously create a producer with given {@code ProducerConfiguration} for publishing on a specific topic. - * - * @param topic - * The name of the topic where to produce - * @param conf - * The {@code ProducerConfiguration} object - * @return Future of the asynchronously created producer object - * @deprecated use {@link #newProducer()} to build a new producer - */ - @Deprecated - CompletableFuture createProducerAsync(String topic, ProducerConfiguration conf); - - /** - * Subscribe to the given topic and subscription combination with default {@code ConsumerConfiguration}. - * - * @param topic - * The name of the topic - * @param subscription - * The name of the subscription - * @return The {@code Consumer} object - * @throws PulsarClientException - * @throws InterruptedException - * - * @deprecated Use {@link #newConsumer()} to build a new consumer - */ - @Deprecated - Consumer subscribe(String topic, String subscription) throws PulsarClientException; - - /** - * Asynchronously subscribe to the given topic and subscription combination using default. - * {@code ConsumerConfiguration} - * - * @param topic - * The topic name - * @param subscription - * The subscription name - * @return Future of the {@code Consumer} object - * @deprecated Use {@link #newConsumer()} to build a new consumer - */ - @Deprecated - CompletableFuture subscribeAsync(String topic, String subscription); - - /** - * Subscribe to the given topic and subscription combination with given {@code ConsumerConfiguration}. - * - * @param topic - * The name of the topic - * @param subscription - * The name of the subscription - * @param conf - * The {@code ConsumerConfiguration} object - * @return The {@code Consumer} object - * @throws PulsarClientException - * @deprecated Use {@link #newConsumer()} to build a new consumer - */ - @Deprecated - Consumer subscribe(String topic, String subscription, ConsumerConfiguration conf) throws PulsarClientException; - - /** - * Asynchronously subscribe to the given topic and subscription combination using given. - * {@code ConsumerConfiguration} - * - * @param topic - * The name of the topic - * @param subscription - * The name of the subscription - * @param conf - * The {@code ConsumerConfiguration} object - * @return Future of the {@code Consumer} object - * @deprecated Use {@link #newConsumer()} to build a new consumer - */ - @Deprecated - CompletableFuture subscribeAsync(String topic, String subscription, ConsumerConfiguration conf); - - /** - * Create a topic reader with given {@code ReaderConfiguration} for reading messages from the specified topic. - *

- * The Reader provides a low-level abstraction that allows for manual positioning in the topic, without using a - * subscription. Reader can only work on non-partitioned topics. - *

- * The initial reader positioning is done by specifying a message id. The options are: - *

    - *
  • MessageId.earliest : Start reading from the earliest message available in the topic - *
  • MessageId.latest : Start reading from the end topic, only getting messages published after the - * reader was created - *
  • MessageId : When passing a particular message id, the reader will position itself on that - * specific position. The first message to be read will be the message next to the specified messageId. - *
- * - * @param topic - * The name of the topic where to read - * @param startMessageId - * The message id where the reader will position itself. The first message returned will be the one after - * the specified startMessageId - * @param conf - * The {@code ReaderConfiguration} object - * @return The {@code Reader} object - * @deprecated Use {@link #newReader()} to build a new reader - */ - @Deprecated - Reader createReader(String topic, MessageId startMessageId, ReaderConfiguration conf) throws PulsarClientException; - - /** - * Asynchronously create a topic reader with given {@code ReaderConfiguration} for reading messages from the - * specified topic. - *

- * The Reader provides a low-level abstraction that allows for manual positioning in the topic, without using a - * subscription. Reader can only work on non-partitioned topics. - *

- * The initial reader positioning is done by specifying a message id. The options are: - *

    - *
  • MessageId.earliest : Start reading from the earliest message available in the topic - *
  • MessageId.latest : Start reading from the end topic, only getting messages published after the - * reader was created - *
  • MessageId : When passing a particular message id, the reader will position itself on that - * specific position. The first message to be read will be the message next to the specified messageId. - *
- * - * @param topic - * The name of the topic where to read - * @param startMessageId - * The message id where the reader will position itself. The first message returned will be the one after - * the specified startMessageId - * @param conf - * The {@code ReaderConfiguration} object - * @return Future of the asynchronously created producer object - * @deprecated Use {@link #newReader()} to build a new reader - */ - @Deprecated - CompletableFuture createReaderAsync(String topic, MessageId startMessageId, ReaderConfiguration conf); - - /** - * Close the PulsarClient and release all the resources. - * - * All the producers and consumers will be orderly closed. Waits until all pending write request are persisted. - * - * @throws PulsarClientException - * if the close operation fails - */ - @Override - void close() throws PulsarClientException; - - /** - * Asynchronously close the PulsarClient and release all the resources. - * - * All the producers and consumers will be orderly closed. Waits until all pending write request are persisted. - * - * @throws PulsarClientException - * if the close operation fails - */ - CompletableFuture closeAsync(); - - /** - * Perform immediate shutdown of PulsarClient. - * - * Release all the resources and close all the producers without waiting for ongoing operations to complete. - * - * @throws PulsarClientException - * if the forceful shutdown fails - */ - void shutdown() throws PulsarClientException; -} diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/Reader.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/Reader.java deleted file mode 100644 index 98fcdb453bb76..0000000000000 --- a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/Reader.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.client.api; - -import java.io.Closeable; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; - -/** - * A Reader can be used to scan through all the messages currently available in a topic. - * - */ -public interface Reader extends Closeable { - - /** - * @return the topic from which this reader is reading from - */ - String getTopic(); - - /** - * Read the next message in the topic. - * - * @return the next message - * @throws PulsarClientException - */ - Message readNext() throws PulsarClientException; - - /** - * Read the next message in the topic waiting for a maximum of timeout - * time units. Returns null if no message is received in that time. - * - * @return the next message(Could be null if none received in time) - * @throws PulsarClientException - */ - Message readNext(int timeout, TimeUnit unit) throws PulsarClientException; - - CompletableFuture> readNextAsync(); - - /** - * Asynchronously close the reader and stop the broker to push more messages. - * - * @return a future that can be used to track the completion of the operation - */ - CompletableFuture closeAsync(); - - /** - * Return true if the topic was terminated and this reader has reached the end of the topic. - */ - boolean hasReachedEndOfTopic(); - - /** - * Check if there is any message available to read from the current position. - */ - boolean hasMessageAvailable() throws PulsarClientException; - - /** - * Asynchronously Check if there is message that has been published successfully to the broker in the topic. - */ - CompletableFuture hasMessageAvailableAsync(); - - /** - * @return Whether the reader is connected to the broker - */ - boolean isConnected(); -} diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ReaderConfiguration.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ReaderConfiguration.java deleted file mode 100644 index 885436a11336e..0000000000000 --- a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ReaderConfiguration.java +++ /dev/null @@ -1,175 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.client.api; - -import static com.google.common.base.Preconditions.checkArgument; -import java.io.Serializable; -import java.util.Objects; -import org.apache.commons.lang3.StringUtils; -import org.apache.pulsar.client.impl.conf.ReaderConfigurationData; -import org.apache.pulsar.client.impl.v1.ReaderV1Impl; - -/** - * - * @deprecated Use {@link PulsarClient#newReader()} to construct and configure a {@link Reader} instance - */ -@Deprecated -public class ReaderConfiguration implements Serializable { - - private final ReaderConfigurationData conf = new ReaderConfigurationData<>(); - - private ReaderListener readerListener; - - /** - * @return the configured {@link ReaderListener} for the reader - */ - public ReaderListener getReaderListener() { - return readerListener; - } - - /** - * Sets a {@link ReaderListener} for the reader - *

- * When a {@link ReaderListener} is set, application will receive messages through it. Calls to - * {@link Reader#readNext()} will not be allowed. - * - * @param readerListener - * the listener object - */ - public ReaderConfiguration setReaderListener(ReaderListener readerListener) { - Objects.requireNonNull(readerListener); - this.readerListener = readerListener; - conf.setReaderListener(new org.apache.pulsar.shade.client.api.v2.ReaderListener() { - - @Override - public void received(org.apache.pulsar.shade.client.api.v2.Reader v2Reader, Message msg) { - readerListener.received(new ReaderV1Impl(v2Reader), msg); - } - - @Override - public void reachedEndOfTopic(org.apache.pulsar.shade.client.api.v2.Reader reader) { - readerListener.reachedEndOfTopic(new ReaderV1Impl(reader)); - } - }); - return this; - } - - /** - * @return the configure receiver queue size value - */ - public int getReceiverQueueSize() { - return conf.getReceiverQueueSize(); - } - - /** - * @return the CryptoKeyReader - */ - public CryptoKeyReader getCryptoKeyReader() { - return conf.getCryptoKeyReader(); - } - - /** - * Sets a {@link CryptoKeyReader}. - * - * @param cryptoKeyReader - * CryptoKeyReader object - */ - public ReaderConfiguration setCryptoKeyReader(CryptoKeyReader cryptoKeyReader) { - Objects.requireNonNull(cryptoKeyReader); - conf.setCryptoKeyReader(cryptoKeyReader); - return this; - } - - /** - * Sets the ConsumerCryptoFailureAction to the value specified. - * - * @param action - * The action to take when the decoding fails - */ - public void setCryptoFailureAction(ConsumerCryptoFailureAction action) { - conf.setCryptoFailureAction(action); - } - - /** - * @return The ConsumerCryptoFailureAction - */ - public ConsumerCryptoFailureAction getCryptoFailureAction() { - return conf.getCryptoFailureAction(); - } - - /** - * Sets the size of the consumer receive queue. - *

- * The consumer receive queue controls how many messages can be accumulated by the {@link Consumer} before the - * application calls {@link Consumer#receive()}. Using a higher value could potentially increase the consumer - * throughput at the expense of bigger memory utilization. - *

- * Default value is {@code 1000} messages and should be good for most use cases. - * - * @param receiverQueueSize - * the new receiver queue size value - */ - public ReaderConfiguration setReceiverQueueSize(int receiverQueueSize) { - checkArgument(receiverQueueSize >= 0, "Receiver queue size cannot be negative"); - conf.setReceiverQueueSize(receiverQueueSize); - return this; - } - - /** - * @return the consumer name - */ - public String getReaderName() { - return conf.getReaderName(); - } - - /** - * Set the consumer name. - * - * @param readerName - */ - public ReaderConfiguration setReaderName(String readerName) { - checkArgument(StringUtils.isNotBlank(readerName)); - conf.setReaderName(readerName); - return this; - } - - /** - * @return the subscription role prefix for subscription auth - */ - public String getSubscriptionRolePrefix() { - return conf.getSubscriptionRolePrefix(); - } - - /** - * Set the subscription role prefix for subscription auth. The default prefix is "reader". - * - * @param subscriptionRolePrefix - */ - public ReaderConfiguration setSubscriptionRolePrefix(String subscriptionRolePrefix) { - checkArgument(StringUtils.isNotBlank(subscriptionRolePrefix)); - conf.setSubscriptionRolePrefix(subscriptionRolePrefix); - return this; - } - - public ReaderConfigurationData getReaderConfigurationData() { - return conf; - } - - private static final long serialVersionUID = 1L; -} diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ReaderListener.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ReaderListener.java deleted file mode 100644 index 26d694d589ac0..0000000000000 --- a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/ReaderListener.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.client.api; - -import java.io.Serializable; - -/** - * A listener that will be called in order for every message received. - */ -public interface ReaderListener extends Serializable { - /** - * This method is called whenever a new message is received. - * - * Messages are guaranteed to be delivered in order and from the same thread for a single consumer - * - * This method will only be called once for each message, unless either application or broker crashes. - * - * Application is responsible of handling any exception that could be thrown while processing the message. - * - * @param reader - * the Reader object from where the message was received - * @param msg - * the message object - */ - void received(Reader reader, Message msg); - - /** - * Get the notification when a topic is terminated. - * - * @param reader - * the Reader object associated with the terminated topic - */ - default void reachedEndOfTopic(Reader reader) { - // By default ignore the notification - } -} diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/package-info.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/package-info.java deleted file mode 100644 index 57896b5e27235..0000000000000 --- a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/api/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -/** - * Pulsar Client API. - */ -package org.apache.pulsar.client.api; diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/MessageBuilderImpl.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/MessageBuilderImpl.java deleted file mode 100644 index 6d6a08725d7da..0000000000000 --- a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/MessageBuilderImpl.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.client.impl; - -import static com.google.common.base.Preconditions.checkArgument; -import java.nio.ByteBuffer; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.MessageBuilder; -import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.common.api.proto.MessageMetadata; - -@SuppressWarnings("deprecation") -public class MessageBuilderImpl implements MessageBuilder { - private static final ByteBuffer EMPTY_CONTENT = ByteBuffer.allocate(0); - private final MessageMetadata msgMetadataBuilder = new MessageMetadata(); - private ByteBuffer content = EMPTY_CONTENT; - - @Override - public Message build() { - return MessageImpl.create(msgMetadataBuilder, content, Schema.BYTES, null); - } - - @Override - public MessageBuilder setContent(byte[] data) { - setContent(data, 0, data.length); - return this; - } - - @Override - public MessageBuilder setContent(byte[] data, int offet, int length) { - this.content = ByteBuffer.wrap(data, offet, length); - return this; - } - - @Override - public MessageBuilder setContent(ByteBuffer buf) { - this.content = buf.duplicate(); - return this; - } - - @Override - public MessageBuilder setProperties(Map properties) { - for (Map.Entry entry : properties.entrySet()) { - msgMetadataBuilder.addProperty() - .setKey(entry.getKey()) - .setValue(entry.getValue()); - } - - return this; - } - - @Override - public MessageBuilder setProperty(String name, String value) { - msgMetadataBuilder.addProperty() - .setKey(name) - .setValue(value); - return this; - } - - @Override - public MessageBuilder setKey(String key) { - msgMetadataBuilder.setPartitionKey(key); - return this; - } - - @Override - public MessageBuilder setEventTime(long timestamp) { - checkArgument(timestamp > 0, "Invalid timestamp : '%s'", timestamp); - msgMetadataBuilder.setEventTime(timestamp); - return this; - } - - @Override - public MessageBuilder setSequenceId(long sequenceId) { - checkArgument(sequenceId >= 0); - msgMetadataBuilder.setSequenceId(sequenceId); - return this; - } - - @Override - public MessageBuilder setReplicationClusters(List clusters) { - Objects.requireNonNull(clusters); - msgMetadataBuilder.clearReplicateTo(); - msgMetadataBuilder.addAllReplicateTos(clusters); - return this; - } - - @Override - public MessageBuilder disableReplication() { - msgMetadataBuilder.clearReplicateTo(); - msgMetadataBuilder.addReplicateTo("__local__"); - return this; - } - - -} diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/package-info.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/package-info.java deleted file mode 100644 index e429b403ec6b3..0000000000000 --- a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -/** - * Pulsar Client API. - */ -package org.apache.pulsar.client.impl; diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/ConsumerV1Impl.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/ConsumerV1Impl.java deleted file mode 100644 index ab17beee4f7d2..0000000000000 --- a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/ConsumerV1Impl.java +++ /dev/null @@ -1,176 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.client.impl.v1; - -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; -import java.util.function.Function; -import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.ConsumerStats; -import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.MessageId; -import org.apache.pulsar.client.api.PulsarClientException; - -public class ConsumerV1Impl implements Consumer { - private final org.apache.pulsar.shade.client.api.v2.Consumer consumer; - - public ConsumerV1Impl(org.apache.pulsar.shade.client.api.v2.Consumer consumer) { - this.consumer = consumer; - } - - @Override - public void acknowledge(Message arg0) throws PulsarClientException { - consumer.acknowledge(arg0); - } - - @Override - public void acknowledge(MessageId arg0) throws PulsarClientException { - consumer.acknowledge(arg0); - } - - @Override - public CompletableFuture acknowledgeAsync(Message arg0) { - return consumer.acknowledgeAsync(arg0); - } - - @Override - public CompletableFuture acknowledgeAsync(MessageId arg0) { - return consumer.acknowledgeAsync(arg0); - } - - @Override - public void acknowledgeCumulative(Message arg0) throws PulsarClientException { - consumer.acknowledgeCumulative(arg0); - } - - @Override - public void acknowledgeCumulative(MessageId arg0) throws PulsarClientException { - consumer.acknowledgeCumulative(arg0); - } - - @Override - public CompletableFuture acknowledgeCumulativeAsync(Message arg0) { - return consumer.acknowledgeCumulativeAsync(arg0); - } - - @Override - public CompletableFuture acknowledgeCumulativeAsync(MessageId arg0) { - return consumer.acknowledgeCumulativeAsync(arg0); - } - - @Override - public void close() throws PulsarClientException { - consumer.close(); - } - - @Override - public CompletableFuture closeAsync() { - return consumer.closeAsync(); - } - - @Override - public String getConsumerName() { - return consumer.getConsumerName(); - } - - @Override - public ConsumerStats getStats() { - return consumer.getStats(); - } - - public String getSubscription() { - return consumer.getSubscription(); - } - - public String getTopic() { - return consumer.getTopic(); - } - - public boolean hasReachedEndOfTopic() { - return consumer.hasReachedEndOfTopic(); - } - - public boolean isConnected() { - return consumer.isConnected(); - } - - public void pause() { - consumer.pause(); - } - - public Message receive() throws PulsarClientException { - return consumer.receive(); - } - - public Message receive(int arg0, TimeUnit arg1) throws PulsarClientException { - return consumer.receive(arg0, arg1); - } - - public CompletableFuture> receiveAsync() { - return consumer.receiveAsync(); - } - - public void redeliverUnacknowledgedMessages() { - consumer.redeliverUnacknowledgedMessages(); - } - - public void resume() { - consumer.resume(); - } - - public void seek(MessageId arg0) throws PulsarClientException { - consumer.seek(arg0); - } - - public void seek(long arg0) throws PulsarClientException { - consumer.seek(arg0); - } - - public void seek(Function function) throws PulsarClientException { - consumer.seek(function); - } - - public CompletableFuture seekAsync(long arg0) { - return consumer.seekAsync(arg0); - } - - public CompletableFuture seekAsync(MessageId arg0) { - return consumer.seekAsync(arg0); - } - - public CompletableFuture seekAsync(Function function) { - return consumer.seekAsync(function); - } - - public void unsubscribe() throws PulsarClientException { - consumer.unsubscribe(); - } - - public CompletableFuture unsubscribeAsync() { - return consumer.unsubscribeAsync(); - } - - public MessageId getLastMessageId() throws PulsarClientException { - return consumer.getLastMessageId(); - } - - public CompletableFuture getLastMessageIdAsync() { - return consumer.getLastMessageIdAsync(); - } -} diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/ProducerV1Impl.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/ProducerV1Impl.java deleted file mode 100644 index 12c8d0f1527ce..0000000000000 --- a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/ProducerV1Impl.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.client.impl.v1; - -import java.util.concurrent.CompletableFuture; -import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.MessageId; -import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.ProducerStats; -import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.impl.ProducerImpl; - -public class ProducerV1Impl implements Producer { - - private final ProducerImpl producer; - - public ProducerV1Impl(ProducerImpl producer) { - this.producer = producer; - } - - public void close() throws PulsarClientException { - producer.close(); - } - - public CompletableFuture closeAsync() { - return producer.closeAsync(); - } - - public void flush() throws PulsarClientException { - producer.flush(); - } - - public CompletableFuture flushAsync() { - return producer.flushAsync(); - } - - public long getLastSequenceId() { - return producer.getLastSequenceId(); - } - - public ProducerStats getStats() { - return producer.getStats(); - } - - public boolean isConnected() { - return producer.isConnected(); - } - - public MessageId send(byte[] value) throws PulsarClientException { - return producer.send(value); - } - - public MessageId send(Message value) throws PulsarClientException { - return producer.send(value); - } - - public CompletableFuture sendAsync(byte[] arg0) { - return producer.sendAsync(arg0); - } - - public CompletableFuture sendAsync(Message arg0) { - return producer.sendAsync(arg0); - } - - @Override - public String getTopic() { - return producer.getTopic(); - } - - @Override - public String getProducerName() { - return producer.getProducerName(); - } -} diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/PulsarClientV1Impl.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/PulsarClientV1Impl.java deleted file mode 100644 index ca4373d37f4c1..0000000000000 --- a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/PulsarClientV1Impl.java +++ /dev/null @@ -1,172 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.client.impl.v1; - -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import org.apache.pulsar.client.api.ClientConfiguration; -import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.ConsumerConfiguration; -import org.apache.pulsar.client.api.MessageId; -import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.ProducerConfiguration; -import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.api.Reader; -import org.apache.pulsar.client.api.ReaderConfiguration; -import org.apache.pulsar.client.impl.ProducerImpl; -import org.apache.pulsar.client.impl.PulsarClientImpl; -import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; -import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; -import org.apache.pulsar.client.impl.conf.ReaderConfigurationData; -import org.apache.pulsar.common.util.FutureUtil; - -@SuppressWarnings("deprecation") -public class PulsarClientV1Impl implements PulsarClient { - - private final PulsarClientImpl client; - - public PulsarClientV1Impl(String serviceUrl, ClientConfiguration conf) throws PulsarClientException { - this.client = new PulsarClientImpl(conf.setServiceUrl(serviceUrl).getConfigurationData().clone()); - } - - @Override - public void close() throws PulsarClientException { - client.close(); - } - - @Override - public CompletableFuture closeAsync() { - return client.closeAsync(); - } - - @Override - public Producer createProducer(final String topic, final ProducerConfiguration conf) throws PulsarClientException { - if (conf == null) { - throw new PulsarClientException.InvalidConfigurationException("Invalid null configuration object"); - } - - try { - return createProducerAsync(topic, conf).get(); - } catch (ExecutionException e) { - Throwable t = e.getCause(); - if (t instanceof PulsarClientException) { - throw (PulsarClientException) t; - } else { - throw new PulsarClientException(t); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new PulsarClientException(e); - } - } - - @Override - public Producer createProducer(String topic) - throws PulsarClientException { - return createProducer(topic, new ProducerConfiguration()); - } - - @Override - public CompletableFuture createProducerAsync(final String topic, final ProducerConfiguration conf) { - ProducerConfigurationData confData = conf.getProducerConfigurationData().clone(); - confData.setTopicName(topic); - return client.createProducerAsync(confData).thenApply(p -> new ProducerV1Impl((ProducerImpl) p)); - } - - @Override - public CompletableFuture createProducerAsync(String topic) { - return createProducerAsync(topic, new ProducerConfiguration()); - } - - @Override - public Reader createReader(String topic, MessageId startMessageId, ReaderConfiguration conf) - throws PulsarClientException { - try { - return createReaderAsync(topic, startMessageId, conf).get(); - } catch (ExecutionException e) { - Throwable t = e.getCause(); - if (t instanceof PulsarClientException) { - throw (PulsarClientException) t; - } else { - throw new PulsarClientException(t); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new PulsarClientException(e); - } - } - - @Override - public CompletableFuture createReaderAsync(String topic, MessageId startMessageId, - ReaderConfiguration conf) { - ReaderConfigurationData confData = conf.getReaderConfigurationData().clone(); - confData.setTopicName(topic); - confData.setStartMessageId(startMessageId); - return client.createReaderAsync(confData).thenApply(r -> new ReaderV1Impl(r)); - } - - @Override - public void shutdown() throws PulsarClientException { - client.shutdown(); - } - - @Override - public Consumer subscribe(String topic, String subscriptionName) throws PulsarClientException { - return subscribe(topic, subscriptionName, new ConsumerConfiguration()); - } - - @Override - public CompletableFuture subscribeAsync(final String topic, final String subscription, - final ConsumerConfiguration conf) { - if (conf == null) { - return FutureUtil.failedFuture( - new PulsarClientException.InvalidConfigurationException("Invalid null configuration")); - } - - ConsumerConfigurationData confData = conf.getConfigurationData().clone(); - confData.getTopicNames().add(topic); - confData.setSubscriptionName(subscription); - return client.subscribeAsync(confData).thenApply(c -> new ConsumerV1Impl(c)); - } - - @Override - public CompletableFuture subscribeAsync(String topic, - String subscriptionName) { - return subscribeAsync(topic, subscriptionName, new ConsumerConfiguration()); - } - - @Override - public Consumer subscribe(String topic, String subscription, ConsumerConfiguration conf) - throws PulsarClientException { - try { - return subscribeAsync(topic, subscription, conf).get(); - } catch (ExecutionException e) { - Throwable t = e.getCause(); - if (t instanceof PulsarClientException) { - throw (PulsarClientException) t; - } else { - throw new PulsarClientException(t); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new PulsarClientException(e); - } - } -} diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/ReaderV1Impl.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/ReaderV1Impl.java deleted file mode 100644 index 2e6384459e160..0000000000000 --- a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/ReaderV1Impl.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.client.impl.v1; - -import java.io.IOException; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; -import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.api.Reader; - -public class ReaderV1Impl implements Reader { - - private final org.apache.pulsar.shade.client.api.v2.Reader reader; - - public ReaderV1Impl(org.apache.pulsar.shade.client.api.v2.Reader reader) { - this.reader = reader; - } - - @Override - public void close() throws IOException { - reader.close(); - } - - @Override - public CompletableFuture closeAsync() { - return reader.closeAsync(); - } - - @Override - public String getTopic() { - return reader.getTopic(); - } - - @Override - public boolean hasMessageAvailable() throws PulsarClientException { - return reader.hasMessageAvailable(); - } - - @Override - public CompletableFuture hasMessageAvailableAsync() { - return reader.hasMessageAvailableAsync(); - } - - @Override - public boolean hasReachedEndOfTopic() { - return reader.hasReachedEndOfTopic(); - } - - @Override - public boolean isConnected() { - return reader.isConnected(); - } - - @Override - public Message readNext() throws PulsarClientException { - return reader.readNext(); - } - - @Override - public Message readNext(int arg0, TimeUnit arg1) throws PulsarClientException { - return reader.readNext(arg0, arg1); - } - - @Override - public CompletableFuture> readNextAsync() { - return reader.readNextAsync(); - } -} diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/package-info.java b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/package-info.java deleted file mode 100644 index 97705e7d567ca..0000000000000 --- a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/v1/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -/** - * Pulsar Client API. - */ -package org.apache.pulsar.client.impl.v1; diff --git a/pulsar-client-1x-base/pulsar-client-1x/src/main/resources/findbugsExclude.xml b/pulsar-client-1x-base/pulsar-client-1x/src/main/resources/findbugsExclude.xml deleted file mode 100644 index 7938e60bf4330..0000000000000 --- a/pulsar-client-1x-base/pulsar-client-1x/src/main/resources/findbugsExclude.xml +++ /dev/null @@ -1,48 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/pulsar-client-1x-base/pulsar-client-2x-shaded/pom.xml b/pulsar-client-1x-base/pulsar-client-2x-shaded/pom.xml deleted file mode 100644 index 2e316e8e5eee3..0000000000000 --- a/pulsar-client-1x-base/pulsar-client-2x-shaded/pom.xml +++ /dev/null @@ -1,97 +0,0 @@ - - - 4.0.0 - - - org.apache.pulsar - pulsar-client-1x-base - 4.0.0-SNAPSHOT - - - pulsar-client-2x-shaded - Pulsar Client 2.x Shaded API - - - - ${project.groupId} - pulsar-client - ${project.version} - - - - - - - - org.apache.maven.plugins - maven-shade-plugin - - - ${shadePluginPhase} - - shade - - - true - true - false - - - - org.apache.pulsar:pulsar-client - org.apache.pulsar:pulsar-client-api - - - - - org.apache.pulsar:pulsar-client - - ** - - - - org/bouncycastle/** - - - - - - org.apache.pulsar.client.api - org.apache.pulsar.shade.client.api.v2 - - org.apache.pulsar.client.api.PulsarClient - org.apache.pulsar.client.api.Producer - org.apache.pulsar.client.api.Consumer - org.apache.pulsar.client.api.Reader - org.apache.pulsar.client.api.MessageListener - org.apache.pulsar.client.api.ReaderListener - - - - - - - - - -