Skip to content

Commit

Permalink
Fixes several issues in the reactor related components (Azure#411)
Browse files Browse the repository at this point in the history
This pull request contains the following changes.

1) Finish pending tasks when recreating the reactor and make sure pending calls scheduled on the old reactor get complete.
2) Fix the session open timeout issue which can result in NPE in proton-J engine.
3) Make session open timeout configurable and use the value of OperationTimeout.
4) Update the message of exceptions and include an entity name in the exception message.
5) API change - use ScheduledExecutorService.
6) Improve tracing.
  • Loading branch information
sjkwak authored Dec 21, 2018
1 parent 0b7ca03 commit d1b68d1
Show file tree
Hide file tree
Showing 36 changed files with 557 additions and 397 deletions.
4 changes: 2 additions & 2 deletions ConsumingEvents.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@ For a simple event consumer, you'll need to import the *com.microsoft.azure.even
Event Hubs client library uses qpid proton reactor framework which exposes AMQP connection and message delivery related
state transitions as reactive events. In the process,
the library will need to run many asynchronous tasks while sending and receiving messages to Event Hubs.
So, `EventHubClient` requires an instance of `Executor`, where all these tasks are run.
So, `EventHubClient` requires an instance of `ScheduledExecutorService`, where all these tasks are run.


```Java
ExecutorService executor = Executors.newCachedThreadPool();
ScheduledExecutorService executor = Executors.newScheduledThreadPool(8)
```

