Sample Code: *
{@code * // Construct a new connection string - * ConnectionStringBuilder connectionStringBuilder = new ConnectionStringBuilder() - * .setNamespaceName("EventHubsNamespaceName") - * .setEventHubName("EventHubsEntityName") - * .setSasKeyName("SharedAccessSignatureKeyName") - * .setSasKey("SharedAccessSignatureKey") + * ConnectionStringBuilder connectionStringBuilder = new ConnectionStringBuilder() + * .setNamespaceName("EventHubsNamespaceName") + * .setEventHubName("EventHubsEntityName") + * .setSasKeyName("SharedAccessSignatureKeyName") + * .setSasKey("SharedAccessSignatureKey") * * string connString = connectionStringBuilder.build(); * @@ -47,20 +47,20 @@ */ public final class ConnectionStringBuilder { - final static String endpointFormat = "sb://%s.%s"; - final static String hostnameFormat = "sb://%s"; - final static String defaultDomainName = "servicebus.windows.net"; - - final static String HostnameConfigName = "Hostname"; // Hostname is a key that is used in IoTHub. - final static String EndpointConfigName = "Endpoint"; // Endpoint key is used in EventHubs. It's identical to Hostname in IoTHub. - final static String EntityPathConfigName = "EntityPath"; - final static String OperationTimeoutConfigName = "OperationTimeout"; - final static String KeyValueSeparator = "="; - final static String KeyValuePairDelimiter = ";"; - final static String SharedAccessKeyNameConfigName = "SharedAccessKeyName"; // We use a (KeyName, Key) pair OR the SAS token - never both. - final static String SharedAccessKeyConfigName = "SharedAccessKey"; - final static String SharedAccessSignatureConfigName = "SharedAccessSignature"; - final static String TransportTypeConfigName = "TransportType"; + static final String endpointFormat = "sb://%s.%s"; + static final String hostnameFormat = "sb://%s"; + static final String defaultDomainName = "servicebus.windows.net"; + + static final String HostnameConfigName = "Hostname"; // Hostname is a key that is used in IoTHub. + static final String EndpointConfigName = "Endpoint"; // Endpoint key is used in EventHubs. It's identical to Hostname in IoTHub. + static final String EntityPathConfigName = "EntityPath"; + static final String OperationTimeoutConfigName = "OperationTimeout"; + static final String KeyValueSeparator = "="; + static final String KeyValuePairDelimiter = ";"; + static final String SharedAccessKeyNameConfigName = "SharedAccessKeyName"; // We use a (KeyName, Key) pair OR the SAS token - never both. + static final String SharedAccessKeyConfigName = "SharedAccessKey"; + static final String SharedAccessSignatureConfigName = "SharedAccessSignature"; + static final String TransportTypeConfigName = "TransportType"; private static final String AllKeyEnumerateRegex = "(" + HostnameConfigName + "|" + EndpointConfigName + "|" + SharedAccessKeyNameConfigName + "|" + SharedAccessKeyConfigName + "|" + SharedAccessSignatureConfigName + "|" + EntityPathConfigName + "|" + OperationTimeoutConfigName diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventData.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventData.java index f2f4b41472a4b..b1f4f364b39aa 100755 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventData.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventData.java @@ -10,8 +10,10 @@ import java.io.Serializable; import java.nio.ByteBuffer; import java.time.Instant; -import java.util.*; -import java.util.concurrent.ScheduledExecutorService; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; /** * The data structure encapsulating the Event being sent-to and received-from EventHubs. @@ -35,8 +37,8 @@ public interface EventData extends Serializable, Comparable{ * Construct EventData to Send to EventHubs. * Typical pattern to create a Sending EventData is: * - * i. Serialize the sending ApplicationEvent to be sent to EventHubs into bytes. - * ii. If complex serialization logic is involved (for example: multiple types of data) - add a Hint using the {@link #getProperties()} for the Consumer. + * i. Serialize the sending ApplicationEvent to be sent to EventHubs into bytes. + * ii. If complex serialization logic is involved (for example: multiple types of data) - add a Hint using the {@link #getProperties()} for the Consumer. **Sample Code: *
@@ -57,8 +59,8 @@ static EventData create(final byte[] data) { * Construct EventData to Send to EventHubs. * Typical pattern to create a Sending EventData is: *- * i. Serialize the sending ApplicationEvent to be sent to EventHubs into bytes. - * ii. If complex serialization logic is involved (for example: multiple types of data) - add a Hint using the {@link #getProperties()} for the Consumer. + * i. Serialize the sending ApplicationEvent to be sent to EventHubs into bytes. + * ii. If complex serialization logic is involved (for example: multiple types of data) - add a Hint using the {@link #getProperties()} for the Consumer. **Illustration: *
{@code @@ -81,14 +83,14 @@ static EventData create(final byte[] data, final int offset, final int length) { * Construct EventData to Send to EventHubs. * Typical pattern to create a Sending EventData is: *- * i. Serialize the sending ApplicationEvent to be sent to EventHubs into bytes. - * ii. If complex serialization logic is involved (for example: multiple types of data) - add a Hint using the {@link #getProperties()} for the Consumer. + * i. Serialize the sending ApplicationEvent to be sent to EventHubs into bytes. + * ii. If complex serialization logic is involved (for example: multiple types of data) - add a Hint using the {@link #getProperties()} for the Consumer. **Illustration: *
{@code * EventData eventData = EventData.create(telemetryEventByteBuffer); * eventData.getProperties().put("eventType", "com.microsoft.azure.monitoring.EtlEvent"); - * partitionSender.Send(eventData); + * partitionSender.Send(eventData); * }* * @param buffer ByteBuffer which references the payload of the Event to be sent to EventHubs diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventHubClient.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventHubClient.java index 86fea27195580..0e943f0bcbb9a 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventHubClient.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventHubClient.java @@ -9,7 +9,6 @@ import java.io.IOException; import java.nio.channels.UnresolvedAddressException; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; /** @@ -126,9 +125,9 @@ default void sendSync(final EventData data) throws EventHubException { * Send {@link EventData} to EventHub. The sent {@link EventData} will land on any arbitrarily chosen EventHubs partition. *There are 3 ways to send to EventHubs, each exposed as a method (along with its sendBatch overload): *
- *
*- {@link #send(EventData)}, {@link #send(Iterable)}, or {@link #send(EventDataBatch)} - *
- {@link #send(EventData, String)} or {@link #send(Iterable, String)} - *
- {@link PartitionSender#send(EventData)}, {@link PartitionSender#send(Iterable)}, or {@link PartitionSender#send(EventDataBatch)} + *
- {@link #send(EventData)}, {@link #send(Iterable)}, or {@link #send(EventDataBatch)} + *
- {@link #send(EventData, String)} or {@link #send(Iterable, String)} + *
- {@link PartitionSender#send(EventData)}, {@link PartitionSender#send(Iterable)}, or {@link PartitionSender#send(EventDataBatch)} *
Use this method to Send, if: *
@@ -168,8 +167,8 @@ default void sendSync(final IterableeventDatas) throws EventHubExcep * Use this overload versus {@link #send(EventData)}, if you need to send a batch of {@link EventData}. * Sending a batch of {@link EventData}'s is useful in the following cases: *
- * i. Efficient send - sending a batch of {@link EventData} maximizes the overall throughput by optimally using the number of sessions created to EventHubs' service. - * ii. Send multiple {@link EventData}'s in a Transaction. To achieve ACID properties, the Gateway Service will forward all {@link EventData}'s in the batch to a single EventHubs' partition. + * i. Efficient send - sending a batch of {@link EventData} maximizes the overall throughput by optimally using the number of sessions created to EventHubs' service. + * ii. Send multiple {@link EventData}'s in a Transaction. To achieve ACID properties, the Gateway Service will forward all {@link EventData}'s in the batch to a single EventHubs' partition. *** Sample code (sample uses sync version of the api but concept are identical): @@ -283,8 +282,8 @@ default void sendSync(final Iterable
eventDatas, final String partiti * There are 3 ways to send to EventHubs, to understand this particular type of Send refer to the overload {@link #send(EventData, String)}, which is the same type of Send and is used to send single {@link EventData}. *
Sending a batch of {@link EventData}'s is useful in the following cases: *
- * i. Efficient send - sending a batch of {@link EventData} maximizes the overall throughput by optimally using the number of sessions created to EventHubs service. - * ii. Send multiple events in One Transaction. This is the reason why all events sent in a batch needs to have same partitionKey (so that they are sent to one partition only). + * i. Efficient send - sending a batch of {@link EventData} maximizes the overall throughput by optimally using the number of sessions created to EventHubs service. + * ii. Send multiple events in One Transaction. This is the reason why all events sent in a batch needs to have same partitionKey (so that they are sent to one partition only). ** * @param eventDatas the batch of events to send to EventHub diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/PartitionReceiveHandler.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/PartitionReceiveHandler.java index a972e30c05727..e51c203fa868b 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/PartitionReceiveHandler.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/PartitionReceiveHandler.java @@ -33,4 +33,4 @@ public interface PartitionReceiveHandler { * @param error fatal error encountered while running the {@link PartitionReceiveHandler} pump */ void onError(final Throwable error); -} \ No newline at end of file +} diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/PartitionReceiver.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/PartitionReceiver.java index 542f5d1fcfdc0..e724e49f17d9e 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/PartitionReceiver.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/PartitionReceiver.java @@ -140,4 +140,4 @@ default IterablereceiveSync(final int maxEventCount) throws EventHub CompletableFuture close(); void closeSync() throws EventHubException; -} \ No newline at end of file +} diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/PartitionSender.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/PartitionSender.java index 578fe8292932f..fc93495744709 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/PartitionSender.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/PartitionSender.java @@ -6,7 +6,6 @@ import com.microsoft.azure.eventhubs.impl.ExceptionUtil; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ScheduledExecutorService; /** * This sender class is a logical representation of sending events to a specific EventHub partition. Do not use this class @@ -93,8 +92,8 @@ default void sendSync(final Iterable eventDatas) throws EventHubExcep * * Sending a batch of {@link EventData}'s is useful in the following cases: *
- * i. Efficient send - sending a batch of {@link EventData} maximizes the overall throughput by optimally using the number of sessions created to EventHubs' service. - * ii. Send multiple {@link EventData}'s in a Transaction. To achieve ACID properties, the Gateway Service will forward all {@link EventData}'s in the batch to a single EventHubs' partition. + * i. Efficient send - sending a batch of {@link EventData} maximizes the overall throughput by optimally using the number of sessions created to EventHubs' service. + * ii. Send multiple {@link EventData}'s in a Transaction. To achieve ACID properties, the Gateway Service will forward all {@link EventData}'s in the batch to a single EventHubs' partition. *** Sample code (sample uses sync version of the api but concept are identical): @@ -143,8 +142,8 @@ default void sendSync(final EventDataBatch eventDatas) throws EventHubException *
* Sending a batch of {@link EventData}'s is useful in the following cases: *
- * i. Efficient send - sending a batch of {@link EventData} maximizes the overall throughput by optimally using the number of sessions created to EventHubs' service. - * ii. Send multiple {@link EventData}'s in a Transaction. To achieve ACID properties, the Gateway Service will forward all {@link EventData}'s in the batch to a single EventHubs' partition. + * i. Efficient send - sending a batch of {@link EventData} maximizes the overall throughput by optimally using the number of sessions created to EventHubs' service. + * ii. Send multiple {@link EventData}'s in a Transaction. To achieve ACID properties, the Gateway Service will forward all {@link EventData}'s in the batch to a single EventHubs' partition. ** * @param eventDatas EventDataBatch to send to EventHub diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/ReceiverOptions.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/ReceiverOptions.java index 4302817c0c2a5..315927bf8e672 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/ReceiverOptions.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/ReceiverOptions.java @@ -22,8 +22,8 @@ public ReceiverOptions() { private static void validateReceiverIdentifier(final String receiverName) { - if (receiverName != null && - receiverName.length() > ClientConstants.MAX_RECEIVER_NAME_LENGTH) { + if (receiverName != null + && receiverName.length() > ClientConstants.MAX_RECEIVER_NAME_LENGTH) { throw new IllegalArgumentException("receiverIdentifier length cannot exceed 64"); } } diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/RetryPolicy.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/RetryPolicy.java index d3b04b49ac32d..b5d9f02463c68 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/RetryPolicy.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/RetryPolicy.java @@ -76,8 +76,8 @@ protected int getRetryCount(String clientId) { public Duration getNextRetryInterval(String clientId, Exception lastException, Duration remainingTime) { int baseWaitTime = 0; synchronized (this.serverBusySync) { - if (lastException != null && - (lastException instanceof ServerBusyException || (lastException.getCause() != null && lastException.getCause() instanceof ServerBusyException))) { + if (lastException != null + && (lastException instanceof ServerBusyException || (lastException.getCause() != null && lastException.getCause() instanceof ServerBusyException))) { baseWaitTime += ClientConstants.SERVER_BUSY_BASE_SLEEP_TIME_IN_SECS; } } diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/TransportType.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/TransportType.java index f681277296de7..ac95d9ca9739f 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/TransportType.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/TransportType.java @@ -37,4 +37,4 @@ static TransportType fromString(final String value) { throw new IllegalArgumentException(); } -} \ No newline at end of file +} diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/AmqpConstants.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/AmqpConstants.java index acc30c3995621..eac5f39c9caa1 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/AmqpConstants.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/AmqpConstants.java @@ -48,20 +48,20 @@ public final class AmqpConstants { public static final String AMQP_PROPERTY_REPLY_TO_GROUP_ID = "reply-to-group-id"; @SuppressWarnings("serial") public static final SetRESERVED_PROPERTY_NAMES = Collections.unmodifiableSet(new HashSet () {{ - add(AMQP_PROPERTY_MESSAGE_ID); - add(AMQP_PROPERTY_USER_ID); - add(AMQP_PROPERTY_TO); - add(AMQP_PROPERTY_SUBJECT); - add(AMQP_PROPERTY_REPLY_TO); - add(AMQP_PROPERTY_CORRELATION_ID); - add(AMQP_PROPERTY_CONTENT_TYPE); - add(AMQP_PROPERTY_CONTENT_ENCODING); - add(AMQP_PROPERTY_ABSOLUTE_EXPRITY_TIME); - add(AMQP_PROPERTY_CREATION_TIME); - add(AMQP_PROPERTY_GROUP_ID); - add(AMQP_PROPERTY_GROUP_SEQUENCE); - add(AMQP_PROPERTY_REPLY_TO_GROUP_ID); - }}); + add(AMQP_PROPERTY_MESSAGE_ID); + add(AMQP_PROPERTY_USER_ID); + add(AMQP_PROPERTY_TO); + add(AMQP_PROPERTY_SUBJECT); + add(AMQP_PROPERTY_REPLY_TO); + add(AMQP_PROPERTY_CORRELATION_ID); + add(AMQP_PROPERTY_CONTENT_TYPE); + add(AMQP_PROPERTY_CONTENT_ENCODING); + add(AMQP_PROPERTY_ABSOLUTE_EXPRITY_TIME); + add(AMQP_PROPERTY_CREATION_TIME); + add(AMQP_PROPERTY_GROUP_ID); + add(AMQP_PROPERTY_GROUP_SEQUENCE); + add(AMQP_PROPERTY_REPLY_TO_GROUP_ID); + }}); public static final Symbol ENABLE_RECEIVER_RUNTIME_METRIC_NAME = Symbol.valueOf(VENDOR + ":enable-receiver-runtime-metric"); public static final Symbol RECEIVER_IDENTIFIER_NAME = Symbol.valueOf(AmqpConstants.VENDOR + ":receiver-name"); private AmqpConstants() { diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/BaseLinkHandler.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/BaseLinkHandler.java index 6ef7d610b5216..991c3b1cf9b20 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/BaseLinkHandler.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/BaseLinkHandler.java @@ -4,7 +4,11 @@ package com.microsoft.azure.eventhubs.impl; import org.apache.qpid.proton.amqp.transport.ErrorCondition; -import org.apache.qpid.proton.engine.*; +import org.apache.qpid.proton.engine.BaseHandler; +import org.apache.qpid.proton.engine.EndpointState; +import org.apache.qpid.proton.engine.Event; +import org.apache.qpid.proton.engine.Link; +import org.apache.qpid.proton.engine.Session; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ClientConstants.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ClientConstants.java index 31a9e4b0dde21..9d140815aef10 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ClientConstants.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ClientConstants.java @@ -8,35 +8,35 @@ import java.time.Duration; public final class ClientConstants { - public final static int AMQPS_PORT = 5671; - public final static int HTTPS_PORT = 443; - public final static int MAX_PARTITION_KEY_LENGTH = 128; - public final static Symbol SERVER_BUSY_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":server-busy"); - public final static Symbol ARGUMENT_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":argument-error"); - public final static Symbol ARGUMENT_OUT_OF_RANGE_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":argument-out-of-range"); - public final static Symbol ENTITY_DISABLED_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":entity-disabled"); - public final static Symbol PARTITION_NOT_OWNED_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":partition-not-owned"); - public final static Symbol STORE_LOCK_LOST_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":store-lock-lost"); - public final static Symbol PUBLISHER_REVOKED_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":publisher-revoked"); - public final static Symbol TIMEOUT_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":timeout"); - public final static Symbol TRACKING_ID_PROPERTY = Symbol.getSymbol(AmqpConstants.VENDOR + ":tracking-id"); + public static final int AMQPS_PORT = 5671; + public static final int HTTPS_PORT = 443; + public static final int MAX_PARTITION_KEY_LENGTH = 128; + public static final Symbol SERVER_BUSY_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":server-busy"); + public static final Symbol ARGUMENT_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":argument-error"); + public static final Symbol ARGUMENT_OUT_OF_RANGE_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":argument-out-of-range"); + public static final Symbol ENTITY_DISABLED_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":entity-disabled"); + public static final Symbol PARTITION_NOT_OWNED_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":partition-not-owned"); + public static final Symbol STORE_LOCK_LOST_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":store-lock-lost"); + public static final Symbol PUBLISHER_REVOKED_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":publisher-revoked"); + public static final Symbol TIMEOUT_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":timeout"); + public static final Symbol TRACKING_ID_PROPERTY = Symbol.getSymbol(AmqpConstants.VENDOR + ":tracking-id"); public static final int MAX_MESSAGE_LENGTH_BYTES = 256 * 1024; public static final int MAX_FRAME_SIZE_BYTES = 64 * 1024; public static final int MAX_EVENTHUB_AMQP_HEADER_SIZE_BYTES = 512; - public final static Duration TIMER_TOLERANCE = Duration.ofSeconds(1); - public final static Duration DEFAULT_RETRY_MIN_BACKOFF = Duration.ofSeconds(0); - public final static Duration DEFAULT_RETRY_MAX_BACKOFF = Duration.ofSeconds(30); - public final static Duration TOKEN_REFRESH_INTERVAL = Duration.ofMinutes(10); // renew every 10 mins, which expires 20 mins - public final static Duration TOKEN_VALIDITY = Duration.ofMinutes(20); - public final static int DEFAULT_MAX_RETRY_COUNT = 10; - public final static boolean DEFAULT_IS_TRANSIENT = true; - public final static int REACTOR_IO_POLL_TIMEOUT = 20; - public final static int SERVER_BUSY_BASE_SLEEP_TIME_IN_SECS = 4; - public final static int MGMT_CHANNEL_MIN_RETRY_IN_MILLIS = 5; - public final static String NO_RETRY = "NoRetry"; - public final static String DEFAULT_RETRY = "Default"; - public final static String PRODUCT_NAME = "MSJavaClient"; - public final static String CURRENT_JAVACLIENT_VERSION = "2.0.0"; + public static final Duration TIMER_TOLERANCE = Duration.ofSeconds(1); + public static final Duration DEFAULT_RETRY_MIN_BACKOFF = Duration.ofSeconds(0); + public static final Duration DEFAULT_RETRY_MAX_BACKOFF = Duration.ofSeconds(30); + public static final Duration TOKEN_REFRESH_INTERVAL = Duration.ofMinutes(10); // renew every 10 mins, which expires 20 mins + public static final Duration TOKEN_VALIDITY = Duration.ofMinutes(20); + public static final int DEFAULT_MAX_RETRY_COUNT = 10; + public static final boolean DEFAULT_IS_TRANSIENT = true; + public static final int REACTOR_IO_POLL_TIMEOUT = 20; + public static final int SERVER_BUSY_BASE_SLEEP_TIME_IN_SECS = 4; + public static final int MGMT_CHANNEL_MIN_RETRY_IN_MILLIS = 5; + public static final String NO_RETRY = "NoRetry"; + public static final String DEFAULT_RETRY = "Default"; + public static final String PRODUCT_NAME = "MSJavaClient"; + public static final String CURRENT_JAVACLIENT_VERSION = "2.0.0"; public static final String PLATFORM_INFO = getPlatformInfo(); public static final String FRAMEWORK_INFO = getFrameworkInfo(); public static final String CBS_ADDRESS = "$cbs"; diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ConnectionHandler.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ConnectionHandler.java index d5aab0d41e946..f3f98809160bd 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ConnectionHandler.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ConnectionHandler.java @@ -7,7 +7,12 @@ import org.apache.qpid.proton.Proton; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.transport.ErrorCondition; -import org.apache.qpid.proton.engine.*; +import org.apache.qpid.proton.engine.BaseHandler; +import org.apache.qpid.proton.engine.Connection; +import org.apache.qpid.proton.engine.EndpointState; +import org.apache.qpid.proton.engine.Event; +import org.apache.qpid.proton.engine.SslDomain; +import org.apache.qpid.proton.engine.Transport; import org.apache.qpid.proton.engine.impl.TransportInternal; import org.apache.qpid.proton.reactor.Handshaker; import org.slf4j.Logger; @@ -82,9 +87,9 @@ public void onConnectionInit(Event event) { final String userAgent = EventHubClientImpl.USER_AGENT; if (userAgent != null) { - connectionProperties.put(AmqpConstants.USER_AGENT, userAgent.length() < AmqpConstants.MAX_USER_AGENT_LENGTH ? - userAgent : - userAgent.substring(0, AmqpConstants.MAX_USER_AGENT_LENGTH)); + connectionProperties.put(AmqpConstants.USER_AGENT, userAgent.length() < AmqpConstants.MAX_USER_AGENT_LENGTH + ? userAgent + : userAgent.substring(0, AmqpConstants.MAX_USER_AGENT_LENGTH)); } connection.setProperties(connectionProperties); diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventDataImpl.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventDataImpl.java index 00eca81c66f1e..93be1ce8b2d60 100755 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventDataImpl.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventDataImpl.java @@ -7,7 +7,12 @@ import org.apache.qpid.proton.Proton; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.Symbol; -import org.apache.qpid.proton.amqp.messaging.*; +import org.apache.qpid.proton.amqp.messaging.AmqpSequence; +import org.apache.qpid.proton.amqp.messaging.AmqpValue; +import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; +import org.apache.qpid.proton.amqp.messaging.Data; +import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; +import org.apache.qpid.proton.amqp.messaging.Section; import org.apache.qpid.proton.message.Message; import java.io.IOException; @@ -22,8 +27,8 @@ public final class EventDataImpl implements EventData { private static final long serialVersionUID = -5631628195600014255L; private static final int BODY_DATA_NULL = -1; - transient private Binary bodyData; - transient private Object amqpBody; + private transient Binary bodyData; + private transient Object amqpBody; private Map properties; private SystemProperties systemProperties; diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventDataUtil.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventDataUtil.java index 35263f2c1542b..dba9d1e999745 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventDataUtil.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventDataUtil.java @@ -7,7 +7,11 @@ import com.microsoft.azure.eventhubs.EventPosition; import org.apache.qpid.proton.message.Message; -import java.util.*; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.Set; import java.util.function.Consumer; /* @@ -17,12 +21,12 @@ final class EventDataUtil { @SuppressWarnings("serial") static final Set RESERVED_SYSTEM_PROPERTIES = Collections.unmodifiableSet(new HashSet () {{ - add(AmqpConstants.OFFSET_ANNOTATION_NAME); - add(AmqpConstants.PARTITION_KEY_ANNOTATION_NAME); - add(AmqpConstants.SEQUENCE_NUMBER_ANNOTATION_NAME); - add(AmqpConstants.ENQUEUED_TIME_UTC_ANNOTATION_NAME); - add(AmqpConstants.PUBLISHER_ANNOTATION_NAME); - }}); + add(AmqpConstants.OFFSET_ANNOTATION_NAME); + add(AmqpConstants.PARTITION_KEY_ANNOTATION_NAME); + add(AmqpConstants.SEQUENCE_NUMBER_ANNOTATION_NAME); + add(AmqpConstants.ENQUEUED_TIME_UTC_ANNOTATION_NAME); + add(AmqpConstants.PUBLISHER_ANNOTATION_NAME); + }}); private EventDataUtil() { } diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventHubClientImpl.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventHubClientImpl.java index 453f800b7849b..d8bcf45a4065c 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventHubClientImpl.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventHubClientImpl.java @@ -3,7 +3,20 @@ package com.microsoft.azure.eventhubs.impl; -import com.microsoft.azure.eventhubs.*; +import com.microsoft.azure.eventhubs.BatchOptions; +import com.microsoft.azure.eventhubs.ConnectionStringBuilder; +import com.microsoft.azure.eventhubs.EventData; +import com.microsoft.azure.eventhubs.EventDataBatch; +import com.microsoft.azure.eventhubs.EventHubClient; +import com.microsoft.azure.eventhubs.EventHubException; +import com.microsoft.azure.eventhubs.EventHubRuntimeInformation; +import com.microsoft.azure.eventhubs.EventPosition; +import com.microsoft.azure.eventhubs.OperationCancelledException; +import com.microsoft.azure.eventhubs.PartitionReceiver; +import com.microsoft.azure.eventhubs.PartitionRuntimeInformation; +import com.microsoft.azure.eventhubs.PartitionSender; +import com.microsoft.azure.eventhubs.ReceiverOptions; +import com.microsoft.azure.eventhubs.RetryPolicy; import java.io.IOException; import java.security.InvalidKeyException; @@ -44,7 +57,7 @@ private EventHubClientImpl(final ConnectionStringBuilder connectionString, final } public static CompletableFuture create( - final String connectionString, final RetryPolicy retryPolicy, final ScheduledExecutorService executor) + final String connectionString, final RetryPolicy retryPolicy, final ScheduledExecutorService executor) throws EventHubException, IOException { final ConnectionStringBuilder connStr = new ConnectionStringBuilder(connectionString); final EventHubClientImpl eventHubClient = new EventHubClientImpl(connStr, executor); @@ -67,20 +80,20 @@ public String getEventHubName() { public final EventDataBatch createBatch(BatchOptions options) throws EventHubException { return ExceptionUtil.sync(() -> { - int maxSize = this.createInternalSender().thenApplyAsync( - (aVoid) -> this.sender.getMaxMessageSize(), - this.executor).get(); - if (options.maxMessageSize == null) { - return new EventDataBatchImpl(maxSize, options.partitionKey); - } - - if (options.maxMessageSize > maxSize) { - throw new IllegalArgumentException("The maxMessageSize set in BatchOptions is too large. You set a maxMessageSize of " + - options.maxMessageSize + ". The maximum allowed size is " + maxSize + "."); - } + int maxSize = this.createInternalSender().thenApplyAsync( + (aVoid) -> this.sender.getMaxMessageSize(), + this.executor).get(); + if (options.maxMessageSize == null) { + return new EventDataBatchImpl(maxSize, options.partitionKey); + } - return new EventDataBatchImpl(options.maxMessageSize, options.partitionKey); + if (options.maxMessageSize > maxSize) { + throw new IllegalArgumentException("The maxMessageSize set in BatchOptions is too large. You set a maxMessageSize of " + + options.maxMessageSize + ". The maximum allowed size is " + maxSize + "."); } + + return new EventDataBatchImpl(options.maxMessageSize, options.partitionKey); + } ); } @@ -119,9 +132,9 @@ public final CompletableFuture send(final EventDataBatch eventDatas) { } final EventDataBatchImpl eventDataBatch = (EventDataBatchImpl) eventDatas; - return eventDataBatch.getPartitionKey() != null ? - this.send(eventDataBatch.getInternalIterable(), eventDataBatch.getPartitionKey()) : - this.send(eventDataBatch.getInternalIterable()); + return eventDataBatch.getPartitionKey() != null + ? this.send(eventDataBatch.getInternalIterable(), eventDataBatch.getPartitionKey()) + : this.send(eventDataBatch.getInternalIterable()); } @Override @@ -201,11 +214,11 @@ public CompletableFuture onClose() { synchronized (this.senderCreateSync) { final CompletableFuture internalSenderClose = this.sender != null ? this.sender.close().thenComposeAsync(new Function >() { - @Override - public CompletableFuture apply(Void voidArg) { - return EventHubClientImpl.this.underlyingFactory.close(); - } - }, this.executor) + @Override + public CompletableFuture apply(Void voidArg) { + return EventHubClientImpl.this.underlyingFactory.close(); + } + }, this.executor) : this.underlyingFactory.close(); return internalSenderClose; diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventPositionImpl.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventPositionImpl.java index 26f126c34da9b..af8a99b26d793 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventPositionImpl.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventPositionImpl.java @@ -72,15 +72,15 @@ public boolean getInclusiveFlag() { String getExpression() { // order of preference if (this.offset != null) { - return this.inclusiveFlag ? - String.format(AmqpConstants.AMQP_ANNOTATION_FORMAT, AmqpConstants.OFFSET_ANNOTATION_NAME, "=", this.offset) : - String.format(AmqpConstants.AMQP_ANNOTATION_FORMAT, AmqpConstants.OFFSET_ANNOTATION_NAME, StringUtil.EMPTY, this.offset); + return this.inclusiveFlag + ? String.format(AmqpConstants.AMQP_ANNOTATION_FORMAT, AmqpConstants.OFFSET_ANNOTATION_NAME, "=", this.offset) + : String.format(AmqpConstants.AMQP_ANNOTATION_FORMAT, AmqpConstants.OFFSET_ANNOTATION_NAME, StringUtil.EMPTY, this.offset); } if (this.sequenceNumber != null) { - return this.inclusiveFlag ? - String.format(AmqpConstants.AMQP_ANNOTATION_FORMAT, AmqpConstants.SEQUENCE_NUMBER_ANNOTATION_NAME, "=", this.sequenceNumber) : - String.format(AmqpConstants.AMQP_ANNOTATION_FORMAT, AmqpConstants.SEQUENCE_NUMBER_ANNOTATION_NAME, StringUtil.EMPTY, this.sequenceNumber); + return this.inclusiveFlag + ? String.format(AmqpConstants.AMQP_ANNOTATION_FORMAT, AmqpConstants.SEQUENCE_NUMBER_ANNOTATION_NAME, "=", this.sequenceNumber) + : String.format(AmqpConstants.AMQP_ANNOTATION_FORMAT, AmqpConstants.SEQUENCE_NUMBER_ANNOTATION_NAME, StringUtil.EMPTY, this.sequenceNumber); } if (this.dateTime != null) { diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ExceptionUtil.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ExceptionUtil.java index ef091bb755469..66b7617e5cbf2 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ExceptionUtil.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ExceptionUtil.java @@ -3,7 +3,15 @@ package com.microsoft.azure.eventhubs.impl; -import com.microsoft.azure.eventhubs.*; +import com.microsoft.azure.eventhubs.AuthorizationFailedException; +import com.microsoft.azure.eventhubs.ErrorContext; +import com.microsoft.azure.eventhubs.EventHubException; +import com.microsoft.azure.eventhubs.IllegalEntityException; +import com.microsoft.azure.eventhubs.PayloadSizeExceededException; +import com.microsoft.azure.eventhubs.QuotaExceededException; +import com.microsoft.azure.eventhubs.ReceiverDisconnectedException; +import com.microsoft.azure.eventhubs.ServerBusyException; +import com.microsoft.azure.eventhubs.TimeoutException; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.transport.ErrorCondition; @@ -27,7 +35,7 @@ static Exception toException(ErrorCondition errorCondition) { } else if (errorCondition.getCondition() == ClientConstants.SERVER_BUSY_ERROR) { return new ServerBusyException(errorCondition.getDescription()); } else if (errorCondition.getCondition() == AmqpErrorCode.NotFound) { - return ExceptionUtil.distinguishNotFound(errorCondition.getDescription()); + return ExceptionUtil.distinguishNotFound(errorCondition.getDescription()); } else if (errorCondition.getCondition() == ClientConstants.ENTITY_DISABLED_ERROR) { return new IllegalEntityException(errorCondition.getDescription()); } else if (errorCondition.getCondition() == AmqpErrorCode.Stolen) { @@ -77,15 +85,15 @@ static Exception amqpResponseCodeToException(final int statusCode, final String return new EventHubException(true, String.format(ClientConstants.AMQP_REQUEST_FAILED_ERROR, statusCode, statusDescription)); } } - + static Exception distinguishNotFound(final String message) { Pattern p = Pattern.compile("The messaging entity .* could not be found"); Matcher m = p.matcher(message); - if (m.find()) { - return new IllegalEntityException(message); - } else { - return new EventHubException(true, String.format(ClientConstants.AMQP_REQUEST_FAILED_ERROR, AmqpResponseCode.NOT_FOUND, message)); - } + if (m.find()) { + return new IllegalEntityException(message); + } else { + return new EventHubException(true, String.format(ClientConstants.AMQP_REQUEST_FAILED_ERROR, AmqpResponseCode.NOT_FOUND, message)); + } } static void completeExceptionally(CompletableFuture future, Exception exception, ErrorContextProvider contextProvider) { @@ -154,7 +162,7 @@ public static Throwable getExceptionFromCompletedFuture( return null; } - static Exception stripOuterException(final Exception exception) { + static Exception stripOuterException(final Exception exception) { Throwable throwable = exception.getCause(); if (throwable instanceof EventHubException) { return (EventHubException) throwable; diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/FaultTolerantObject.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/FaultTolerantObject.java index 3140af48cad5c..2b57c9c6c5557 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/FaultTolerantObject.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/FaultTolerantObject.java @@ -46,8 +46,8 @@ public void runOnOpenedObject( @Override public void onEvent() { if (!creatingNewInnerObject - && (innerObject == null || innerObject.getState() == IOObject.IOObjectState.CLOSED || - innerObject.getState() == IOObject.IOObjectState.CLOSING)) { + && (innerObject == null || innerObject.getState() == IOObject.IOObjectState.CLOSED + || innerObject.getState() == IOObject.IOObjectState.CLOSING)) { creatingNewInnerObject = true; try { diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessageReceiver.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessageReceiver.java index 473e70004554a..41f84f3bc8ea3 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessageReceiver.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessageReceiver.java @@ -14,7 +14,11 @@ import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; import org.apache.qpid.proton.amqp.transport.SenderSettleMode; -import org.apache.qpid.proton.engine.*; +import org.apache.qpid.proton.engine.BaseHandler; +import org.apache.qpid.proton.engine.Delivery; +import org.apache.qpid.proton.engine.EndpointState; +import org.apache.qpid.proton.engine.Receiver; +import org.apache.qpid.proton.engine.Session; import org.apache.qpid.proton.message.Message; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -25,7 +29,11 @@ import java.time.Duration; import java.time.Instant; import java.time.ZonedDateTime; -import java.util.*; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import java.util.Locale; +import java.util.Map; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; @@ -647,19 +655,18 @@ public void run() { setClosed(); } } - } - , timeout.remaining()); + }, timeout.remaining()); this.openTimer.handleAsync( - (unUsed, exception) -> { - if (exception != null - && exception instanceof Exception - && !(exception instanceof CancellationException)) { - ExceptionUtil.completeExceptionally(linkOpen.getWork(), (Exception) exception, MessageReceiver.this); - } + (unUsed, exception) -> { + if (exception != null + && exception instanceof Exception + && !(exception instanceof CancellationException)) { + ExceptionUtil.completeExceptionally(linkOpen.getWork(), (Exception) exception, MessageReceiver.this); + } - return null; - }, this.executor); + return null; + }, this.executor); } private void scheduleLinkCloseTimeout(final TimeoutTracker timeout) { @@ -687,17 +694,16 @@ public void run() { MessageReceiver.this.onError((Exception) null); } } - } - , timeout.remaining()); + }, timeout.remaining()); this.closeTimer.handleAsync( - (unUsed, exception) -> { - if (exception != null && exception instanceof Exception && !(exception instanceof CancellationException)) { - ExceptionUtil.completeExceptionally(linkClose, (Exception) exception, MessageReceiver.this); - } + (unUsed, exception) -> { + if (exception != null && exception instanceof Exception && !(exception instanceof CancellationException)) { + ExceptionUtil.completeExceptionally(linkClose, (Exception) exception, MessageReceiver.this); + } - return null; - }, this.executor); + return null; + }, this.executor); } private boolean shouldScheduleOperationTimeoutTimer() { diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessageSender.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessageSender.java index 5350ef22a31c9..b34473d6eae49 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessageSender.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessageSender.java @@ -3,15 +3,30 @@ package com.microsoft.azure.eventhubs.impl; -import com.microsoft.azure.eventhubs.*; +import com.microsoft.azure.eventhubs.ErrorContext; +import com.microsoft.azure.eventhubs.EventHubException; +import com.microsoft.azure.eventhubs.OperationCancelledException; +import com.microsoft.azure.eventhubs.PayloadSizeExceededException; +import com.microsoft.azure.eventhubs.RetryPolicy; +import com.microsoft.azure.eventhubs.ServerBusyException; +import com.microsoft.azure.eventhubs.TimeoutException; import org.apache.qpid.proton.Proton; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.UnsignedLong; -import org.apache.qpid.proton.amqp.messaging.*; +import org.apache.qpid.proton.amqp.messaging.Accepted; +import org.apache.qpid.proton.amqp.messaging.Data; +import org.apache.qpid.proton.amqp.messaging.Rejected; +import org.apache.qpid.proton.amqp.messaging.Released; +import org.apache.qpid.proton.amqp.messaging.Source; +import org.apache.qpid.proton.amqp.messaging.Target; import org.apache.qpid.proton.amqp.transport.DeliveryState; import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.amqp.transport.SenderSettleMode; -import org.apache.qpid.proton.engine.*; +import org.apache.qpid.proton.engine.BaseHandler; +import org.apache.qpid.proton.engine.Delivery; +import org.apache.qpid.proton.engine.EndpointState; +import org.apache.qpid.proton.engine.Sender; +import org.apache.qpid.proton.engine.Session; import org.apache.qpid.proton.engine.impl.DeliveryImpl; import org.apache.qpid.proton.message.Message; import org.slf4j.Logger; @@ -24,7 +39,14 @@ import java.time.Duration; import java.time.Instant; import java.time.ZonedDateTime; -import java.util.*; +import java.util.Comparator; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.UUID; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -178,9 +200,9 @@ private CompletableFuture sendCore( final CompletableFuture onSendFuture = (onSend == null) ? new CompletableFuture<>() : onSend; - final ReplayableWorkItem sendWaiterData = (tracker == null) ? - new ReplayableWorkItem<>(bytes, arrayOffset, messageFormat, onSendFuture, this.operationTimeout) : - new ReplayableWorkItem<>(bytes, arrayOffset, messageFormat, onSendFuture, tracker); + final ReplayableWorkItem sendWaiterData = (tracker == null) + ? new ReplayableWorkItem<>(bytes, arrayOffset, messageFormat, onSendFuture, this.operationTimeout) + : new ReplayableWorkItem<>(bytes, arrayOffset, messageFormat, onSendFuture, tracker); final TimeoutTracker currentSendTracker = sendWaiterData.getTimeoutTracker(); final String deliveryTag = UUID.randomUUID().toString().replace("-", StringUtil.EMPTY) + "_" + currentSendTracker.elapsed().getSeconds(); @@ -199,15 +221,15 @@ private CompletableFuture sendCore( // if the timeoutTask completed with scheduling error - notify sender if (timeoutTimerTask.isCompletedExceptionally()) { timeoutTimerTask.handleAsync( - (unUsed, exception) -> { - if (exception != null && !(exception instanceof CancellationException)) - onSendFuture.completeExceptionally( - new OperationCancelledException(String.format(Locale.US, - "Entity(%s): send failed while dispatching to Reactor, see cause for more details.", - this.sendPath), exception)); + (unUsed, exception) -> { + if (exception != null && !(exception instanceof CancellationException)) + onSendFuture.completeExceptionally( + new OperationCancelledException(String.format(Locale.US, + "Entity(%s): send failed while dispatching to Reactor, see cause for more details.", + this.sendPath), exception)); - return null; - }, this.executor); + return null; + }, this.executor); return onSendFuture; } @@ -717,19 +739,18 @@ public void run() { setClosed(); } } - } - , timeout.remaining()); + }, timeout.remaining()); this.openTimer.handleAsync( - (unUsed, exception) -> { - if (exception != null - && exception instanceof Exception - && !(exception instanceof CancellationException)) { - ExceptionUtil.completeExceptionally(linkFirstOpen, (Exception) exception, this); - } + (unUsed, exception) -> { + if (exception != null + && exception instanceof Exception + && !(exception instanceof CancellationException)) { + ExceptionUtil.completeExceptionally(linkFirstOpen, (Exception) exception, this); + } - return null; - }, this.executor); + return null; + }, this.executor); } @Override @@ -912,17 +933,16 @@ public void run() { MessageSender.this.onError((Exception) null); } } - } - , timeout.remaining()); + }, timeout.remaining()); this.closeTimer.handleAsync( - (unUsed, exception) -> { - if (exception != null && exception instanceof Exception && !(exception instanceof CancellationException)) { - ExceptionUtil.completeExceptionally(linkClose, (Exception) exception, MessageSender.this); - } + (unUsed, exception) -> { + if (exception != null && exception instanceof Exception && !(exception instanceof CancellationException)) { + ExceptionUtil.completeExceptionally(linkClose, (Exception) exception, MessageSender.this); + } - return null; - }, this.executor); + return null; + }, this.executor); } @Override diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessagingFactory.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessagingFactory.java index ded7245e6e4c5..a737b4bdbd61f 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessagingFactory.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessagingFactory.java @@ -4,10 +4,21 @@ package com.microsoft.azure.eventhubs.impl; -import com.microsoft.azure.eventhubs.*; +import com.microsoft.azure.eventhubs.CommunicationException; +import com.microsoft.azure.eventhubs.ConnectionStringBuilder; +import com.microsoft.azure.eventhubs.EventHubException; +import com.microsoft.azure.eventhubs.OperationCancelledException; +import com.microsoft.azure.eventhubs.RetryPolicy; import com.microsoft.azure.eventhubs.TimeoutException; import org.apache.qpid.proton.amqp.transport.ErrorCondition; -import org.apache.qpid.proton.engine.*; +import org.apache.qpid.proton.engine.BaseHandler; +import org.apache.qpid.proton.engine.Connection; +import org.apache.qpid.proton.engine.EndpointState; +import org.apache.qpid.proton.engine.Event; +import org.apache.qpid.proton.engine.Handler; +import org.apache.qpid.proton.engine.HandlerException; +import org.apache.qpid.proton.engine.Link; +import org.apache.qpid.proton.engine.Session; import org.apache.qpid.proton.reactor.Reactor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -19,7 +30,11 @@ import java.util.LinkedList; import java.util.List; import java.util.Locale; -import java.util.concurrent.*; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -114,14 +129,14 @@ public void run() { // if scheduling messagingfactory openTimer fails - notify user and stop messagingFactory.openTimer.handleAsync( - (unUsed, exception) -> { - if (exception != null && !(exception instanceof CancellationException)) { - messagingFactory.open.completeExceptionally(exception); - messagingFactory.getReactor().stop(); - } + (unUsed, exception) -> { + if (exception != null && !(exception instanceof CancellationException)) { + messagingFactory.open.completeExceptionally(exception); + messagingFactory.getReactor().stop(); + } - return null; - }, messagingFactory.executor); + return null; + }, messagingFactory.executor); return messagingFactory.open; } @@ -324,7 +339,7 @@ private void onReactorError(Exception cause) { this.getClientId(), this.getHostName(), ExceptionUtil.toStackTraceString(e, "Re-starting reactor failed with error"))); - // TODO - stop retrying on the error after multiple attempts. + // TODO: stop retrying on the error after multiple attempts. this.onReactorError(cause); } @@ -355,15 +370,14 @@ protected CompletableFuture onClose() { if (!this.getIsClosed()) { final Timer timer = new Timer(this); this.closeTimer = timer.schedule(new Runnable() { - @Override - public void run() { - if (!closeTask.isDone()) { - closeTask.completeExceptionally(new TimeoutException("Closing MessagingFactory timed out.")); - getReactor().stop(); - } - } - }, - operationTimeout); + @Override + public void run() { + if (!closeTask.isDone()) { + closeTask.completeExceptionally(new TimeoutException("Closing MessagingFactory timed out.")); + getReactor().stop(); + } + } + }, operationTimeout); if (this.closeTimer.isCompletedExceptionally()) { this.closeTask.completeExceptionally(ExceptionUtil.getExceptionFromCompletedFuture(this.closeTimer)); @@ -468,8 +482,8 @@ public void onError(Exception error) { } private class RunReactor implements Runnable { - final private Reactor rctr; - final private ScheduledExecutorService executor; + private final Reactor rctr; + private final ScheduledExecutorService executor; volatile boolean hasStarted; @@ -529,11 +543,11 @@ public void run() { "Unhandled exception while processing events in reactor, report this error."))); } - final String message = !StringUtil.isNullOrEmpty(cause.getMessage()) ? - cause.getMessage() : - !StringUtil.isNullOrEmpty(handlerException.getMessage()) ? - handlerException.getMessage() : - "Reactor encountered unrecoverable error"; + final String message = !StringUtil.isNullOrEmpty(cause.getMessage()) + ? cause.getMessage() + : !StringUtil.isNullOrEmpty(handlerException.getMessage()) + ? handlerException.getMessage() + : "Reactor encountered unrecoverable error"; final EventHubException sbException; diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/PartitionReceiverImpl.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/PartitionReceiverImpl.java index 8e6d73029217d..51e0452de82be 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/PartitionReceiverImpl.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/PartitionReceiverImpl.java @@ -3,7 +3,12 @@ package com.microsoft.azure.eventhubs.impl; -import com.microsoft.azure.eventhubs.*; +import com.microsoft.azure.eventhubs.EventData; +import com.microsoft.azure.eventhubs.EventPosition; +import com.microsoft.azure.eventhubs.PartitionReceiveHandler; +import com.microsoft.azure.eventhubs.PartitionReceiver; +import com.microsoft.azure.eventhubs.ReceiverOptions; +import com.microsoft.azure.eventhubs.ReceiverRuntimeInformation; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.UnknownDescribedType; import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations; @@ -12,7 +17,11 @@ import org.slf4j.LoggerFactory; import java.time.Duration; -import java.util.*; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; import java.util.function.Consumer; @@ -247,8 +256,8 @@ public Map getFilter(final Message lastReceivedMes @Override public Map getProperties() { - if (!this.isEpochReceiver && - (this.receiverOptions == null || this.receiverOptions.getIdentifier() == null)) { + if (!this.isEpochReceiver + && (this.receiverOptions == null || this.receiverOptions.getIdentifier() == null)) { return null; } @@ -272,4 +281,4 @@ public Symbol[] getDesiredCapabilities() { ? new Symbol[]{AmqpConstants.ENABLE_RECEIVER_RUNTIME_METRIC_NAME} : null; } -} \ No newline at end of file +} diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/PartitionSenderImpl.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/PartitionSenderImpl.java index 20ec9cd874599..39e3a37c37ea6 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/PartitionSenderImpl.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/PartitionSenderImpl.java @@ -3,7 +3,11 @@ package com.microsoft.azure.eventhubs.impl; -import com.microsoft.azure.eventhubs.*; +import com.microsoft.azure.eventhubs.BatchOptions; +import com.microsoft.azure.eventhubs.EventData; +import com.microsoft.azure.eventhubs.EventDataBatch; +import com.microsoft.azure.eventhubs.EventHubException; +import com.microsoft.azure.eventhubs.PartitionSender; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; @@ -54,8 +58,8 @@ public String getPartitionId() { public EventDataBatch createBatch(BatchOptions options) { if (!StringUtil.isNullOrEmpty(options.partitionKey)) { - throw new IllegalArgumentException("A partition key cannot be set when using PartitionSenderImpl. If you'd like to " + - "continue using PartitionSenderImpl with EventDataBatches, then please do not set a partition key in your BatchOptions."); + throw new IllegalArgumentException("A partition key cannot be set when using PartitionSenderImpl. If you'd like to " + + "continue using PartitionSenderImpl with EventDataBatches, then please do not set a partition key in your BatchOptions."); } int maxSize = this.internalSender.getMaxMessageSize(); @@ -65,8 +69,8 @@ public EventDataBatch createBatch(BatchOptions options) { } if (options.maxMessageSize > maxSize) { - throw new IllegalArgumentException("The maxMessageSize set in BatchOptions is too large. You set a maxMessageSize of " + - options.maxMessageSize + ". The maximum allowed size is " + maxSize + "."); + throw new IllegalArgumentException("The maxMessageSize set in BatchOptions is too large. You set a maxMessageSize of " + + options.maxMessageSize + ". The maximum allowed size is " + maxSize + "."); } return new EventDataBatchImpl(options.maxMessageSize, null); @@ -90,8 +94,8 @@ public final CompletableFuture send(EventDataBatch eventDatas) { } if (!StringUtil.isNullOrEmpty(((EventDataBatchImpl) eventDatas).getPartitionKey())) { - throw new IllegalArgumentException("A partition key cannot be set when using PartitionSenderImpl. If you'd like to " + - "continue using PartitionSenderImpl with EventDataBatches, then please do not set a partition key in your BatchOptions"); + throw new IllegalArgumentException("A partition key cannot be set when using PartitionSenderImpl. If you'd like to " + + "continue using PartitionSenderImpl with EventDataBatches, then please do not set a partition key in your BatchOptions"); } return this.internalSender.send(EventDataUtil.toAmqpMessages(((EventDataBatchImpl) eventDatas).getInternalIterable())); diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ReceiveLinkHandler.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ReceiveLinkHandler.java index ce40fec488bbd..e3d56bdd09e58 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ReceiveLinkHandler.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ReceiveLinkHandler.java @@ -59,8 +59,8 @@ public void onLinkRemoteOpen(Event event) { } else { if (TRACE_LOGGER.isInfoEnabled()) { TRACE_LOGGER.info( - String.format(Locale.US, "onLinkRemoteOpen linkName[%s], remoteTarget[null], " + - "remoteSource[null], action[waitingForError]", receiver.getName())); + String.format(Locale.US, "onLinkRemoteOpen linkName[%s], remoteTarget[null], " + + "remoteSource[null], action[waitingForError]", receiver.getName())); } } } @@ -90,8 +90,8 @@ public void onDelivery(Event event) { if (TRACE_LOGGER.isWarnEnabled()) { TRACE_LOGGER.warn( receiveLink != null - ? String.format(Locale.US, "onDelivery linkName[%s], updatedLinkCredit[%s], remoteCredit[%s], " + - "remoteCondition[%s], delivery.isSettled[%s]", + ? String.format(Locale.US, "onDelivery linkName[%s], updatedLinkCredit[%s], remoteCredit[%s], " + + "remoteCondition[%s], delivery.isSettled[%s]", receiveLink.getName(), receiveLink.getCredit(), receiveLink.getRemoteCredit(), receiveLink.getRemoteCondition(), delivery.isSettled()) : String.format(Locale.US, "delivery.isSettled[%s]", delivery.isSettled())); } @@ -102,8 +102,8 @@ public void onDelivery(Event event) { if (TRACE_LOGGER.isTraceEnabled() && receiveLink != null) { TRACE_LOGGER.trace( - String.format(Locale.US, "onDelivery linkName[%s], updatedLinkCredit[%s], remoteCredit[%s], " + - "remoteCondition[%s], delivery.isPartial[%s]", + String.format(Locale.US, "onDelivery linkName[%s], updatedLinkCredit[%s], remoteCredit[%s], " + + "remoteCondition[%s], delivery.isPartial[%s]", receiveLink.getName(), receiveLink.getCredit(), receiveLink.getRemoteCredit(), receiveLink.getRemoteCondition(), delivery.isPartial())); } } diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ReceivePump.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ReceivePump.java index 7711b4a5a12c4..c3f60cb519a7e 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ReceivePump.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ReceivePump.java @@ -55,8 +55,8 @@ public void run() { } catch (final Exception exception) { if (TRACE_LOGGER.isErrorEnabled()) { TRACE_LOGGER.error( - String.format("Receive pump for eventHub (%s), consumerGroup (%s), partition (%s) " + - "encountered unrecoverable error and exited with exception %s.", + String.format("Receive pump for eventHub (%s), consumerGroup (%s), partition (%s) " + + "encountered unrecoverable error and exited with exception %s.", this.eventHubName, this.consumerGroupName, this.receiver.getPartitionId(), exception.toString())); } @@ -111,8 +111,8 @@ private void handleUserCodeExceptions(final Throwable userCodeException) { this.isPumpHealthy = false; if (TRACE_LOGGER.isErrorEnabled()) { TRACE_LOGGER.error( - String.format("Receive pump for eventHub (%s), consumerGroup (%s), partition (%s) " + - "exiting after user-code exception %s", + String.format("Receive pump for eventHub (%s), consumerGroup (%s), partition (%s) " + + "exiting after user-code exception %s", this.eventHubName, this.consumerGroupName, this.receiver.getPartitionId(), userCodeException.toString())); } @@ -160,8 +160,8 @@ public Void apply(final Iterable receivedEvents, final Throwable clie try { // don't invoke user call back - if stop is already raised / pump is unhealthy - if (ReceivePump.this.shouldContinue() && - (receivedEvents != null + if (ReceivePump.this.shouldContinue() + && (receivedEvents != null || (receivedEvents == null && ReceivePump.this.invokeOnTimeout))) { ReceivePump.this.onReceiveHandler.onReceive(receivedEvents); } diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ReceiverContext.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ReceiverContext.java index db68225fc60a5..ee6b45052dcf8 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ReceiverContext.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ReceiverContext.java @@ -8,8 +8,8 @@ import java.util.Locale; public class ReceiverContext extends ErrorContext { - final static boolean EPOCH_RECEIVER_TYPE = true; - final static boolean NON_EPOCH_RECEIVER_TYPE = !ReceiverContext.EPOCH_RECEIVER_TYPE; + static final boolean EPOCH_RECEIVER_TYPE = true; + static final boolean NON_EPOCH_RECEIVER_TYPE = !ReceiverContext.EPOCH_RECEIVER_TYPE; final String receivePath; final String referenceId; diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseChannel.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseChannel.java index 2d7ceca5c2387..5300d7775ad34 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseChannel.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseChannel.java @@ -11,7 +11,12 @@ import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; import org.apache.qpid.proton.amqp.transport.SenderSettleMode; -import org.apache.qpid.proton.engine.*; +import org.apache.qpid.proton.engine.BaseHandler; +import org.apache.qpid.proton.engine.Delivery; +import org.apache.qpid.proton.engine.EndpointState; +import org.apache.qpid.proton.engine.Receiver; +import org.apache.qpid.proton.engine.Sender; +import org.apache.qpid.proton.engine.Session; import org.apache.qpid.proton.message.Message; import java.util.HashMap; diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/SendLinkHandler.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/SendLinkHandler.java index 7df9cfd2b0cc7..620363dab961b 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/SendLinkHandler.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/SendLinkHandler.java @@ -69,9 +69,9 @@ public void onDelivery(Event event) { if (TRACE_LOGGER.isTraceEnabled()) { TRACE_LOGGER.trace( - "onDelivery linkName[" + sender.getName() + - "], unsettled[" + sender.getUnsettled() + "], credit[" + sender.getRemoteCredit() + "], deliveryState[" + delivery.getRemoteState() + - "], delivery.isBuffered[" + delivery.isBuffered() + "], delivery.id[" + new String(delivery.getTag()) + "]"); + "onDelivery linkName[" + sender.getName() + + "], unsettled[" + sender.getUnsettled() + "], credit[" + sender.getRemoteCredit() + "], deliveryState[" + delivery.getRemoteState() + + "], delivery.isBuffered[" + delivery.isBuffered() + "], delivery.id[" + new String(delivery.getTag()) + "]"); } msgSender.onSendComplete(delivery); diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/SessionHandler.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/SessionHandler.java index 869feffdb23a9..f57455fcc4710 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/SessionHandler.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/SessionHandler.java @@ -5,7 +5,11 @@ import com.microsoft.azure.eventhubs.EventHubException; import org.apache.qpid.proton.amqp.transport.ErrorCondition; -import org.apache.qpid.proton.engine.*; +import org.apache.qpid.proton.engine.BaseHandler; +import org.apache.qpid.proton.engine.EndpointState; +import org.apache.qpid.proton.engine.Event; +import org.apache.qpid.proton.engine.Handler; +import org.apache.qpid.proton.engine.Session; import org.apache.qpid.proton.reactor.Reactor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/SessionProvider.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/SessionProvider.java index 5dcf60ba464e3..afc87d4ba1391 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/SessionProvider.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/SessionProvider.java @@ -14,4 +14,4 @@ Session getSession( final String path, final Consumer onSessionOpen, final BiConsumer onSessionOpenError); -} \ No newline at end of file +} diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/StringUtil.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/StringUtil.java index 6168960476e70..0c0b1ac57af01 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/StringUtil.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/StringUtil.java @@ -6,7 +6,7 @@ import java.util.UUID; public final class StringUtil { - public final static String EMPTY = ""; + public static final String EMPTY = ""; public static boolean isNullOrEmpty(String string) { return (string == null || string.isEmpty()); diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/TimeoutTracker.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/TimeoutTracker.java index 74e43b7deafb1..8a52e6bc28832 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/TimeoutTracker.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/TimeoutTracker.java @@ -3,7 +3,9 @@ package com.microsoft.azure.eventhubs.impl; -import java.time.*; + +import java.time.Duration; +import java.time.Instant; public class TimeoutTracker { private final Duration originalTimeout; diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/Timer.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/Timer.java index b7365e651f870..00f011c7e675e 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/Timer.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/Timer.java @@ -31,7 +31,7 @@ public CompletableFuture> schedule( return taskHandle; } - final static class ScheduledTask extends DispatchHandler { + static final class ScheduledTask extends DispatchHandler { final CompletableFuture> scheduledFuture; final Runnable runnable; diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/TrackingUtil.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/TrackingUtil.java index fd0c6f5a51692..74cde83b1ff42 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/TrackingUtil.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/TrackingUtil.java @@ -29,10 +29,10 @@ static String parseRoleIdentifier(final String trackingId) { public static String getLinkName(final Session session) { // returned linkName lookslike: ea9cac_8b_G27_1479943074829 final String linkNamePrefix = StringUtil.getRandomString(); - final String linkNameWithServiceRoleTracker = session.getConnection() != null && !StringUtil.isNullOrEmpty(session.getConnection().getRemoteContainer()) ? - linkNamePrefix.concat(TrackingUtil.TRACKING_ID_TOKEN_SEPARATOR).concat(session.getConnection().getRemoteContainer() - .substring(Math.max(session.getConnection().getRemoteContainer().length() - 7, 0), session.getConnection().getRemoteContainer().length())) : - linkNamePrefix; + final String linkNameWithServiceRoleTracker = session.getConnection() != null && !StringUtil.isNullOrEmpty(session.getConnection().getRemoteContainer()) + ? linkNamePrefix.concat(TrackingUtil.TRACKING_ID_TOKEN_SEPARATOR).concat(session.getConnection().getRemoteContainer() + .substring(Math.max(session.getConnection().getRemoteContainer().length() - 7, 0), session.getConnection().getRemoteContainer().length())) + : linkNamePrefix; return linkNameWithServiceRoleTracker.concat(TrackingUtil.TRACKING_ID_TOKEN_SEPARATOR).concat(String.valueOf(Instant.now().toEpochMilli())); } } diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/WebSocketConnectionHandler.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/WebSocketConnectionHandler.java index 63c4d55dd5588..5c97f42b71b2c 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/WebSocketConnectionHandler.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/WebSocketConnectionHandler.java @@ -33,7 +33,7 @@ protected void addTransportLayers(final Event event, final TransportInternal tra transport.addTransportLayer(webSocket); if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info("addWebsocketHandshake: hostname[" + hostName +"]"); + TRACE_LOGGER.info("addWebsocketHandshake: hostname[" + hostName + "]"); } super.addTransportLayers(event, transport); diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/WebSocketProxyConnectionHandler.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/WebSocketProxyConnectionHandler.java index 0cfdd427ff080..9974ac05da869 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/WebSocketProxyConnectionHandler.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/WebSocketProxyConnectionHandler.java @@ -63,7 +63,7 @@ protected void addTransportLayers(final Event event, final TransportInternal tra transport.addTransportLayer(proxy); if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info("addProxyHandshake: hostname[" + hostName +"]"); + TRACE_LOGGER.info("addProxyHandshake: hostname[" + hostName + "]"); } } @@ -95,7 +95,7 @@ protected void notifyTransportErrors(final Event event) { int port; try { port = Integer.parseInt(hostNameParts[1]); - } catch (NumberFormatException ignore){ + } catch (NumberFormatException ignore) { return; } diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/WorkItem.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/WorkItem.java index e65513703afe2..af00d91ae7e14 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/WorkItem.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/WorkItem.java @@ -3,8 +3,8 @@ package com.microsoft.azure.eventhubs.impl; -import java.time.*; -import java.util.concurrent.*; +import java.time.Duration; +import java.util.concurrent.CompletableFuture; public class WorkItem { private final TimeoutTracker tracker; diff --git a/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/jproxy/ProxyNegotiationHandler.java b/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/jproxy/ProxyNegotiationHandler.java index c4001c165975d..14543dd5a60cc 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/jproxy/ProxyNegotiationHandler.java +++ b/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/jproxy/ProxyNegotiationHandler.java @@ -241,4 +241,4 @@ public void failed(Throwable exc, ReadWriteState attachment) { } } } -} \ No newline at end of file +} diff --git a/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/jproxy/ProxyServer.java b/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/jproxy/ProxyServer.java index 40cbfbddf5543..e8795445fc59d 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/jproxy/ProxyServer.java +++ b/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/jproxy/ProxyServer.java @@ -15,4 +15,4 @@ static ProxyServer create(final String hostName, final int port) { void start(final Consumer onError) throws IOException; void stop() throws IOException; -} \ No newline at end of file +} diff --git a/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/jproxy/SimpleProxy.java b/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/jproxy/SimpleProxy.java index 320130fcc53ea..396955d89ccea 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/jproxy/SimpleProxy.java +++ b/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/jproxy/SimpleProxy.java @@ -85,4 +85,4 @@ public void failed(Throwable exc, AsynchronousServerSocketChannel attachment) { } }); } -} \ No newline at end of file +} diff --git a/eventhubs/data-plane/pom.xml b/eventhubs/data-plane/pom.xml index 2f0276ce8eff8..f2d254e1811b0 100644 --- a/eventhubs/data-plane/pom.xml +++ b/eventhubs/data-plane/pom.xml @@ -32,11 +32,6 @@ scm:git:https://github.com/Azure/azure-sdk-for-java -- -0.31.0 -1.1.0 -org.apache.qpid