Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EventHubs integration part 1 #3100

Merged
merged 37 commits into from
Mar 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
a0a9256
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-java …
mssfang Mar 7, 2019
448eb87
eventhubs-java: pom and code changes
mssfang Mar 5, 2019
81bf995
eventhubs-java: client.test.live.yml updated
mssfang Mar 5, 2019
cbdcbc4
LicenseJava: add license info for all java files
mssfang Mar 6, 2019
63202b8
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-java …
mssfang Mar 8, 2019
2b77b7e
PR-3067: Eventhub-java: integrate event hubs into azure-sdk-for-java …
mssfang Mar 8, 2019
b3f9025
fix: update jproxy link for unit tests
mssfang Mar 11, 2019
7e62e12
Merge branch 'master' into eventhub-after-init
mssfang Mar 11, 2019
6205425
fix<test-proxy-server>: add external proxy server lib source file for…
mssfang Mar 11, 2019
d428a0b
fix<checkstyle>: fixed all indentation errors
mssfang Mar 13, 2019
7ca54ae
fix<checkstyle>: FileTabCharacter rule fixed
mssfang Mar 13, 2019
77f60ad
fix<checkstyle>: fixed OperatorWrap rule
mssfang Mar 13, 2019
6af1c75
fix<checkStyle>: fixed AvoidStarImport rule
mssfang Mar 13, 2019
2f659da
fix<checkstyle>: WhitespaceAround rule
mssfang Mar 13, 2019
d27ff38
fix<checkstyle>: NoWhitespaceBefore and WhitespaceAfter rules
mssfang Mar 13, 2019
d74dd65
fix<checkstyle>: WhitespaceAround rule
mssfang Mar 13, 2019
370ac79
fix<checkstyle>: RegexpSingleline rule
mssfang Mar 13, 2019
490ba69
fix<checkstyle>: ArrayTypeStyle rule
mssfang Mar 13, 2019
aa398e5
fix<checkstyle>: NewlineAtEndOfFile rule
mssfang Mar 13, 2019
0186bde
fix<checkstyle>: UnusedImports
mssfang Mar 13, 2019
ad9e36e
fix<checkstyle>: ConstantName
mssfang Mar 13, 2019
1bdd20c
fix<checkstyle>: MethodName and StaticVariableName rules
mssfang Mar 13, 2019
8b1c784
fix<checkstyle>: VisibilityModifier
mssfang Mar 13, 2019
7ef1d6f
fix<checkstyle>: EmptyBlock, InnerAssignment
mssfang Mar 13, 2019
175b3c0
fix<checkstyle>: ModifierOrder
mssfang Mar 13, 2019
6b62e9e
fix<checkstyle>: RedundantModifier
mssfang Mar 13, 2019
442091a
fix<checkstyle-warning>: LeftCurly
mssfang Mar 13, 2019
630824a
fix<revert>: revert client.test.live.yml changes, exclude event hub
mssfang Mar 13, 2019
efa2053
fix<checkstyle>: additional 10 Indentation errors
mssfang Mar 14, 2019
e1b698a
fix<checkstyle, pom>: updates changes for connie's code changes request
mssfang Mar 14, 2019
0071253
fix<conflict>: resolved conflict after update
mssfang Mar 14, 2019
7452fbd
test<live-test>: enable live test for eventhubs
mssfang Mar 14, 2019
939e823
Merge branch 'master' into eventhubs-checkstyle-2
mssfang Mar 20, 2019
64d6046
fix(CheckStyle): update to latest checkstyle-suppressions that matche…
mssfang Mar 20, 2019
c872da8
conflict: resolve check style conflict issue
mssfang Mar 20, 2019
be64553
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-java …
mssfang Mar 20, 2019
750e86f
Merge branch 'eventhubs-checkstyle-2' of https://github.com/mssfang/a…
mssfang Mar 20, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,21 @@