The receiver code creates an *EventHubClient* from a given connecting string
Expand Down
4 changes: 2 additions & 2 deletions Overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ which is quite simple in a Maven build [as we explain in the guide](PublishingEv
Event Hubs client library uses qpid proton reactor framework which exposes AMQP connection and message delivery related
state transitions as reactive events. In the process,
the library will need to run many asynchronous tasks while sending and receiving messages to Event Hubs.
So, `EventHubClient` requires an instance of `Executor`, where all these tasks are run.
So, `EventHubClient` requires an instance of `ScheduledExecutorService`, where all these tasks are run.


```Java
ExecutorService executor = Executors.newCachedThreadPool();
ScheduledExecutorService executor = Executors.newScheduledThreadPool(8)
```

Using an Event Hub connection string, which holds all required connection information, including an authorization key or token,
Expand Down
2 changes: 1 addition & 1 deletion PublishingEvents.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ So, `EventHubClient` requires an instance of `Executor`, where all these tasks a


```Java
ExecutorService executor = Executors.newCachedThreadPool();
ScheduledExecutorService executor = Executors.newScheduledThreadPool(8)
```

Using an Event Hub connection string, which holds all required connection information including an authorization key or token
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class PartitionManager extends Closable {
private ScheduledFuture<?> scanFuture = null;

PartitionManager(HostContext hostContext) {
super(null);
super(null);
this.hostContext = hostContext;
}

Expand All @@ -41,17 +41,17 @@ CompletableFuture<Void> cachePartitionIds() {
// EventHubException or IOException, in addition to whatever failures may occur when the result of
// the CompletableFuture is evaluated.
try {
final CompletableFuture<Void> cleanupFuture = new CompletableFuture<Void>();
final CompletableFuture<Void> cleanupFuture = new CompletableFuture<Void>();

// Stage 0A: get EventHubClient for the event hub
retval = EventHubClient.create(this.hostContext.getEventHubConnectionString(), this.hostContext.getRetryPolicy(), this.hostContext.getExecutor())
// Stage 0B: set up a way to close the EventHubClient when we're done
.thenApplyAsync((ehClient) ->
{
final EventHubClient saveForCleanupClient = ehClient;
cleanupFuture.thenComposeAsync((empty) -> saveForCleanupClient.close(), this.hostContext.getExecutor());
return ehClient;
}, this.hostContext.getExecutor())
// Stage 0B: set up a way to close the EventHubClient when we're done
.thenApplyAsync((ehClient) ->
{
final EventHubClient saveForCleanupClient = ehClient;
cleanupFuture.thenComposeAsync((empty) -> saveForCleanupClient.close(), this.hostContext.getExecutor());
return ehClient;
}, this.hostContext.getExecutor())
// Stage 1: use the client to get runtime info for the event hub
.thenComposeAsync((ehClient) -> ehClient.getRuntimeInformation(), this.hostContext.getExecutor())
// Stage 2: extract the partition ids from the runtime info or throw on null (timeout)
Expand All @@ -71,7 +71,7 @@ CompletableFuture<Void> cachePartitionIds() {
// Stage 3: RUN REGARDLESS OF EXCEPTIONS -- if there was an error, wrap it in IllegalEntityException and throw
.handleAsync((empty, e) ->
{
cleanupFuture.complete(null); // trigger client cleanup
cleanupFuture.complete(null); // trigger client cleanup
if (e != null) {
Throwable notifyWith = e;
if (e instanceof CompletionException) {
Expand Down Expand Up @@ -104,8 +104,8 @@ void onPartitionCheckCompleteTestHook() {
}

CompletableFuture<Void> stopPartitions() {
setClosing();
setClosing();

// If the lease scanner is between runs, cancel so it doesn't run again.
synchronized (this.scanFutureSynchronizer) {
if (this.scanFuture != null) {
Expand All @@ -119,20 +119,20 @@ CompletableFuture<Void> stopPartitions() {
if (this.pumpManager != null) {
TRACE_LOGGER.info(this.hostContext.withHost("Shutting down all pumps"));
stopping = this.pumpManager.removeAllPumps(CloseReason.Shutdown)
.whenCompleteAsync((empty, e) -> {
if (e != null) {
Throwable notifyWith = LoggingUtils.unwrapException(e, null);
TRACE_LOGGER.warn(this.hostContext.withHost("Failure during shutdown"), notifyWith);
if (notifyWith instanceof Exception) {
this.hostContext.getEventProcessorOptions().notifyOfException(this.hostContext.getHostName(), (Exception) notifyWith,
EventProcessorHostActionStrings.PARTITION_MANAGER_CLEANUP);
.whenCompleteAsync((empty, e) -> {
if (e != null) {
Throwable notifyWith = LoggingUtils.unwrapException(e, null);
TRACE_LOGGER.warn(this.hostContext.withHost("Failure during shutdown"), notifyWith);
if (notifyWith instanceof Exception) {
this.hostContext.getEventProcessorOptions().notifyOfException(this.hostContext.getHostName(), (Exception) notifyWith,
EventProcessorHostActionStrings.PARTITION_MANAGER_CLEANUP);

}
}
}, this.hostContext.getExecutor());
}
}
}, this.hostContext.getExecutor());
}
// else no pumps to shut down

stopping = stopping.whenCompleteAsync((empty, e) -> {
TRACE_LOGGER.info(this.hostContext.withHost("Partition manager exiting"));
setClosed();
Expand Down Expand Up @@ -287,14 +287,14 @@ private CompletableFuture<?> buildRetries(CompletableFuture<?> buildOnto, Callab
// Return Void so it can be called from a lambda.
// throwOnFailure is true
private Void scan(boolean isFirst) {
TRACE_LOGGER.info(this.hostContext.withHost("Starting lease scan"));
TRACE_LOGGER.debug(this.hostContext.withHost("Starting lease scan"));
long start = System.currentTimeMillis();

(new PartitionScanner(this.hostContext, (lease) -> this.pumpManager.addPump(lease), this)).scan(isFirst)
.whenCompleteAsync((didSteal, e) ->
{
TRACE_LOGGER.debug(this.hostContext.withHost("Scanning took " + (System.currentTimeMillis() - start)));

onPartitionCheckCompleteTestHook();

// Schedule the next scan unless we are shutting down.
Expand All @@ -305,11 +305,11 @@ private Void scan(boolean isFirst) {
seconds = this.hostContext.getPartitionManagerOptions().getStartupScanDelayInSeconds();
}
synchronized (this.scanFutureSynchronizer) {
this.scanFuture = this.hostContext.getExecutor().schedule(() -> scan(false), seconds, TimeUnit.SECONDS);
this.scanFuture = this.hostContext.getExecutor().schedule(() -> scan(false), seconds, TimeUnit.SECONDS);
}
TRACE_LOGGER.debug(this.hostContext.withHost("Scheduling lease scanner in " + seconds));
} else {
TRACE_LOGGER.debug(this.hostContext.withHost("Not scheduling lease scanner due to shutdown"));
TRACE_LOGGER.debug(this.hostContext.withHost("Not scheduling lease scanner due to shutdown"));
}
}, this.hostContext.getExecutor());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ protected void scheduleLeaseRenewer() {
if (!getIsClosingOrClosed()) {
int seconds = this.hostContext.getPartitionManagerOptions().getLeaseRenewIntervalInSeconds();
this.leaseRenewerFuture = this.hostContext.getExecutor().schedule(() -> leaseRenewer(), seconds, TimeUnit.SECONDS);
TRACE_LOGGER.info(this.hostContext.withHostAndPartition(this.lease, "scheduling leaseRenewer in " + seconds));
TRACE_LOGGER.debug(this.hostContext.withHostAndPartition(this.lease, "scheduling leaseRenewer in " + seconds));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,35 +7,35 @@

import org.junit.Assume;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

final class TestUtilities {
static final ExecutorService EXECUTOR_SERVICE = Executors.newSingleThreadExecutor();
static final ScheduledExecutorService EXECUTOR_SERVICE = Executors.newScheduledThreadPool(1);

static void skipIfAppveyor() {
String appveyor = System.getenv("APPVEYOR"); // Set to "true" by Appveyor
if (appveyor != null) {
TestBase.logInfo("SKIPPING - APPVEYOR DETECTED");
}
Assume.assumeTrue(appveyor == null);
String appveyor = System.getenv("APPVEYOR"); // Set to "true" by Appveyor
if (appveyor != null) {
TestBase.logInfo("SKIPPING - APPVEYOR DETECTED");
}
Assume.assumeTrue(appveyor == null);
}

static String getStorageConnectionString() {
TestUtilities.skipIfAppveyor();
TestUtilities.skipIfAppveyor();

String retval = System.getenv("EPHTESTSTORAGE");

// if EPHTESTSTORAGE is not set - we cannot run integration tests
if (retval == null) {
TestBase.logInfo("SKIPPING - NO STORAGE CONNECTION STRING");
TestBase.logInfo("SKIPPING - NO STORAGE CONNECTION STRING");
}
Assume.assumeTrue(retval != null);

return ((retval != null) ? retval : "");
}

static Boolean isRunningOnAzure() {
return (System.getenv("EVENT_HUB_CONNECTION_STRING") != null);
return (System.getenv("EVENT_HUB_CONNECTION_STRING") != null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@
import com.microsoft.azure.eventhubs.EventHubException;
import org.apache.logging.log4j.core.appender.AbstractManager;

import java.io.*;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.io.IOException;
import java.util.LinkedList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

public final class EventHubsManager extends AbstractManager {
private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool();
private static final ScheduledExecutorService EXECUTOR_SERVICE = Executors.newScheduledThreadPool(1);
private final String eventHubConnectionString;
private EventHubClient eventHubSender;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
*/
package com.microsoft.azure.eventhubs;

import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;

/**
* Authorization failed exception is thrown when error is encountered during authorizing user's permission to run the intended operations.
* When encountered this exception user should check whether the token/key provided in the connection string (e.g. one passed to
* {@link EventHubClient#create(String, Executor)}) is valid, and has correct execution right for the intended operations (e.g.
* {@link EventHubClient#create(String, ScheduledExecutorService)}) is valid, and has correct execution right for the intended operations (e.g.
* Receive call will need Listen claim associated with the key/token).
*
* @see <a href="http://go.microsoft.com/fwlink/?LinkId=761101">http://go.microsoft.com/fwlink/?LinkId=761101</a>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;

/**
* The data structure encapsulating the Event being sent-to and received-from EventHubs.
Expand Down Expand Up @@ -48,7 +48,7 @@ public interface EventData extends Serializable {
*
* @param data the actual payload of data in bytes to be Sent to EventHubs.
* @return EventData the created {@link EventData} to send to EventHubs.
* @see EventHubClient#create(String, Executor)
* @see EventHubClient#create(String, ScheduledExecutorService)
*/
static EventData create(final byte[] data) {
return new EventDataImpl(data);
Expand All @@ -72,7 +72,7 @@ static EventData create(final byte[] data) {
* @param offset Offset in the byte[] to read from ; inclusive index
* @param length length of the byte[] to be read, starting from offset
* @return EventData the created {@link EventData} to send to EventHubs.
* @see EventHubClient#create(String, Executor)
* @see EventHubClient#create(String, ScheduledExecutorService)
*/
static EventData create(final byte[] data, final int offset, final int length) {
return new EventDataImpl(data, offset, length);
Expand All @@ -94,7 +94,7 @@ static EventData create(final byte[] data, final int offset, final int length) {
*
* @param buffer ByteBuffer which references the payload of the Event to be sent to EventHubs
* @return EventData the created {@link EventData} to send to EventHubs.
* @see EventHubClient#create(String, Executor)
* @see EventHubClient#create(String, ScheduledExecutorService)
*/
static EventData create(final ByteBuffer buffer) {
return new EventDataImpl(buffer);
Expand Down
Loading

0 comments on commit d1b68d1

Please sign in to comment.