diff --git a/eng/code-quality-reports/src/main/resources/spotbugs/spotbugs-exclude.xml b/eng/code-quality-reports/src/main/resources/spotbugs/spotbugs-exclude.xml index c28de0c2f5339..5fa672fcad3d0 100755 --- a/eng/code-quality-reports/src/main/resources/spotbugs/spotbugs-exclude.xml +++ b/eng/code-quality-reports/src/main/resources/spotbugs/spotbugs-exclude.xml @@ -47,6 +47,14 @@ + + + + + + + diff --git a/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/AzureStorageCheckpointLeaseManager.java b/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/AzureStorageCheckpointLeaseManager.java index a4938c1a88d62..19db35e7f2774 100644 --- a/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/AzureStorageCheckpointLeaseManager.java +++ b/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/AzureStorageCheckpointLeaseManager.java @@ -525,9 +525,7 @@ private boolean renewLeaseInternal(CompleteLease lease) throws StorageException leaseBlob.renewLease(AccessCondition.generateLeaseCondition(azLease.getToken()), this.renewRequestOptions, null); result = true; } catch (StorageException se) { - if (wasLeaseLost(se, azLease.getPartitionId())) { - // leave result as false - } else { + if (!wasLeaseLost(se, azLease.getPartitionId())) { throw se; } } diff --git a/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/EventProcessorHost.java b/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/EventProcessorHost.java index 5cd28a98b14a9..ddd94daf76431 100644 --- a/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/EventProcessorHost.java +++ b/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/EventProcessorHost.java @@ -27,7 +27,7 @@ */ public final class EventProcessorHost { private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(EventProcessorHost.class); - private static final Object uuidSynchronizer = new Object(); + private static final Object UUID_SYNCHRONIZER = new Object(); // weOwnExecutor exists to support user-supplied thread pools. private final boolean weOwnExecutor; private final ScheduledExecutorService executorService; @@ -330,7 +330,7 @@ public static String createHostName(String prefix) { * @return A string UUID with dashes but no curly brackets. */ public static String safeCreateUUID() { - synchronized (EventProcessorHost.uuidSynchronizer) { + synchronized (EventProcessorHost.UUID_SYNCHRONIZER) { final UUID newUuid = UUID.randomUUID(); return newUuid.toString(); } @@ -530,7 +530,7 @@ public CompletableFuture unregisterEventProcessor() { } static class EventProcessorHostThreadPoolFactory implements ThreadFactory { - private static final AtomicInteger poolNumber = new AtomicInteger(1); + private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1); private final AtomicInteger threadNumber = new AtomicInteger(1); private final ThreadGroup group; private final String namePrefix; @@ -538,7 +538,7 @@ static class EventProcessorHostThreadPoolFactory implements ThreadFactory { private final String entityName; private final String consumerGroupName; - public EventProcessorHostThreadPoolFactory( + EventProcessorHostThreadPoolFactory( String hostName, String entityName, String consumerGroupName) { @@ -561,7 +561,7 @@ public Thread newThread(Runnable r) { private String getNamePrefix() { return String.format("[%s|%s|%s]-%s-", - this.entityName, this.consumerGroupName, this.hostName, poolNumber.getAndIncrement()); + this.entityName, this.consumerGroupName, this.hostName, POOL_NUMBER.getAndIncrement()); } static class ThreadUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler { diff --git a/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/ICheckpointManager.java b/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/ICheckpointManager.java index 36a41f0641577..37026284bcd2a 100644 --- a/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/ICheckpointManager.java +++ b/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/ICheckpointManager.java @@ -27,21 +27,21 @@ public interface ICheckpointManager { * * @return CompletableFuture {@literal ->} true if it exists, false if not */ - public CompletableFuture checkpointStoreExists(); + CompletableFuture checkpointStoreExists(); /*** * Create the checkpoint store if it doesn't exist. Do nothing if it does exist. * * @return CompletableFuture {@literal ->} null on success, completes exceptionally on error. */ - public CompletableFuture createCheckpointStoreIfNotExists(); + CompletableFuture createCheckpointStoreIfNotExists(); /** * Deletes the checkpoint store. * * @return CompletableFuture {@literal ->} null on success, completes exceptionally on error. */ - public CompletableFuture deleteCheckpointStore(); + CompletableFuture deleteCheckpointStore(); /*** * Get the checkpoint data associated with the given partition. Could return null if no checkpoint has @@ -51,7 +51,7 @@ public interface ICheckpointManager { * * @return CompletableFuture {@literal ->} checkpoint info, or null. Completes exceptionally on error. */ - public CompletableFuture getCheckpoint(String partitionId); + CompletableFuture getCheckpoint(String partitionId); /*** * Creates the checkpoint HOLDERs for the given partitions. Does nothing for any checkpoint HOLDERs @@ -67,7 +67,7 @@ public interface ICheckpointManager { * @param partitionIds List of partitions to create checkpoint HOLDERs for. * @return CompletableFuture {@literal ->} null on success, completes exceptionally on error. */ - public CompletableFuture createAllCheckpointsIfNotExists(List partitionIds); + CompletableFuture createAllCheckpointsIfNotExists(List partitionIds); /*** * Update the checkpoint in the store with the offset/sequenceNumber in the provided checkpoint. @@ -81,7 +81,7 @@ public interface ICheckpointManager { * @param checkpoint offset/sequenceNumber and partition id to update the store with. * @return CompletableFuture {@literal ->} null on success. Completes exceptionally on error. */ - public CompletableFuture updateCheckpoint(CompleteLease lease, Checkpoint checkpoint); + CompletableFuture updateCheckpoint(CompleteLease lease, Checkpoint checkpoint); /*** * Delete the stored checkpoint data for the given partition. If there is no stored checkpoint for the @@ -91,5 +91,5 @@ public interface ICheckpointManager { * @param partitionId id of partition to delete checkpoint from store * @return CompletableFuture {@literal ->} null on success. Completes exceptionally on error. */ - public CompletableFuture deleteCheckpoint(String partitionId); + CompletableFuture deleteCheckpoint(String partitionId); } diff --git a/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/IEventProcessor.java b/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/IEventProcessor.java index e8e09ac0d11bf..f06842a97b9b0 100644 --- a/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/IEventProcessor.java +++ b/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/IEventProcessor.java @@ -31,7 +31,7 @@ public interface IEventProcessor { * @param context Information about the partition that this event processor will process events from. * @throws Exception to indicate failure. */ - public void onOpen(PartitionContext context) throws Exception; + void onOpen(PartitionContext context) throws Exception; /** * Called by processor host to indicate that the event processor is being stopped. @@ -43,7 +43,7 @@ public interface IEventProcessor { * @param reason Reason why the event processor is being stopped. * @throws Exception to indicate failure. */ - public void onClose(PartitionContext context, CloseReason reason) throws Exception; + void onClose(PartitionContext context, CloseReason reason) throws Exception; /** * Called by the processor host when a batch of events has arrived. @@ -58,7 +58,7 @@ public interface IEventProcessor { * @param events The events to be processed. May be empty. * @throws Exception to indicate failure. */ - public void onEvents(PartitionContext context, Iterable events) throws Exception; + void onEvents(PartitionContext context, Iterable events) throws Exception; /** * Called when the underlying client experiences an error while receiving. EventProcessorHost will take @@ -68,5 +68,5 @@ public interface IEventProcessor { * @param context Information about the partition. * @param error The error that occured. */ - public void onError(PartitionContext context, Throwable error); + void onError(PartitionContext context, Throwable error); } diff --git a/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/IEventProcessorFactory.java b/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/IEventProcessorFactory.java index f373d2457c0cb..ce745c0aaf03d 100644 --- a/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/IEventProcessorFactory.java +++ b/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/IEventProcessorFactory.java @@ -22,5 +22,5 @@ public interface IEventProcessorFactory { * @throws Exception to indicate failure. * @return The event processor object. */ - public T createEventProcessor(PartitionContext context) throws Exception; + T createEventProcessor(PartitionContext context) throws Exception; } diff --git a/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/ILeaseManager.java b/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/ILeaseManager.java index fbbfe7c6bb5fe..d767d879d69f4 100644 --- a/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/ILeaseManager.java +++ b/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/ILeaseManager.java @@ -25,7 +25,7 @@ public interface ILeaseManager { * * @return Duration of a lease before it expires unless renewed, specified in milliseconds. */ - public int getLeaseDurationInMilliseconds(); + int getLeaseDurationInMilliseconds(); /** * Does the lease store exist? @@ -35,21 +35,21 @@ public interface ILeaseManager { * * @return CompletableFuture {@literal ->} true if it exists, false if not */ - public CompletableFuture leaseStoreExists(); + CompletableFuture leaseStoreExists(); /** * Create the lease store if it does not exist, do nothing if it does exist. * * @return CompletableFuture {@literal ->} null on success, completes exceptionally on error. */ - public CompletableFuture createLeaseStoreIfNotExists(); + CompletableFuture createLeaseStoreIfNotExists(); /** * Deletes the lease store. * * @return CompletableFuture {@literal ->} null on success, completes exceptionally on error. */ - public CompletableFuture deleteLeaseStore(); + CompletableFuture deleteLeaseStore(); /** * Returns the lease info for the given partition.. @@ -57,7 +57,7 @@ public interface ILeaseManager { * @param partitionId Get the lease info for this partition. * @return CompletableFuture {@literal ->} Lease, completes exceptionally on error. */ - public CompletableFuture getLease(String partitionId); + CompletableFuture getLease(String partitionId); /** * Returns lightweight BaseLease for all leases, which includes name of owning host and whether lease @@ -67,7 +67,7 @@ public interface ILeaseManager { * * @return CompletableFuture {@literal ->} list of BaseLease, completes exceptionally on error. */ - public CompletableFuture> getAllLeases(); + CompletableFuture> getAllLeases(); /** @@ -77,7 +77,7 @@ public interface ILeaseManager { * @param partitionIds ids of partitions to create lease info for * @return CompletableFuture {@literal ->} null on success, completes exceptionally on error */ - public CompletableFuture createAllLeasesIfNotExists(List partitionIds); + CompletableFuture createAllLeasesIfNotExists(List partitionIds); /** * Delete the lease info for a partition from the store. If there is no stored lease for the given partition, @@ -86,7 +86,7 @@ public interface ILeaseManager { * @param lease the currently existing lease info for the partition * @return CompletableFuture {@literal ->} null on success, completes exceptionally on error. */ - public CompletableFuture deleteLease(CompleteLease lease); + CompletableFuture deleteLease(CompleteLease lease); /** * Acquire the lease on the desired partition for this EventProcessorHost. @@ -103,7 +103,7 @@ public interface ILeaseManager { * @param lease Lease info for the desired partition * @return CompletableFuture {@literal ->} true if the lease was acquired, false if not, completes exceptionally on error. */ - public CompletableFuture acquireLease(CompleteLease lease); + CompletableFuture acquireLease(CompleteLease lease); /** * Renew a lease currently held by this host instance. @@ -116,7 +116,7 @@ public interface ILeaseManager { * @param lease Lease to be renewed * @return true if the lease was renewed, false as described above, completes exceptionally on error. */ - public CompletableFuture renewLease(CompleteLease lease); + CompletableFuture renewLease(CompleteLease lease); /** * Give up a lease currently held by this host. @@ -127,7 +127,7 @@ public interface ILeaseManager { * @param lease Lease to be given up * @return CompletableFuture {@literal ->} null on success, completes exceptionally on error. */ - public CompletableFuture releaseLease(CompleteLease lease); + CompletableFuture releaseLease(CompleteLease lease); /** * Update the store with the information in the provided lease. @@ -139,5 +139,5 @@ public interface ILeaseManager { * @param lease New lease info to be stored * @return true if the update was successful, false if lease was lost and could not be updated, completes exceptionally on error. */ - public CompletableFuture updateLease(CompleteLease lease); + CompletableFuture updateLease(CompleteLease lease); } diff --git a/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/InMemoryCheckpointManager.java b/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/InMemoryCheckpointManager.java index 594db9c70bb18..7b4c6c2dfa535 100644 --- a/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/InMemoryCheckpointManager.java +++ b/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/InMemoryCheckpointManager.java @@ -45,7 +45,7 @@ public void initialize(HostContext hostContext) { @Override public CompletableFuture checkpointStoreExists() { - boolean exists = InMemoryCheckpointStore.singleton.existsMap(); + boolean exists = InMemoryCheckpointStore.SINGLETON.existsMap(); TRACE_LOGGER.debug(this.hostContext.withHost("checkpointStoreExists() " + exists)); return CompletableFuture.completedFuture(exists); } @@ -53,21 +53,21 @@ public CompletableFuture checkpointStoreExists() { @Override public CompletableFuture createCheckpointStoreIfNotExists() { TRACE_LOGGER.debug(this.hostContext.withHost("createCheckpointStoreIfNotExists()")); - InMemoryCheckpointStore.singleton.initializeMap(); + InMemoryCheckpointStore.SINGLETON.initializeMap(); return CompletableFuture.completedFuture(null); } @Override public CompletableFuture deleteCheckpointStore() { TRACE_LOGGER.debug(this.hostContext.withHost("deleteCheckpointStore()")); - InMemoryCheckpointStore.singleton.deleteMap(); + InMemoryCheckpointStore.SINGLETON.deleteMap(); return CompletableFuture.completedFuture(null); } @Override public CompletableFuture getCheckpoint(String partitionId) { Checkpoint returnCheckpoint = null; - Checkpoint checkpointInStore = InMemoryCheckpointStore.singleton.getCheckpoint(partitionId); + Checkpoint checkpointInStore = InMemoryCheckpointStore.SINGLETON.getCheckpoint(partitionId); if (checkpointInStore == null) { TRACE_LOGGER.warn(this.hostContext.withHostAndPartition(partitionId, "getCheckpoint() no existing Checkpoint")); @@ -87,7 +87,7 @@ public CompletableFuture getCheckpoint(String partitionId) { @Override public CompletableFuture createAllCheckpointsIfNotExists(List partitionIds) { for (String id : partitionIds) { - Checkpoint checkpointInStore = InMemoryCheckpointStore.singleton.getCheckpoint(id); + Checkpoint checkpointInStore = InMemoryCheckpointStore.SINGLETON.getCheckpoint(id); if (checkpointInStore != null) { TRACE_LOGGER.debug(this.hostContext.withHostAndPartition(id, "createCheckpointIfNotExists() found existing checkpoint, OK")); @@ -99,7 +99,7 @@ public CompletableFuture createAllCheckpointsIfNotExists(List part // and put it in the store, but the values are set to indicate that it is not initialized. newStoreCheckpoint.setOffset(null); newStoreCheckpoint.setSequenceNumber(-1); - InMemoryCheckpointStore.singleton.setOrReplaceCheckpoint(newStoreCheckpoint); + InMemoryCheckpointStore.SINGLETON.setOrReplaceCheckpoint(newStoreCheckpoint); } } return CompletableFuture.completedFuture(null); @@ -109,7 +109,7 @@ public CompletableFuture createAllCheckpointsIfNotExists(List part public CompletableFuture updateCheckpoint(CompleteLease lease, Checkpoint checkpoint) { TRACE_LOGGER.debug(this.hostContext.withHostAndPartition(checkpoint.getPartitionId(), "updateCheckpoint() " + checkpoint.getOffset() + "//" + checkpoint.getSequenceNumber())); - Checkpoint checkpointInStore = InMemoryCheckpointStore.singleton.getCheckpoint(checkpoint.getPartitionId()); + Checkpoint checkpointInStore = InMemoryCheckpointStore.SINGLETON.getCheckpoint(checkpoint.getPartitionId()); if (checkpointInStore != null) { checkpointInStore.setOffset(checkpoint.getOffset()); checkpointInStore.setSequenceNumber(checkpoint.getSequenceNumber()); @@ -123,13 +123,13 @@ public CompletableFuture updateCheckpoint(CompleteLease lease, Checkpoint @Override public CompletableFuture deleteCheckpoint(String partitionId) { TRACE_LOGGER.debug(this.hostContext.withHostAndPartition(partitionId, "deleteCheckpoint()")); - InMemoryCheckpointStore.singleton.removeCheckpoint(partitionId); + InMemoryCheckpointStore.SINGLETON.removeCheckpoint(partitionId); return CompletableFuture.completedFuture(null); } private static class InMemoryCheckpointStore { - static final InMemoryCheckpointStore singleton = new InMemoryCheckpointStore(); + static final InMemoryCheckpointStore SINGLETON = new InMemoryCheckpointStore(); private ConcurrentHashMap inMemoryCheckpointsPrivate = null; diff --git a/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/InMemoryLeaseManager.java b/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/InMemoryLeaseManager.java index ecb094bad5ecc..02e7ae8e1023d 100644 --- a/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/InMemoryLeaseManager.java +++ b/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/InMemoryLeaseManager.java @@ -70,7 +70,7 @@ public int getLeaseDurationInMilliseconds() { @Override public CompletableFuture leaseStoreExists() { - boolean exists = InMemoryLeaseStore.singleton.existsMap(); + boolean exists = InMemoryLeaseStore.SINGLETON.existsMap(); latency("leaseStoreExists"); TRACE_LOGGER.debug(this.hostContext.withHost("leaseStoreExists() " + exists)); return CompletableFuture.completedFuture(exists); @@ -79,7 +79,7 @@ public CompletableFuture leaseStoreExists() { @Override public CompletableFuture createLeaseStoreIfNotExists() { TRACE_LOGGER.debug(this.hostContext.withHost("createLeaseStoreIfNotExists()")); - InMemoryLeaseStore.singleton.initializeMap(getLeaseDurationInMilliseconds()); + InMemoryLeaseStore.SINGLETON.initializeMap(getLeaseDurationInMilliseconds()); latency("createLeaseStoreIfNotExists"); return CompletableFuture.completedFuture(null); } @@ -87,7 +87,7 @@ public CompletableFuture createLeaseStoreIfNotExists() { @Override public CompletableFuture deleteLeaseStore() { TRACE_LOGGER.debug(this.hostContext.withHost("deleteLeaseStore()")); - InMemoryLeaseStore.singleton.deleteMap(); + InMemoryLeaseStore.SINGLETON.deleteMap(); latency("deleteLeaseStore"); return CompletableFuture.completedFuture(null); } @@ -96,15 +96,15 @@ public CompletableFuture deleteLeaseStore() { public CompletableFuture getLease(String partitionId) { TRACE_LOGGER.debug(this.hostContext.withHost("getLease()")); latency("getLease"); - InMemoryLease leaseInStore = InMemoryLeaseStore.singleton.getLease(partitionId); + InMemoryLease leaseInStore = InMemoryLeaseStore.SINGLETON.getLease(partitionId); return CompletableFuture.completedFuture(new InMemoryLease(leaseInStore)); } @Override public CompletableFuture> getAllLeases() { ArrayList infos = new ArrayList(); - for (String id : InMemoryLeaseStore.singleton.getPartitionIds()) { - InMemoryLease leaseInStore = InMemoryLeaseStore.singleton.getLease(id); + for (String id : InMemoryLeaseStore.SINGLETON.getPartitionIds()) { + InMemoryLease leaseInStore = InMemoryLeaseStore.SINGLETON.getLease(id); infos.add(new BaseLease(id, leaseInStore.getOwner(), !leaseInStore.isExpiredSync())); } latency("getAllLeasesStateInfo"); @@ -119,7 +119,7 @@ public CompletableFuture createAllLeasesIfNotExists(List partition for (String id : partitionIds) { final String workingId = id; CompletableFuture oneCreate = CompletableFuture.supplyAsync(() -> { - InMemoryLease leaseInStore = InMemoryLeaseStore.singleton.getLease(workingId); + InMemoryLease leaseInStore = InMemoryLeaseStore.SINGLETON.getLease(workingId); InMemoryLease returnLease = null; if (leaseInStore != null) { TRACE_LOGGER.debug(this.hostContext.withHostAndPartition(workingId, @@ -129,7 +129,7 @@ public CompletableFuture createAllLeasesIfNotExists(List partition TRACE_LOGGER.debug(this.hostContext.withHostAndPartition(workingId, "createLeaseIfNotExists() creating new lease")); InMemoryLease newStoreLease = new InMemoryLease(workingId); - InMemoryLeaseStore.singleton.setOrReplaceLease(newStoreLease); + InMemoryLeaseStore.SINGLETON.setOrReplaceLease(newStoreLease); returnLease = new InMemoryLease(newStoreLease); } latency("createLeaseIfNotExists " + workingId); @@ -145,7 +145,7 @@ public CompletableFuture createAllLeasesIfNotExists(List partition @Override public CompletableFuture deleteLease(CompleteLease lease) { TRACE_LOGGER.debug(this.hostContext.withHostAndPartition(lease, "deleteLease()")); - InMemoryLeaseStore.singleton.removeLease((InMemoryLease) lease); + InMemoryLeaseStore.SINGLETON.removeLease((InMemoryLease) lease); latency("deleteLease " + lease.getPartitionId()); return CompletableFuture.completedFuture(null); } @@ -157,9 +157,9 @@ public CompletableFuture acquireLease(CompleteLease lease) { TRACE_LOGGER.debug(this.hostContext.withHostAndPartition(leaseToAcquire, "acquireLease()")); boolean retval = true; - InMemoryLease leaseInStore = InMemoryLeaseStore.singleton.getLease(leaseToAcquire.getPartitionId()); + InMemoryLease leaseInStore = InMemoryLeaseStore.SINGLETON.getLease(leaseToAcquire.getPartitionId()); if (leaseInStore != null) { - InMemoryLease wasUnowned = InMemoryLeaseStore.singleton.atomicAquireUnowned(leaseToAcquire.getPartitionId(), this.hostContext.getHostName()); + InMemoryLease wasUnowned = InMemoryLeaseStore.SINGLETON.atomicAquireUnowned(leaseToAcquire.getPartitionId(), this.hostContext.getHostName()); if (wasUnowned != null) { // atomicAcquireUnowned already set ownership of the persisted lease, just update the live lease. leaseToAcquire.setOwner(this.hostContext.getHostName()); @@ -174,7 +174,7 @@ public CompletableFuture acquireLease(CompleteLease lease) { } else { String oldOwner = leaseInStore.getOwner(); // Make change in both persisted lease and live lease! - InMemoryLeaseStore.singleton.stealLease(leaseInStore, this.hostContext.getHostName()); + InMemoryLeaseStore.SINGLETON.stealLease(leaseInStore, this.hostContext.getHostName()); leaseToAcquire.setOwner(this.hostContext.getHostName()); TRACE_LOGGER.debug(this.hostContext.withHostAndPartition(leaseToAcquire, "acquireLease() stole lease from " + oldOwner)); @@ -201,7 +201,7 @@ public CompletableFuture acquireLease(CompleteLease lease) { // own the lease for the given partition, then notifier is called immediately, otherwise it is called whenever // ownership of the lease changes. public void notifyOnSteal(String expectedOwner, String partitionId, Callable notifier) { - InMemoryLeaseStore.singleton.notifyOnSteal(expectedOwner, partitionId, notifier); + InMemoryLeaseStore.SINGLETON.notifyOnSteal(expectedOwner, partitionId, notifier); } @Override @@ -211,7 +211,7 @@ public CompletableFuture renewLease(CompleteLease lease) { TRACE_LOGGER.debug(this.hostContext.withHostAndPartition(leaseToRenew, "renewLease()")); boolean retval = true; - InMemoryLease leaseInStore = InMemoryLeaseStore.singleton.getLease(leaseToRenew.getPartitionId()); + InMemoryLease leaseInStore = InMemoryLeaseStore.SINGLETON.getLease(leaseToRenew.getPartitionId()); if (leaseInStore != null) { // MATCH BEHAVIOR OF AzureStorageCheckpointLeaseManager: // Renewing a lease that has expired succeeds unless some other host has grabbed it already. @@ -244,7 +244,7 @@ public CompletableFuture releaseLease(CompleteLease lease) { TRACE_LOGGER.debug(this.hostContext.withHostAndPartition(leaseToRelease, "releaseLease()")); - InMemoryLease leaseInStore = InMemoryLeaseStore.singleton.getLease(leaseToRelease.getPartitionId()); + InMemoryLease leaseInStore = InMemoryLeaseStore.SINGLETON.getLease(leaseToRelease.getPartitionId()); if (leaseInStore != null) { if (!leaseInStore.isExpiredSync() && leaseInStore.isOwnedBy(this.hostContext.getHostName())) { TRACE_LOGGER.debug(this.hostContext.withHostAndPartition(leaseToRelease, "releaseLease() released OK")); @@ -253,8 +253,6 @@ public CompletableFuture releaseLease(CompleteLease lease) { leaseToRelease.setOwner(""); leaseInStore.setExpirationTime(0); leaseToRelease.setExpirationTime(0); - } else { - // Lease was lost, intent achieved. } } else { TRACE_LOGGER.warn(this.hostContext.withHostAndPartition(leaseToRelease, "releaseLease() can't find lease in store")); @@ -274,7 +272,7 @@ public CompletableFuture updateLease(CompleteLease lease) { // Renew lease first so it doesn't expire in the middle. return renewLease(leaseToUpdate).thenApply((retval) -> { if (retval) { - InMemoryLease leaseInStore = InMemoryLeaseStore.singleton.getLease(leaseToUpdate.getPartitionId()); + InMemoryLease leaseInStore = InMemoryLeaseStore.SINGLETON.getLease(leaseToUpdate.getPartitionId()); if (leaseInStore != null) { if (!leaseInStore.isExpiredSync() && leaseInStore.isOwnedBy(this.hostContext.getHostName())) { // We are updating with values already in the live lease, so only need to set on the persisted lease. @@ -298,7 +296,7 @@ public CompletableFuture updateLease(CompleteLease lease) { private static class InMemoryLeaseStore { - static final InMemoryLeaseStore singleton = new InMemoryLeaseStore(); + static final InMemoryLeaseStore SINGLETON = new InMemoryLeaseStore(); private static int leaseDurationInMilliseconds; private ConcurrentHashMap inMemoryLeasesPrivate = null; @@ -402,12 +400,6 @@ void setExpirationTime(long expireAtMillis) { public boolean isExpiredSync() { boolean hasExpired = (System.currentTimeMillis() >= this.expirationTimeMillis); - if (hasExpired) { - // CHANGE TO MATCH BEHAVIOR OF AzureStorageCheckpointLeaseManager - // An expired lease can be renewed by the previous owner. In order to implement that behavior for - // InMemory, the owner field has to remain unchanged. - //setOwner(""); - } TRACE_LOGGER.debug("isExpired(" + this.getPartitionId() + (hasExpired ? ") expired " : ") leased ") + (this.expirationTimeMillis - System.currentTimeMillis())); return hasExpired; } diff --git a/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PartitionScanner.java b/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PartitionScanner.java index 3e0198b1c6446..f6bf75d4df842 100644 --- a/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PartitionScanner.java +++ b/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PartitionScanner.java @@ -20,7 +20,7 @@ class PartitionScanner extends Closable { private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(PartitionScanner.class); - private static final Random randomizer = new Random(); + private static final Random RANDOMIZER = new Random(); private final HostContext hostContext; private final Consumer addPump; @@ -123,7 +123,7 @@ private int sortLeasesAndCalculateDesiredCount(boolean isFirst) { // If the entire system is starting up, the list of hosts is probably not complete and we can't really // compute a meaningful hostOrdinal. But we only want hostOrdinal to calculate startingPoint. Instead, // just randomly select a startingPoint. - startingPoint = PartitionScanner.randomizer.nextInt(this.allLeaseStates.size()); + startingPoint = PartitionScanner.RANDOMIZER.nextInt(this.allLeaseStates.size()); } else { for (hostOrdinal = 0; hostOrdinal < sortedHosts.size(); hostOrdinal++) { if (sortedHosts.get(hostOrdinal).compareTo(this.hostContext.getHostName()) == 0) { @@ -260,7 +260,7 @@ private ArrayList findLeasesToSteal(int stealAsk) { if (bigOwners.size() > 0) { // Randomly pick one of the big owners - String bigVictim = bigOwners.get(PartitionScanner.randomizer.nextInt(bigOwners.size())); + String bigVictim = bigOwners.get(PartitionScanner.RANDOMIZER.nextInt(bigOwners.size())); int victimExtra = hostOwns.get(bigVictim) - this.desiredCount - 1; int stealCount = Math.min(victimExtra, stealAsk); TRACE_LOGGER.debug(this.hostContext.withHost("Stealing " + stealCount + " from " + bigVictim)); diff --git a/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PumpManager.java b/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PumpManager.java index 46afe864b38a4..3d385f1f4497c 100644 --- a/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PumpManager.java +++ b/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PumpManager.java @@ -16,7 +16,7 @@ class PumpManager extends Closable implements Consumer { protected final HostContext hostContext; protected ConcurrentHashMap pumpStates; // protected for testability - public PumpManager(HostContext hostContext, Closable parent) { + PumpManager(HostContext hostContext, Closable parent) { super(parent); this.hostContext = hostContext; diff --git a/eventhubs/data-plane/azure-eventhubs-extensions/src/main/java/com/microsoft/azure/eventhubs/extensions/appender/EventHubsAppender.java b/eventhubs/data-plane/azure-eventhubs-extensions/src/main/java/com/microsoft/azure/eventhubs/extensions/appender/EventHubsAppender.java index cf1f8dc6e5f89..988b52c203b3b 100644 --- a/eventhubs/data-plane/azure-eventhubs-extensions/src/main/java/com/microsoft/azure/eventhubs/extensions/appender/EventHubsAppender.java +++ b/eventhubs/data-plane/azure-eventhubs-extensions/src/main/java/com/microsoft/azure/eventhubs/extensions/appender/EventHubsAppender.java @@ -16,12 +16,13 @@ import org.apache.logging.log4j.core.util.StringEncoder; import java.io.Serializable; -import java.nio.charset.StandardCharsets; import java.util.Locale; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; +import static java.nio.charset.StandardCharsets.UTF_8; + /** * Sends {@link LogEvent}'s to Microsoft Azure EventHubs. * By default, tuned for high performance and hence, pushes a batch of Events. @@ -76,7 +77,7 @@ public void append(LogEvent logEvent) { if (layout != null) { serializedLogEvent = layout.toByteArray(logEvent); } else { - serializedLogEvent = StringEncoder.toBytes(logEvent.getMessage().getFormattedMessage(), StandardCharsets.UTF_8); + serializedLogEvent = StringEncoder.toBytes(logEvent.getMessage().getFormattedMessage(), UTF_8); } if (serializedLogEvent != null) { diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/BatchOptions.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/BatchOptions.java index f14ed27aba5d0..e7d0801290500 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/BatchOptions.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/BatchOptions.java @@ -56,7 +56,7 @@ public final class BatchOptions { */ public Integer maxMessageSize = null; - public final BatchOptions with(Consumer builderFunction) { + public BatchOptions with(Consumer builderFunction) { builderFunction.accept(this); return this; } diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/ConnectionStringBuilder.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/ConnectionStringBuilder.java index 763e79835f27b..380d2ac9cacdd 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/ConnectionStringBuilder.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/ConnectionStringBuilder.java @@ -47,27 +47,27 @@ */ public final class ConnectionStringBuilder { - static final String endpointFormat = "sb://%s.%s"; - static final String hostnameFormat = "sb://%s"; - static final String defaultDomainName = "servicebus.windows.net"; - - static final String HostnameConfigName = "Hostname"; // Hostname is a key that is used in IoTHub. - static final String EndpointConfigName = "Endpoint"; // Endpoint key is used in EventHubs. It's identical to Hostname in IoTHub. - static final String EntityPathConfigName = "EntityPath"; - static final String OperationTimeoutConfigName = "OperationTimeout"; - static final String KeyValueSeparator = "="; - static final String KeyValuePairDelimiter = ";"; - static final String SharedAccessKeyNameConfigName = "SharedAccessKeyName"; // We use a (KeyName, Key) pair OR the SAS token - never both. - static final String SharedAccessKeyConfigName = "SharedAccessKey"; - static final String SharedAccessSignatureConfigName = "SharedAccessSignature"; - static final String TransportTypeConfigName = "TransportType"; - - private static final String AllKeyEnumerateRegex = "(" + HostnameConfigName + "|" + EndpointConfigName + "|" + SharedAccessKeyNameConfigName - + "|" + SharedAccessKeyConfigName + "|" + SharedAccessSignatureConfigName + "|" + EntityPathConfigName + "|" + OperationTimeoutConfigName - + "|" + TransportTypeConfigName + ")"; - - private static final String KeysWithDelimitersRegex = KeyValuePairDelimiter + AllKeyEnumerateRegex - + KeyValueSeparator; + static final String END_POINT_FORMAT = "sb://%s.%s"; + static final String HOST_NAME_FORMAT = "sb://%s"; + static final String DEFAULT_DOMAIN_NAME = "servicebus.windows.net"; + + static final String HOST_NAME_CONFIG_NAME = "Hostname"; // Hostname is a key that is used in IoTHub. + static final String ENDPOINT_CONFIG_NAME = "Endpoint"; // Endpoint key is used in EventHubs. It's identical to Hostname in IoTHub. + static final String ENTITY_PATH_CONFIG_NAME = "EntityPath"; + static final String OPERATION_TIMEOUT_CONFIG_NAME = "OperationTimeout"; + static final String KEY_VALUE_SEPARATOR = "="; + static final String KEY_VALUE_PAIR_DELIMITER = ";"; + static final String SHARED_ACCESS_KEY_NANE_CONFIG_NAME = "SharedAccessKeyName"; // We use a (KeyName, Key) pair OR the SAS token - never both. + static final String SHARED_ACCESS_KEY_CONFIG_NAME = "SharedAccessKey"; + static final String SHARED_ACCESS_SIGNATURE_CONFIG_NAME = "SharedAccessSignature"; + static final String TRANSPORT_TYPE_CONFIG_NAME = "TransportType"; + + private static final String ALL_KEY_ENUMERATE_REGEX = "(" + HOST_NAME_CONFIG_NAME + "|" + ENDPOINT_CONFIG_NAME + "|" + SHARED_ACCESS_KEY_NANE_CONFIG_NAME + + "|" + SHARED_ACCESS_KEY_CONFIG_NAME + "|" + SHARED_ACCESS_SIGNATURE_CONFIG_NAME + "|" + ENTITY_PATH_CONFIG_NAME + "|" + OPERATION_TIMEOUT_CONFIG_NAME + + "|" + TRANSPORT_TYPE_CONFIG_NAME + ")"; + + private static final String KEYS_WITH_DELIMITERS_REGEX = KEY_VALUE_PAIR_DELIMITER + ALL_KEY_ENUMERATE_REGEX + + KEY_VALUE_SEPARATOR; private URI endpoint; private String eventHubName; @@ -137,7 +137,7 @@ public ConnectionStringBuilder setEndpoint(URI endpoint) { */ public ConnectionStringBuilder setEndpoint(String namespaceName, String domainName) { try { - this.endpoint = new URI(String.format(Locale.US, endpointFormat, namespaceName, domainName)); + this.endpoint = new URI(String.format(Locale.US, END_POINT_FORMAT, namespaceName, domainName)); } catch (URISyntaxException exception) { throw new IllegalConnectionStringFormatException( String.format(Locale.US, "Invalid namespace name: %s", namespaceName), @@ -154,7 +154,7 @@ public ConnectionStringBuilder setEndpoint(String namespaceName, String domainNa * @return the {@link ConnectionStringBuilder} being set. */ public ConnectionStringBuilder setNamespaceName(String namespaceName) { - return this.setEndpoint(namespaceName, defaultDomainName); + return this.setEndpoint(namespaceName, DEFAULT_DOMAIN_NAME); } /** @@ -288,38 +288,38 @@ public ConnectionStringBuilder setTransportType(final TransportType transportTyp public String toString() { final StringBuilder connectionStringBuilder = new StringBuilder(); if (this.endpoint != null) { - connectionStringBuilder.append(String.format(Locale.US, "%s%s%s%s", EndpointConfigName, KeyValueSeparator, - this.endpoint.toString(), KeyValuePairDelimiter)); + connectionStringBuilder.append(String.format(Locale.US, "%s%s%s%s", ENDPOINT_CONFIG_NAME, KEY_VALUE_SEPARATOR, + this.endpoint.toString(), KEY_VALUE_PAIR_DELIMITER)); } if (!StringUtil.isNullOrWhiteSpace(this.eventHubName)) { - connectionStringBuilder.append(String.format(Locale.US, "%s%s%s%s", EntityPathConfigName, - KeyValueSeparator, this.eventHubName, KeyValuePairDelimiter)); + connectionStringBuilder.append(String.format(Locale.US, "%s%s%s%s", ENTITY_PATH_CONFIG_NAME, + KEY_VALUE_SEPARATOR, this.eventHubName, KEY_VALUE_PAIR_DELIMITER)); } if (!StringUtil.isNullOrWhiteSpace(this.sharedAccessKeyName)) { - connectionStringBuilder.append(String.format(Locale.US, "%s%s%s%s", SharedAccessKeyNameConfigName, - KeyValueSeparator, this.sharedAccessKeyName, KeyValuePairDelimiter)); + connectionStringBuilder.append(String.format(Locale.US, "%s%s%s%s", SHARED_ACCESS_KEY_NANE_CONFIG_NAME, + KEY_VALUE_SEPARATOR, this.sharedAccessKeyName, KEY_VALUE_PAIR_DELIMITER)); } if (!StringUtil.isNullOrWhiteSpace(this.sharedAccessKey)) { - connectionStringBuilder.append(String.format(Locale.US, "%s%s%s%s", SharedAccessKeyConfigName, - KeyValueSeparator, this.sharedAccessKey, KeyValuePairDelimiter)); + connectionStringBuilder.append(String.format(Locale.US, "%s%s%s%s", SHARED_ACCESS_KEY_CONFIG_NAME, + KEY_VALUE_SEPARATOR, this.sharedAccessKey, KEY_VALUE_PAIR_DELIMITER)); } if (!StringUtil.isNullOrWhiteSpace(this.sharedAccessSignature)) { - connectionStringBuilder.append(String.format(Locale.US, "%s%s%s%s", SharedAccessSignatureConfigName, - KeyValueSeparator, this.sharedAccessSignature, KeyValuePairDelimiter)); + connectionStringBuilder.append(String.format(Locale.US, "%s%s%s%s", SHARED_ACCESS_SIGNATURE_CONFIG_NAME, + KEY_VALUE_SEPARATOR, this.sharedAccessSignature, KEY_VALUE_PAIR_DELIMITER)); } if (this.operationTimeout != null) { - connectionStringBuilder.append(String.format(Locale.US, "%s%s%s%s", OperationTimeoutConfigName, - KeyValueSeparator, this.operationTimeout.toString(), KeyValuePairDelimiter)); + connectionStringBuilder.append(String.format(Locale.US, "%s%s%s%s", OPERATION_TIMEOUT_CONFIG_NAME, + KEY_VALUE_SEPARATOR, this.operationTimeout.toString(), KEY_VALUE_PAIR_DELIMITER)); } if (this.transportType != null) { - connectionStringBuilder.append(String.format(Locale.US, "%s%s%s%s", TransportTypeConfigName, - KeyValueSeparator, this.transportType.toString(), KeyValuePairDelimiter)); + connectionStringBuilder.append(String.format(Locale.US, "%s%s%s%s", TRANSPORT_TYPE_CONFIG_NAME, + KEY_VALUE_SEPARATOR, this.transportType.toString(), KEY_VALUE_PAIR_DELIMITER)); } connectionStringBuilder.deleteCharAt(connectionStringBuilder.length() - 1); @@ -332,9 +332,9 @@ private void parseConnectionString(final String connectionString) { throw new IllegalConnectionStringFormatException("connectionString cannot be empty"); } - final String connection = KeyValuePairDelimiter + connectionString; + final String connection = KEY_VALUE_PAIR_DELIMITER + connectionString; - final Pattern keyValuePattern = Pattern.compile(KeysWithDelimitersRegex, Pattern.CASE_INSENSITIVE); + final Pattern keyValuePattern = Pattern.compile(KEYS_WITH_DELIMITERS_REGEX, Pattern.CASE_INSENSITIVE); final String[] values = keyValuePattern.split(connection); final Matcher keys = keyValuePattern.matcher(connection); @@ -359,54 +359,54 @@ private void parseConnectionString(final String connectionString) { String.format(Locale.US, "Value for the connection string parameter name: %s, not found", key)); } - if (key.equalsIgnoreCase(EndpointConfigName)) { + if (key.equalsIgnoreCase(ENDPOINT_CONFIG_NAME)) { if (this.endpoint != null) { // we have parsed the endpoint once, which means we have multiple config which is not allowed throw new IllegalConnectionStringFormatException( - String.format(Locale.US, "Multiple %s and/or %s detected. Make sure only one is defined", EndpointConfigName, HostnameConfigName)); + String.format(Locale.US, "Multiple %s and/or %s detected. Make sure only one is defined", ENDPOINT_CONFIG_NAME, HOST_NAME_CONFIG_NAME)); } try { this.endpoint = new URI(values[valueIndex]); } catch (URISyntaxException exception) { throw new IllegalConnectionStringFormatException( - String.format(Locale.US, "%s should be in format scheme://fullyQualifiedServiceBusNamespaceEndpointName", EndpointConfigName), + String.format(Locale.US, "%s should be in format scheme://fullyQualifiedServiceBusNamespaceEndpointName", ENDPOINT_CONFIG_NAME), exception); } - } else if (key.equalsIgnoreCase(HostnameConfigName)) { + } else if (key.equalsIgnoreCase(HOST_NAME_CONFIG_NAME)) { if (this.endpoint != null) { // we have parsed the endpoint once, which means we have multiple config which is not allowed throw new IllegalConnectionStringFormatException( - String.format(Locale.US, "Multiple %s and/or %s detected. Make sure only one is defined", EndpointConfigName, HostnameConfigName)); + String.format(Locale.US, "Multiple %s and/or %s detected. Make sure only one is defined", ENDPOINT_CONFIG_NAME, HOST_NAME_CONFIG_NAME)); } try { - this.endpoint = new URI(String.format(Locale.US, hostnameFormat, values[valueIndex])); + this.endpoint = new URI(String.format(Locale.US, HOST_NAME_FORMAT, values[valueIndex])); } catch (URISyntaxException exception) { throw new IllegalConnectionStringFormatException( - String.format(Locale.US, "%s should be a fully quantified host name address", HostnameConfigName), + String.format(Locale.US, "%s should be a fully quantified host name address", HOST_NAME_CONFIG_NAME), exception); } - } else if (key.equalsIgnoreCase(SharedAccessKeyNameConfigName)) { + } else if (key.equalsIgnoreCase(SHARED_ACCESS_KEY_NANE_CONFIG_NAME)) { this.sharedAccessKeyName = values[valueIndex]; - } else if (key.equalsIgnoreCase(SharedAccessKeyConfigName)) { + } else if (key.equalsIgnoreCase(SHARED_ACCESS_KEY_CONFIG_NAME)) { this.sharedAccessKey = values[valueIndex]; - } else if (key.equalsIgnoreCase(SharedAccessSignatureConfigName)) { + } else if (key.equalsIgnoreCase(SHARED_ACCESS_SIGNATURE_CONFIG_NAME)) { this.sharedAccessSignature = values[valueIndex]; - } else if (key.equalsIgnoreCase(EntityPathConfigName)) { + } else if (key.equalsIgnoreCase(ENTITY_PATH_CONFIG_NAME)) { this.eventHubName = values[valueIndex]; - } else if (key.equalsIgnoreCase(OperationTimeoutConfigName)) { + } else if (key.equalsIgnoreCase(OPERATION_TIMEOUT_CONFIG_NAME)) { try { this.operationTimeout = Duration.parse(values[valueIndex]); } catch (DateTimeParseException exception) { throw new IllegalConnectionStringFormatException("Invalid value specified for property 'Duration' in the ConnectionString.", exception); } - } else if (key.equalsIgnoreCase(TransportTypeConfigName)) { + } else if (key.equalsIgnoreCase(TRANSPORT_TYPE_CONFIG_NAME)) { try { this.transportType = TransportType.fromString(values[valueIndex]); } catch (IllegalArgumentException exception) { throw new IllegalConnectionStringFormatException( - String.format("Invalid value specified for property '%s' in the ConnectionString.", TransportTypeConfigName), + String.format("Invalid value specified for property '%s' in the ConnectionString.", TRANSPORT_TYPE_CONFIG_NAME), exception); } } else { diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventDataBatch.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventDataBatch.java index b9568894b872a..f662cc68a8d7d 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventDataBatch.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventDataBatch.java @@ -23,5 +23,5 @@ public interface EventDataBatch { * @return A boolean value indicating if the {@link EventData} addition to this batch/collection was successful or not. * @throws PayloadSizeExceededException when a single {@link EventData} instance exceeds maximum allowed size of the batch */ - boolean tryAdd(final EventData eventData) throws PayloadSizeExceededException; + boolean tryAdd(EventData eventData) throws PayloadSizeExceededException; } diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventHubClient.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventHubClient.java index 0e943f0bcbb9a..ea49239ed07e7 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventHubClient.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventHubClient.java @@ -146,7 +146,7 @@ default void sendSync(final EventData data) throws EventHubException { * @see #send(EventData, String) * @see PartitionSender#send(EventData) */ - CompletableFuture send(final EventData data); + CompletableFuture send(EventData data); /** * Synchronous version of {@link #send(Iterable)}. @@ -199,7 +199,7 @@ default void sendSync(final Iterable eventDatas) throws EventHubExcep * @see #send(EventData, String) * @see PartitionSender#send(EventData) */ - CompletableFuture send(final Iterable eventDatas); + CompletableFuture send(Iterable eventDatas); /** * Synchronous version of {@link #send(EventDataBatch)}. @@ -221,7 +221,7 @@ default void sendSync(final EventDataBatch eventDatas) throws EventHubException * @see #send(Iterable) * @see EventDataBatch */ - CompletableFuture send(final EventDataBatch eventDatas); + CompletableFuture send(EventDataBatch eventDatas); /** * Synchronous version of {@link #send(EventData, String)}. @@ -261,7 +261,7 @@ default void sendSync(final EventData eventData, final String partitionKey) thro * @see #send(EventData) * @see PartitionSender#send(EventData) */ - CompletableFuture send(final EventData eventData, final String partitionKey); + CompletableFuture send(EventData eventData, String partitionKey); /** * Synchronous version of {@link #send(Iterable, String)}. @@ -292,7 +292,7 @@ default void sendSync(final Iterable eventDatas, final String partiti * @see #send(EventData) * @see PartitionSender#send(EventData) */ - CompletableFuture send(final Iterable eventDatas, final String partitionKey); + CompletableFuture send(Iterable eventDatas, String partitionKey); /** * Synchronous version of {@link #createPartitionSender(String)}. @@ -320,7 +320,7 @@ default PartitionSender createPartitionSenderSync(final String partitionId) thro * @throws EventHubException if Service Bus service encountered problems during connection creation. * @see PartitionSender */ - CompletableFuture createPartitionSender(final String partitionId) throws EventHubException; + CompletableFuture createPartitionSender(String partitionId) throws EventHubException; /** * Synchronous version of {@link #createReceiver(String, String, EventPosition)}. @@ -346,7 +346,7 @@ default PartitionReceiver createReceiverSync(final String consumerGroupName, fin * @throws EventHubException if Service Bus service encountered problems during the operation. * @see PartitionReceiver */ - CompletableFuture createReceiver(final String consumerGroupName, final String partitionId, final EventPosition eventPosition) throws EventHubException; + CompletableFuture createReceiver(String consumerGroupName, String partitionId, EventPosition eventPosition) throws EventHubException; /** * Synchronous version of {@link #createReceiver(String, String, EventPosition)}. @@ -374,7 +374,7 @@ default PartitionReceiver createReceiverSync(final String consumerGroupName, fin * @throws EventHubException if Service Bus service encountered problems during the operation. * @see PartitionReceiver */ - CompletableFuture createReceiver(final String consumerGroupName, final String partitionId, final EventPosition eventPosition, final ReceiverOptions receiverOptions) throws EventHubException; + CompletableFuture createReceiver(String consumerGroupName, String partitionId, EventPosition eventPosition, ReceiverOptions receiverOptions) throws EventHubException; /** * Synchronous version of {@link #createEpochReceiver(String, String, EventPosition, long)}. @@ -410,7 +410,7 @@ default PartitionReceiver createEpochReceiverSync(final String consumerGroupName * @see PartitionReceiver * @see ReceiverDisconnectedException */ - CompletableFuture createEpochReceiver(final String consumerGroupName, final String partitionId, final EventPosition eventPosition, final long epoch) throws EventHubException; + CompletableFuture createEpochReceiver(String consumerGroupName, String partitionId, EventPosition eventPosition, long epoch) throws EventHubException; /** * Synchronous version of {@link #createEpochReceiver(String, String, EventPosition, long)}. @@ -448,7 +448,7 @@ default PartitionReceiver createEpochReceiverSync(final String consumerGroupName * @see PartitionReceiver * @see ReceiverDisconnectedException */ - CompletableFuture createEpochReceiver(final String consumerGroupName, final String partitionId, final EventPosition eventPosition, final long epoch, final ReceiverOptions receiverOptions) throws EventHubException; + CompletableFuture createEpochReceiver(String consumerGroupName, String partitionId, EventPosition eventPosition, long epoch, ReceiverOptions receiverOptions) throws EventHubException; /** * Retrieves general information about an event hub (see {@link EventHubRuntimeInformation} for details). diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/PartitionReceiveHandler.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/PartitionReceiveHandler.java index e51c203fa868b..ab09ed59cbfeb 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/PartitionReceiveHandler.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/PartitionReceiveHandler.java @@ -25,12 +25,12 @@ public interface PartitionReceiveHandler { * @param events the list of fetched events from the corresponding PartitionReceiver. * @see PartitionReceiver#receive */ - void onReceive(final Iterable events); + void onReceive(Iterable events); /** * Implement this method to Listen to errors which lead to Closure of the {@link PartitionReceiveHandler} pump. * * @param error fatal error encountered while running the {@link PartitionReceiveHandler} pump */ - void onError(final Throwable error); + void onError(Throwable error); } diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/PartitionReceiver.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/PartitionReceiver.java index e724e49f17d9e..b48a67f11fc45 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/PartitionReceiver.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/PartitionReceiver.java @@ -114,7 +114,7 @@ default Iterable receiveSync(final int maxEventCount) throws EventHub * @param maxEventCount maximum number of {@link EventData}'s that this call should return * @return A completableFuture that will yield a batch of {@link EventData}'s from the partition on which this receiver is created. Returns 'null' if no {@link EventData} is present. */ - CompletableFuture> receive(final int maxEventCount); + CompletableFuture> receive(int maxEventCount); /** * Register a receive handler that will be called when an event is available. A @@ -124,7 +124,7 @@ default Iterable receiveSync(final int maxEventCount) throws EventHub * @param receiveHandler An implementation of {@link PartitionReceiveHandler}. Setting this handler to null will stop the receive pump. * @return A completableFuture which sets receiveHandler */ - CompletableFuture setReceiveHandler(final PartitionReceiveHandler receiveHandler); + CompletableFuture setReceiveHandler(PartitionReceiveHandler receiveHandler); /** * Register a receive handler that will be called when an event is available. A @@ -135,7 +135,7 @@ default Iterable receiveSync(final int maxEventCount) throws EventHub * @param invokeWhenNoEvents flag to indicate whether the {@link PartitionReceiveHandler#onReceive(Iterable)} should be invoked when the receive call times out * @return A completableFuture which sets receiveHandler */ - CompletableFuture setReceiveHandler(final PartitionReceiveHandler receiveHandler, final boolean invokeWhenNoEvents); + CompletableFuture setReceiveHandler(PartitionReceiveHandler receiveHandler, boolean invokeWhenNoEvents); CompletableFuture close(); diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ActiveClientTokenManager.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ActiveClientTokenManager.java index f74b858c430f3..a81285f71aeb4 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ActiveClientTokenManager.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ActiveClientTokenManager.java @@ -21,7 +21,7 @@ final class ActiveClientTokenManager { private final Timer timerScheduler; private CompletableFuture timer; - public ActiveClientTokenManager( + ActiveClientTokenManager( final ClientEntity clientEntity, final Runnable sendTokenAsync, final Duration tokenRefreshInterval, diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/AmqpResponseCode.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/AmqpResponseCode.java index 5af855becdf52..6b7a070630ac0 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/AmqpResponseCode.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/AmqpResponseCode.java @@ -25,7 +25,7 @@ public enum AmqpResponseCode { private final int value; - private AmqpResponseCode(final int value) { + AmqpResponseCode(final int value) { this.value = value; } diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/AmqpSender.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/AmqpSender.java index 0e706b5344130..aed1ad11e7b34 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/AmqpSender.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/AmqpSender.java @@ -6,7 +6,7 @@ import org.apache.qpid.proton.engine.Delivery; public interface AmqpSender extends AmqpLink { - void onFlow(final int creditIssued); + void onFlow(int creditIssued); - void onSendComplete(final Delivery delivery); + void onSendComplete(Delivery delivery); } diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/CBSChannel.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/CBSChannel.java index 4918e17ca39b6..6a85fcd82331a 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/CBSChannel.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/CBSChannel.java @@ -17,7 +17,7 @@ final class CBSChannel { final SessionProvider sessionProvider; final AmqpConnection connectionEventDispatcher; - public CBSChannel( + CBSChannel( final SessionProvider sessionProvider, final AmqpConnection connection) { diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventDataBatchImpl.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventDataBatchImpl.java index f68b7f6b9064e..f4266722b3bac 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventDataBatchImpl.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventDataBatchImpl.java @@ -29,11 +29,11 @@ final class EventDataBatchImpl implements EventDataBatch { this.eventBytes = new byte[maxMessageSize]; } - public final int getSize() { + public int getSize() { return events.size(); } - public final boolean tryAdd(final EventData eventData) throws PayloadSizeExceededException { + public boolean tryAdd(final EventData eventData) throws PayloadSizeExceededException { if (eventData == null) { throw new IllegalArgumentException("eventData cannot be null"); diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventHubClientImpl.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventHubClientImpl.java index d8bcf45a4065c..0af606a9eb3cb 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventHubClientImpl.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventHubClientImpl.java @@ -77,7 +77,7 @@ public String getEventHubName() { return eventHubName; } - public final EventDataBatch createBatch(BatchOptions options) throws EventHubException { + public EventDataBatch createBatch(BatchOptions options) throws EventHubException { return ExceptionUtil.sync(() -> { int maxSize = this.createInternalSender().thenApplyAsync( @@ -98,7 +98,7 @@ public final EventDataBatch createBatch(BatchOptions options) throws EventHubExc } @Override - public final CompletableFuture send(final EventData data) { + public CompletableFuture send(final EventData data) { if (data == null) { throw new IllegalArgumentException("EventData cannot be empty."); } @@ -112,7 +112,7 @@ public CompletableFuture apply(Void voidArg) { } @Override - public final CompletableFuture send(final Iterable eventDatas) { + public CompletableFuture send(final Iterable eventDatas) { if (eventDatas == null || IteratorUtil.sizeEquals(eventDatas, 0)) { throw new IllegalArgumentException("Empty batch of EventData cannot be sent."); } @@ -126,7 +126,7 @@ public CompletableFuture apply(Void voidArg) { } @Override - public final CompletableFuture send(final EventDataBatch eventDatas) { + public CompletableFuture send(final EventDataBatch eventDatas) { if (eventDatas == null || Integer.compare(eventDatas.getSize(), 0) == 0) { throw new IllegalArgumentException("Empty batch of EventData cannot be sent."); } @@ -138,7 +138,7 @@ public final CompletableFuture send(final EventDataBatch eventDatas) { } @Override - public final CompletableFuture send(final EventData eventData, final String partitionKey) { + public CompletableFuture send(final EventData eventData, final String partitionKey) { if (eventData == null) { throw new IllegalArgumentException("EventData cannot be null."); } @@ -156,7 +156,7 @@ public CompletableFuture apply(Void voidArg) { } @Override - public final CompletableFuture send(final Iterable eventDatas, final String partitionKey) { + public CompletableFuture send(final Iterable eventDatas, final String partitionKey) { if (eventDatas == null || IteratorUtil.sizeEquals(eventDatas, 0)) { throw new IllegalArgumentException("Empty batch of EventData cannot be sent."); } @@ -179,31 +179,31 @@ public CompletableFuture apply(Void voidArg) { } @Override - public final CompletableFuture createPartitionSender(final String partitionId) + public CompletableFuture createPartitionSender(final String partitionId) throws EventHubException { - return PartitionSenderImpl.Create(this.underlyingFactory, this.eventHubName, partitionId, this.executor); + return PartitionSenderImpl.create(this.underlyingFactory, this.eventHubName, partitionId, this.executor); } @Override - public final CompletableFuture createReceiver(final String consumerGroupName, final String partitionId, final EventPosition eventPosition) + public CompletableFuture createReceiver(final String consumerGroupName, final String partitionId, final EventPosition eventPosition) throws EventHubException { return this.createReceiver(consumerGroupName, partitionId, eventPosition, null); } @Override - public final CompletableFuture createReceiver(final String consumerGroupName, final String partitionId, final EventPosition eventPosition, final ReceiverOptions receiverOptions) + public CompletableFuture createReceiver(final String consumerGroupName, final String partitionId, final EventPosition eventPosition, final ReceiverOptions receiverOptions) throws EventHubException { return PartitionReceiverImpl.create(this.underlyingFactory, this.eventHubName, consumerGroupName, partitionId, eventPosition, PartitionReceiverImpl.NULL_EPOCH, false, receiverOptions, this.executor); } @Override - public final CompletableFuture createEpochReceiver(final String consumerGroupName, final String partitionId, final EventPosition eventPosition, final long epoch) + public CompletableFuture createEpochReceiver(final String consumerGroupName, final String partitionId, final EventPosition eventPosition, final long epoch) throws EventHubException { return this.createEpochReceiver(consumerGroupName, partitionId, eventPosition, epoch, null); } @Override - public final CompletableFuture createEpochReceiver(final String consumerGroupName, final String partitionId, final EventPosition eventPosition, final long epoch, final ReceiverOptions receiverOptions) + public CompletableFuture createEpochReceiver(final String consumerGroupName, final String partitionId, final EventPosition eventPosition, final long epoch, final ReceiverOptions receiverOptions) throws EventHubException { return PartitionReceiverImpl.create(this.underlyingFactory, this.eventHubName, consumerGroupName, partitionId, eventPosition, epoch, true, receiverOptions, this.executor); } diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/IOObject.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/IOObject.java index d398d03a8f659..9126fab989533 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/IOObject.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/IOObject.java @@ -6,9 +6,9 @@ public interface IOObject { // should be run on reactor thread - public IOObjectState getState(); + IOObjectState getState(); - public static enum IOObjectState { + enum IOObjectState { OPENING, OPENED, CLOSED, diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ManagementChannel.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ManagementChannel.java index 801ee5cb69292..86d4454912356 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ManagementChannel.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ManagementChannel.java @@ -21,7 +21,7 @@ final class ManagementChannel { final SessionProvider sessionProvider; final AmqpConnection connectionEventDispatcher; - public ManagementChannel(final SessionProvider sessionProvider, final AmqpConnection connection) { + ManagementChannel(final SessionProvider sessionProvider, final AmqpConnection connection) { this.sessionProvider = sessionProvider; this.connectionEventDispatcher = connection; diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessageReceiver.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessageReceiver.java index 41f84f3bc8ea3..f05d749fa1616 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessageReceiver.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessageReceiver.java @@ -790,7 +790,7 @@ protected Exception getLastKnownError() { private static class ReceiveWorkItem extends WorkItem> { private final int maxMessageCount; - public ReceiveWorkItem(CompletableFuture> completableFuture, Duration timeout, final int maxMessageCount) { + ReceiveWorkItem(CompletableFuture> completableFuture, Duration timeout, final int maxMessageCount) { super(completableFuture, timeout); this.maxMessageCount = maxMessageCount; } @@ -801,12 +801,13 @@ private final class ReceiveWork extends DispatchHandler { @Override public void onEvent() { - ReceiveWorkItem pendingReceive; - while (!prefetchedMessages.isEmpty() && (pendingReceive = pendingReceives.poll()) != null) { + ReceiveWorkItem pendingReceive = pendingReceives.poll(); + while (!prefetchedMessages.isEmpty() && pendingReceive != null) { if (pendingReceive.getWork() != null && !pendingReceive.getWork().isDone()) { Collection receivedMessages = receiveCore(pendingReceive.maxMessageCount); pendingReceive.getWork().complete(receivedMessages); } + pendingReceive = pendingReceives.poll(); } } } diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessageSender.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessageSender.java index b34473d6eae49..440969a26ab00 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessageSender.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessageSender.java @@ -54,6 +54,8 @@ import java.util.function.BiConsumer; import java.util.function.Consumer; +import static java.nio.charset.StandardCharsets.UTF_8; + /** * Abstracts all amqp related details * translates event-driven reactor model into async send Api @@ -520,7 +522,7 @@ public void onEvent() { @Override public void onSendComplete(final Delivery delivery) { final DeliveryState outcome = delivery.getRemoteState(); - final String deliveryTag = new String(delivery.getTag()); + final String deliveryTag = new String(delivery.getTag(), UTF_8); if (TRACE_LOGGER.isTraceEnabled()) TRACE_LOGGER.trace( @@ -836,7 +838,7 @@ private void processSendWork() { Exception sendException = null; try { - delivery = this.sendLink.delivery(deliveryTag.getBytes()); + delivery = this.sendLink.delivery(deliveryTag.getBytes(UTF_8)); delivery.setMessageFormat(sendData.getMessageFormat()); sentMsgSize = this.sendLink.send(sendData.getMessage(), 0, sendData.getEncodedMessageSize()); @@ -1010,7 +1012,7 @@ private class SendTimeout implements Runnable { private final String deliveryTag; private final ReplayableWorkItem sendWaiterData; - public SendTimeout( + SendTimeout( final String deliveryTag, final ReplayableWorkItem sendWaiterData) { this.sendWaiterData = sendWaiterData; diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessagingFactory.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessagingFactory.java index a737b4bdbd61f..1561d1d0ee608 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessagingFactory.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessagingFactory.java @@ -487,7 +487,7 @@ private class RunReactor implements Runnable { volatile boolean hasStarted; - public RunReactor(final Reactor reactor, final ScheduledExecutorService executor) { + RunReactor(final Reactor reactor, final ScheduledExecutorService executor) { this.rctr = reactor; this.executor = executor; this.hasStarted = false; diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/PartitionReceiverImpl.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/PartitionReceiverImpl.java index 51e0452de82be..1346acb2a93e0 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/PartitionReceiverImpl.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/PartitionReceiverImpl.java @@ -113,15 +113,15 @@ public void accept(MessageReceiver r) { }, this.executor); } - final EventPosition getStartingPosition() { + EventPosition getStartingPosition() { return this.eventPosition; } - public final String getPartitionId() { + public String getPartitionId() { return this.partitionId; } - public final Duration getReceiveTimeout() { + public Duration getReceiveTimeout() { return this.internalReceiver.getReceiveTimeout(); } @@ -129,15 +129,15 @@ public void setReceiveTimeout(Duration value) { this.internalReceiver.setReceiveTimeout(value); } - public final long getEpoch() { + public long getEpoch() { return this.epoch; } - public final ReceiverRuntimeInformation getRuntimeInformation() { + public ReceiverRuntimeInformation getRuntimeInformation() { return this.runtimeInformation; } - public final EventPosition getEventPosition() { + public EventPosition getEventPosition() { return this.currentEventPosition; } diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/PartitionSenderImpl.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/PartitionSenderImpl.java index 39e3a37c37ea6..bbde282f1f4e5 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/PartitionSenderImpl.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/PartitionSenderImpl.java @@ -29,7 +29,7 @@ private PartitionSenderImpl(final MessagingFactory factory, final String eventHu this.factory = factory; } - static CompletableFuture Create(final MessagingFactory factory, + static CompletableFuture create(final MessagingFactory factory, final String eventHubName, final String partitionId, final ScheduledExecutorService executor) throws EventHubException { @@ -76,11 +76,11 @@ public EventDataBatch createBatch(BatchOptions options) { return new EventDataBatchImpl(options.maxMessageSize, null); } - public final CompletableFuture send(EventData data) { + public CompletableFuture send(EventData data) { return this.internalSender.send(((EventDataImpl) data).toAmqpMessage()); } - public final CompletableFuture send(Iterable eventDatas) { + public CompletableFuture send(Iterable eventDatas) { if (eventDatas == null || IteratorUtil.sizeEquals(eventDatas, 0)) { throw new IllegalArgumentException("EventData batch cannot be empty."); } @@ -88,7 +88,7 @@ public final CompletableFuture send(Iterable eventDatas) { return this.internalSender.send(EventDataUtil.toAmqpMessages(eventDatas)); } - public final CompletableFuture send(EventDataBatch eventDatas) { + public CompletableFuture send(EventDataBatch eventDatas) { if (eventDatas == null || Integer.compare(eventDatas.getSize(), 0) == 0) { throw new IllegalArgumentException("EventDataBatch cannot be empty."); } diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ReactorDispatcher.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ReactorDispatcher.java index 9da73591914bc..7e0b55b54afd1 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ReactorDispatcher.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ReactorDispatcher.java @@ -86,7 +86,9 @@ private void throwIfSchedulerError() { private void signalWorkQueue() throws IOException { try { - while (this.ioSignal.sink().write(ByteBuffer.allocate(1)) == 0) { + ByteBuffer oneByteBuffer = ByteBuffer.allocate(1); + while (this.ioSignal.sink().write(oneByteBuffer) == 0) { + oneByteBuffer = ByteBuffer.allocate(1); } } catch (ClosedChannelException ignorePipeClosedDuringReactorShutdown) { TRACE_LOGGER.info("signalWorkQueue failed with an error", ignorePipeClosedDuringReactorShutdown); @@ -98,7 +100,7 @@ private final class DelayHandler extends BaseHandler { final BaseHandler timerCallback; final Reactor reactor; - public DelayHandler(final Reactor reactor, final int delay, final DispatchHandler timerCallback) { + DelayHandler(final Reactor reactor, final int delay, final DispatchHandler timerCallback) { this.delay = delay; this.timerCallback = timerCallback; this.reactor = reactor; @@ -114,8 +116,10 @@ private final class ScheduleHandler implements Callback { @Override public void run(Selectable selectable) { try { - while (ioSignal.source().read(ByteBuffer.allocate(1024)) > 0) { + ByteBuffer oneKbByteBuffer = ByteBuffer.allocate(1024); + while (ioSignal.source().read(oneKbByteBuffer) > 0) { // read until the end of the stream + oneKbByteBuffer = ByteBuffer.allocate(1024); } } catch (ClosedChannelException ignorePipeClosedDuringReactorShutdown) { TRACE_LOGGER.info("ScheduleHandler.run() failed with an error", ignorePipeClosedDuringReactorShutdown); diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ReceivePump.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ReceivePump.java index c3f60cb519a7e..d1173d1014a83 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ReceivePump.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ReceivePump.java @@ -148,7 +148,7 @@ private void schedulePump() { public interface IPartitionReceiver { String getPartitionId(); - CompletableFuture> receive(final int maxBatchSize); + CompletableFuture> receive(int maxBatchSize); } private final class ProcessAndReschedule implements BiFunction, Throwable, Void> { diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ReceiverSettingsProvider.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ReceiverSettingsProvider.java index 14867deb65497..d03cc74a16b74 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ReceiverSettingsProvider.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ReceiverSettingsProvider.java @@ -10,7 +10,7 @@ import java.util.Map; public interface ReceiverSettingsProvider { - Map getFilter(final Message lastReceivedMessage); + Map getFilter(Message lastReceivedMessage); Map getProperties(); diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseChannel.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseChannel.java index 5300d7775ad34..879968e42015a 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseChannel.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseChannel.java @@ -24,6 +24,8 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import static java.nio.charset.StandardCharsets.UTF_8; + public class RequestResponseChannel implements IOObject { private final Sender sendLink; @@ -115,7 +117,7 @@ public void request( this.inflightRequests.put(message.getMessageId(), onResponse); - sendLink.delivery(UUID.randomUUID().toString().replace("-", StringUtil.EMPTY).getBytes()); + sendLink.delivery(UUID.randomUUID().toString().replace("-", StringUtil.EMPTY).getBytes(UTF_8)); final int payloadSize = AmqpUtil.getDataSerializedSize(message) + 512; // need buffer for headers final byte[] bytes = new byte[payloadSize]; diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/SendLinkHandler.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/SendLinkHandler.java index 620363dab961b..981583de50362 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/SendLinkHandler.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/SendLinkHandler.java @@ -12,6 +12,8 @@ import java.util.Locale; +import static java.nio.charset.StandardCharsets.UTF_8; + public class SendLinkHandler extends BaseLinkHandler { private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(SendLinkHandler.class); private final AmqpSender msgSender; @@ -71,7 +73,7 @@ public void onDelivery(Event event) { TRACE_LOGGER.trace( "onDelivery linkName[" + sender.getName() + "], unsettled[" + sender.getUnsettled() + "], credit[" + sender.getRemoteCredit() + "], deliveryState[" + delivery.getRemoteState() - + "], delivery.isBuffered[" + delivery.isBuffered() + "], delivery.id[" + new String(delivery.getTag()) + "]"); + + "], delivery.isBuffered[" + delivery.isBuffered() + "], delivery.id[" + new String(delivery.getTag(), UTF_8) + "]"); } msgSender.onSendComplete(delivery); diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/SessionProvider.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/SessionProvider.java index afc87d4ba1391..e97110ba0e1ff 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/SessionProvider.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/SessionProvider.java @@ -11,7 +11,7 @@ public interface SessionProvider { Session getSession( - final String path, - final Consumer onSessionOpen, - final BiConsumer onSessionOpenError); + String path, + Consumer onSessionOpen, + BiConsumer onSessionOpenError); } diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/SharedAccessSignatureTokenProvider.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/SharedAccessSignatureTokenProvider.java index fbbe0a0bbd79a..e5b412bc8a3a8 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/SharedAccessSignatureTokenProvider.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/SharedAccessSignatureTokenProvider.java @@ -7,7 +7,6 @@ import javax.crypto.spec.SecretKeySpec; import java.io.IOException; import java.net.URLEncoder; -import java.nio.charset.StandardCharsets; import java.security.InvalidKeyException; import java.security.NoSuchAlgorithmException; import java.time.Duration; @@ -15,6 +14,8 @@ import java.util.Base64; import java.util.Locale; +import static java.nio.charset.StandardCharsets.UTF_8; + public class SharedAccessSignatureTokenProvider { final String keyName; final String sharedAccessKey; @@ -56,7 +57,7 @@ public static String generateSharedAccessSignature( throw new IllegalArgumentException("tokenTimeToLive has to positive and in the order-of seconds"); } - final String utf8Encoding = StandardCharsets.UTF_8.name(); + final String utf8Encoding = UTF_8.name(); String expiresOn = Long.toString(Instant.now().getEpochSecond() + tokenTimeToLive.getSeconds()); String audienceUri = URLEncoder.encode(resource, utf8Encoding); String secretToSign = audienceUri + "\n" + expiresOn; diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/Timer.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/Timer.java index 00f011c7e675e..58fd7d7be7730 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/Timer.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/Timer.java @@ -12,7 +12,7 @@ final class Timer { final SchedulerProvider schedulerProvider; - public Timer(final SchedulerProvider schedulerProvider) { + Timer(final SchedulerProvider schedulerProvider) { this.schedulerProvider = schedulerProvider; } @@ -36,7 +36,7 @@ static final class ScheduledTask extends DispatchHandler { final CompletableFuture scheduledFuture; final Runnable runnable; - public ScheduledTask(final Runnable runnable) { + ScheduledTask(final Runnable runnable) { this.runnable = runnable; this.scheduledFuture = new CompletableFuture<>(); } diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/WebSocketProxyConnectionHandler.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/WebSocketProxyConnectionHandler.java index 9974ac05da869..7b1a172895ff2 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/WebSocketProxyConnectionHandler.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/WebSocketProxyConnectionHandler.java @@ -28,6 +28,8 @@ import java.util.List; import java.util.Map; +import static java.nio.charset.StandardCharsets.UTF_8; + public class WebSocketProxyConnectionHandler extends WebSocketConnectionHandler { private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(WebSocketProxyConnectionHandler.class); private final String proxySelectorModifiedError = "ProxySelector has been modified."; @@ -146,7 +148,7 @@ private Map getAuthorizationHeader() { final String usernamePasswordPair = proxyUserName + ":" + proxyPassword; proxyAuthorizationHeader.put( "Proxy-Authorization", - "Basic " + Base64.getEncoder().encodeToString(usernamePasswordPair.getBytes())); + "Basic " + Base64.getEncoder().encodeToString(usernamePasswordPair.getBytes(UTF_8))); return proxyAuthorizationHeader; } diff --git a/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/jproxy/ProxyNegotiationHandler.java b/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/jproxy/ProxyNegotiationHandler.java index 14543dd5a60cc..49f150ecb5c86 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/jproxy/ProxyNegotiationHandler.java +++ b/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/jproxy/ProxyNegotiationHandler.java @@ -11,9 +11,10 @@ import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; -import java.nio.charset.StandardCharsets; import java.util.Scanner; +import static java.nio.charset.StandardCharsets.UTF_8; + public class ProxyNegotiationHandler { public enum ProxyConnectionState { @@ -172,7 +173,7 @@ private void copyBytesBetweenClientAndService(final ReadWriteState readWriteStat } private String[] extractHostNamePort(final byte[] connectRequest) { - final String request = new String(connectRequest, StandardCharsets.UTF_8); + final String request = new String(connectRequest, UTF_8); final Scanner requestScanner = new Scanner(request); final String firstLine = requestScanner.nextLine();