<!-- Public API already released with incorrect constant variable naming -->
<suppress checks="ConstantName" files="AlgorithmResolver.java"/>
<suppress checks="ConstantName" files="AmqpErrorCode.java"/>
<suppress checks="ConstantName" files="BatchErrorCodeStrings.java"/>
<suppress checks="ConstantName" files="MessagingFactory.java"/>
<suppress checks="ConstantName" files="PartitionManagerOptions.java"/>
<suppress checks="ConstantName" files="RsaKey.java"/>
<suppress checks="ConstantName" files="SymmetricKey.java"/>
<suppress checks="ConstantName" files="BatchErrorCodeStrings.java"/>
<suppress checks="ConstantName" files="TaskFailureInformationCodes.java"/>

<!-- Public API already released with incorrect static variable naming -->
<suppress checks="StaticVariableName" files="EventHubClientImpl.java"/>

<!-- Public API already released with visibility modifier -->
<suppress checks="VisibilityModifier" files="BatchOptions.java"/>
<suppress checks="VisibilityModifier" files="EventHubClientImpl.java"/>

<!-- Public API already released without final modifier -->
<suppress checks="FinalClass" files="BatchClient.java"/>
</suppressions>

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
public class BaseLease implements Comparable<BaseLease> {
private final String partitionId;
private String owner = "";
private transient boolean isOwned = false; // do not serialize
private transient boolean isOwned = false; // do not serialize

/**
* Do not use; added only for GSon deserializer
Expand All @@ -45,9 +45,9 @@ public BaseLease(String partitionId) {
* @param isOwned True if the lease is owned, false if not.
*/
public BaseLease(String partitionId, String owner, boolean isOwned) {
this.partitionId = partitionId;
this.owner = owner;
this.isOwned = isOwned;
this.partitionId = partitionId;
this.owner = owner;
this.isOwned = isOwned;
}

/**
Expand Down Expand Up @@ -85,17 +85,17 @@ public void setOwner(String owner) {
* @param newState true if the lease is owned, or false if it is not
*/
public void setIsOwned(boolean newState) {
this.isOwned = newState;
this.isOwned = newState;
}

/**
* Get the owned state of the lease.
*
* @return true if the lease is owned, or false if it is not
*/
public boolean getIsOwned() {
return this.isOwned;
}
/**
* Get the owned state of the lease.
*
* @return true if the lease is owned, or false if it is not
*/
public boolean getIsOwned() {
return this.isOwned;
}

/**
* Convenience function for comparing possibleOwner against this.owner
Expand All @@ -120,11 +120,11 @@ public String getPartitionId() {
return this.partitionId;
}

// Compares by partition id
@Override
public int compareTo(BaseLease other) {
return this.partitionId.compareTo(other.getPartitionId());
}
// Compares by partition id
@Override
public int compareTo(BaseLease other) {
return this.partitionId.compareTo(other.getPartitionId());
}

String getStateDebug() {
return "N/A";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ class Closable {

// null parent means top-level
Closable(Closable parent) {
this.syncClose = new Object();
this.parent = parent;
this.isClosing = false;
this.isClosed = false;
this.syncClose = new Object();
this.parent = parent;
this.isClosing = false;
this.isClosed = false;
}

protected final boolean getIsClosed() {
Expand All @@ -33,29 +33,29 @@ protected final boolean getIsClosingOrClosed() {
}

protected final void setClosing() {
synchronized (this.syncClose) {
this.isClosing = true;
}
synchronized (this.syncClose) {
this.isClosing = true;
}
}

protected final void setClosed() {
synchronized (this.syncClose) {
this.isClosing = false;
this.isClosed = true;
}
synchronized (this.syncClose) {
this.isClosing = false;
this.isClosed = true;
}
}

protected final void throwIfClosingOrClosed(String message) {
if (getIsClosingOrClosed()) {
throw new ClosingException(message);
}
if (getIsClosingOrClosed()) {
throw new ClosingException(message);
}
}

class ClosingException extends RuntimeException {
private static final long serialVersionUID = 1138985585921317036L;
private static final long serialVersionUID = 1138985585921317036L;

ClosingException(String message) {
super(message);
}
ClosingException(String message) {
super(message);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public class CompleteLease extends BaseLease {
* Do not use; added only for GSon deserializer
*/
protected CompleteLease() {
super();
super();
}

/**
Expand All @@ -31,7 +31,7 @@ protected CompleteLease() {
* @param partitionId Partition id for this lease.
*/
public CompleteLease(String partitionId) {
super(partitionId);
super(partitionId);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,14 @@
import java.net.URISyntaxException;
import java.security.InvalidKeyException;
import java.util.UUID;
import java.util.concurrent.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/***
Expand Down Expand Up @@ -500,8 +507,7 @@ public CompletableFuture<Void> unregisterEventProcessor() {
// If we own the executor, stop it also.
// Owned executor is also created in constructor.
if (this.weOwnExecutor) {
this.unregistered = this.unregistered.thenRunAsync(() ->
{
this.unregistered = this.unregistered.thenRunAsync(() -> {
// IMPORTANT: run this last stage in the default threadpool!
// If a task running in a threadpool waits for that threadpool to terminate, it's going to wait a long time...

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,27 @@
* They describe what activity was taking place when the exception occurred.
*/
public final class EventProcessorHostActionStrings {
public final static String ACQUIRING_LEASE = "Acquiring Lease";
public final static String CHECKING_CHECKPOINT_STORE = "Checking Checpoint Store Existence";
public final static String CHECKING_LEASES = "Checking Leases";
public final static String CHECKING_LEASE_STORE = "Checking Lease Store Existence";
public final static String CLOSING_EVENT_PROCESSOR = "Closing Event Processor";
public final static String CREATING_CHECKPOINTS = "Creating Checkpoint Holders";
public final static String CREATING_CHECKPOINT_STORE = "Creating Checkpoint Store";
public final static String CREATING_EVENT_HUB_CLIENT = "Creating Event Hub Client";
public final static String CREATING_EVENT_PROCESSOR = "Creating Event Processor";
public final static String CREATING_LEASES = "Creating Leases";
public final static String CREATING_LEASE_STORE = "Creating Lease Store";
public final static String DELETING_LEASE = "Deleting Lease";
public final static String GETTING_CHECKPOINT = "Getting Checkpoint Details";
public final static String GETTING_LEASE = "Getting Lease Details";
public final static String INITIALIZING_STORES = "Initializing Stores";
public final static String OPENING_EVENT_PROCESSOR = "Opening Event Processor";
public final static String PARTITION_MANAGER_CLEANUP = "Partition Manager Cleanup";
public final static String PARTITION_MANAGER_MAIN_LOOP = "Partition Manager Main Loop";
public final static String RELEASING_LEASE = "Releasing Lease";
public final static String RENEWING_LEASE = "Renewing Lease";
public final static String STEALING_LEASE = "Stealing Lease";
public final static String UPDATING_CHECKPOINT = "Updating Checkpoint";
public final static String UPDATING_LEASE = "Updating Lease";
public static final String ACQUIRING_LEASE = "Acquiring Lease";
public static final String CHECKING_CHECKPOINT_STORE = "Checking Checpoint Store Existence";
public static final String CHECKING_LEASES = "Checking Leases";
public static final String CHECKING_LEASE_STORE = "Checking Lease Store Existence";
public static final String CLOSING_EVENT_PROCESSOR = "Closing Event Processor";
public static final String CREATING_CHECKPOINTS = "Creating Checkpoint Holders";
public static final String CREATING_CHECKPOINT_STORE = "Creating Checkpoint Store";
public static final String CREATING_EVENT_HUB_CLIENT = "Creating Event Hub Client";
public static final String CREATING_EVENT_PROCESSOR = "Creating Event Processor";
public static final String CREATING_LEASES = "Creating Leases";
public static final String CREATING_LEASE_STORE = "Creating Lease Store";
public static final String DELETING_LEASE = "Deleting Lease";
public static final String GETTING_CHECKPOINT = "Getting Checkpoint Details";
public static final String GETTING_LEASE = "Getting Lease Details";
public static final String INITIALIZING_STORES = "Initializing Stores";
public static final String OPENING_EVENT_PROCESSOR = "Opening Event Processor";
public static final String PARTITION_MANAGER_CLEANUP = "Partition Manager Cleanup";
public static final String PARTITION_MANAGER_MAIN_LOOP = "Partition Manager Main Loop";
public static final String RELEASING_LEASE = "Releasing Lease";
public static final String RENEWING_LEASE = "Renewing Lease";
public static final String STEALING_LEASE = "Stealing Lease";
public static final String UPDATING_CHECKPOINT = "Updating Checkpoint";
public static final String UPDATING_LEASE = "Updating Lease";
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,21 @@
import java.util.concurrent.ScheduledExecutorService;

final class HostContext {
final private ScheduledExecutorService executor;
private final ScheduledExecutorService executor;

// Ideally we wouldn't need the host, but there are certain things which can be dynamically changed
// by the user via APIs on the host and which need to be exposed on the HostContext. Passing the
// call through is easier and safer than trying to keep two copies in sync.
final private EventProcessorHost host;
final private String hostName;
private final EventProcessorHost host;
private final String hostName;

final private String eventHubPath;
final private String consumerGroupName;
final private String eventHubConnectionString;
final private RetryPolicy retryPolicy;
private final String eventHubPath;
private final String consumerGroupName;
private final String eventHubConnectionString;
private final RetryPolicy retryPolicy;

final private ILeaseManager leaseManager;
final private ICheckpointManager checkpointManager;
private final ILeaseManager leaseManager;
private final ICheckpointManager checkpointManager;

// Cannot be final because it is not available at HostContext construction time.
private EventProcessorOptions eventProcessorOptions = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ public interface ICheckpointManager {
public CompletableFuture<Checkpoint> getCheckpoint(String partitionId);

/***
* Creates the checkpoint HOLDERs for the given partitions. Does nothing for any checkpoint HOLDERs
* that already exist.
* Creates the checkpoint HOLDERs for the given partitions. Does nothing for any checkpoint HOLDERs
* that already exist.
*
* The semantics of this are complicated because it is possible to use the same store for both
* leases and checkpoints (the Azure Storage implementation does so) and it is required to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,22 +86,22 @@ public CompletableFuture<Checkpoint> getCheckpoint(String partitionId) {

@Override
public CompletableFuture<Void> createAllCheckpointsIfNotExists(List<String> partitionIds) {
for (String id : partitionIds) {
Checkpoint checkpointInStore = InMemoryCheckpointStore.singleton.getCheckpoint(id);
if (checkpointInStore != null) {
TRACE_LOGGER.debug(this.hostContext.withHostAndPartition(id,
"createCheckpointIfNotExists() found existing checkpoint, OK"));
} else {
TRACE_LOGGER.debug(this.hostContext.withHostAndPartition(id,
"createCheckpointIfNotExists() creating new checkpoint"));
Checkpoint newStoreCheckpoint = new Checkpoint(id);
// This API actually creates the holder, not the checkpoint itself. In this implementation, we do create a Checkpoint object
// and put it in the store, but the values are set to indicate that it is not initialized.
newStoreCheckpoint.setOffset(null);
newStoreCheckpoint.setSequenceNumber(-1);
InMemoryCheckpointStore.singleton.setOrReplaceCheckpoint(newStoreCheckpoint);
}
}
for (String id : partitionIds) {
Checkpoint checkpointInStore = InMemoryCheckpointStore.singleton.getCheckpoint(id);
if (checkpointInStore != null) {
TRACE_LOGGER.debug(this.hostContext.withHostAndPartition(id,
"createCheckpointIfNotExists() found existing checkpoint, OK"));
} else {
TRACE_LOGGER.debug(this.hostContext.withHostAndPartition(id,
"createCheckpointIfNotExists() creating new checkpoint"));
Checkpoint newStoreCheckpoint = new Checkpoint(id);
// This API actually creates the holder, not the checkpoint itself. In this implementation, we do create a Checkpoint object
// and put it in the store, but the values are set to indicate that it is not initialized.
newStoreCheckpoint.setOffset(null);
newStoreCheckpoint.setSequenceNumber(-1);
InMemoryCheckpointStore.singleton.setOrReplaceCheckpoint(newStoreCheckpoint);
}
}
return CompletableFuture.completedFuture(null);
}

Expand Down Expand Up @@ -129,7 +129,7 @@ public CompletableFuture<Void> deleteCheckpoint(String partitionId) {


private static class InMemoryCheckpointStore {
final static InMemoryCheckpointStore singleton = new InMemoryCheckpointStore();
static final InMemoryCheckpointStore singleton = new InMemoryCheckpointStore();

private ConcurrentHashMap<String, Checkpoint> inMemoryCheckpointsPrivate = null;

Expand Down
Loading