Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issue 48: EPH tracing #60

Merged
merged 1 commit into from
Feb 24, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ private Void updateCheckpointSync(Checkpoint checkpoint) throws Exception
{
// Need to fetch the most current lease data so that we can update it correctly.
AzureBlobLease lease = getLeaseSync(checkpoint.getPartitionId());
this.host.logWithHostAndPartition(Level.FINE, checkpoint.getPartitionId(), "Checkpointing at " + checkpoint.getOffset() + " // " + checkpoint.getSequenceNumber());
this.host.logWithHostAndPartition(Level.FINER, checkpoint.getPartitionId(), "Checkpointing at " + checkpoint.getOffset() + " // " + checkpoint.getSequenceNumber());
lease.setOffset(checkpoint.getOffset());
lease.setSequenceNumber(checkpoint.getSequenceNumber());
updateLeaseSync(lease);
Expand All @@ -200,7 +200,7 @@ private Void deleteCheckpointSync(String partitionId) throws Exception
{
// "Delete" a checkpoint by changing the offset to null, so first we need to fetch the most current lease
AzureBlobLease lease = getLeaseSync(partitionId);
this.host.logWithHostAndPartition(Level.FINE, partitionId, "Deleting checkpoint for " + partitionId);
this.host.logWithHostAndPartition(Level.FINER, partitionId, "Deleting checkpoint for " + partitionId);
lease.setOffset(null);
lease.setSequenceNumber(0L);
updateLeaseSync(lease);
Expand Down Expand Up @@ -334,7 +334,7 @@ private AzureBlobLease createLeaseIfNotExistsSync(String partitionId) throws URI
{
CloudBlockBlob leaseBlob = this.consumerGroupDirectory.getBlockBlobReference(partitionId);
returnLease = new AzureBlobLease(partitionId, leaseBlob);
this.host.logWithHostAndPartition(Level.INFO, partitionId,
this.host.logWithHostAndPartition(Level.FINE, partitionId,
"CreateLeaseIfNotExist - leaseContainerName: " + this.storageContainerName + " consumerGroupName: " + this.host.getConsumerGroupName() +
"storageBlobPrefix: " + this.storageBlobPrefix);
uploadLease(returnLease, leaseBlob, AccessCondition.generateIfNoneMatchCondition("*"), UploadActivity.Create);
Expand All @@ -347,7 +347,7 @@ private AzureBlobLease createLeaseIfNotExistsSync(String partitionId) throws URI
(extendedErrorInfo.getErrorCode().compareTo(StorageErrorCodeStrings.LEASE_ID_MISSING) == 0))) // occurs when somebody else already has leased the blob
{
// The blob already exists.
this.host.logWithHostAndPartition(Level.INFO, partitionId, "Lease already exists");
this.host.logWithHostAndPartition(Level.FINE, partitionId, "Lease already exists");
returnLease = getLeaseSync(partitionId);
}
else
Expand All @@ -371,7 +371,7 @@ public Future<Void> deleteLease(Lease lease)

private Void deleteLeaseSync(AzureBlobLease lease) throws StorageException
{
this.host.logWithHostAndPartition(Level.INFO, lease.getPartitionId(), "Deleting lease");
this.host.logWithHostAndPartition(Level.FINE, lease.getPartitionId(), "Deleting lease");
lease.getBlob().deleteIfExists();
return null;
}
Expand All @@ -384,7 +384,7 @@ public Future<Boolean> acquireLease(Lease lease)

private Boolean acquireLeaseSync(AzureBlobLease lease) throws Exception
{
this.host.logWithHostAndPartition(Level.INFO, lease.getPartitionId(), "Acquiring lease");
this.host.logWithHostAndPartition(Level.FINE, lease.getPartitionId(), "Acquiring lease");

CloudBlockBlob leaseBlob = lease.getBlob();
boolean retval = true;
Expand All @@ -399,12 +399,12 @@ private Boolean acquireLeaseSync(AzureBlobLease lease) throws Exception
leaseBlob.downloadAttributes();
if (leaseBlob.getProperties().getLeaseState() == LeaseState.LEASED)
{
this.host.logWithHostAndPartition(Level.FINE, lease.getPartitionId(), "changeLease");
this.host.logWithHostAndPartition(Level.FINER, lease.getPartitionId(), "changeLease");
newToken = leaseBlob.changeLease(newLeaseId, AccessCondition.generateLeaseCondition(lease.getToken()));
}
else
{
this.host.logWithHostAndPartition(Level.FINE, lease.getPartitionId(), "acquireLease");
this.host.logWithHostAndPartition(Level.FINER, lease.getPartitionId(), "acquireLease");
newToken = leaseBlob.acquireLease(AzureStorageCheckpointLeaseManager.leaseDurationInSeconds, newLeaseId);
}
lease.setToken(newToken);
Expand Down Expand Up @@ -435,7 +435,7 @@ public Future<Boolean> renewLease(Lease lease)

private Boolean renewLeaseSync(AzureBlobLease lease) throws Exception
{
this.host.logWithHostAndPartition(Level.INFO, lease.getPartitionId(), "Renewing lease");
this.host.logWithHostAndPartition(Level.FINE, lease.getPartitionId(), "Renewing lease");

CloudBlockBlob leaseBlob = lease.getBlob();
boolean retval = true;
Expand Down Expand Up @@ -467,7 +467,7 @@ public Future<Boolean> releaseLease(Lease lease)

private Boolean releaseLeaseSync(AzureBlobLease lease) throws Exception
{
this.host.logWithHostAndPartition(Level.INFO, lease.getPartitionId(), "Releasing lease");
this.host.logWithHostAndPartition(Level.FINE, lease.getPartitionId(), "Releasing lease");

CloudBlockBlob leaseBlob = lease.getBlob();
boolean retval = true;
Expand Down Expand Up @@ -508,7 +508,7 @@ public Boolean updateLeaseSync(AzureBlobLease lease) throws Exception
return false;
}

this.host.logWithHostAndPartition(Level.INFO, lease.getPartitionId(), "Updating lease");
this.host.logWithHostAndPartition(Level.FINE, lease.getPartitionId(), "Updating lease");

String token = lease.getToken();
if ((token == null) || (token.length() == 0))
Expand Down Expand Up @@ -590,11 +590,11 @@ else if (lease.getOffset() != null)
private boolean wasLeaseLost(StorageException se, String partitionId)
{
boolean retval = false;
this.host.logWithHostAndPartition(Level.FINE, partitionId, "WAS LEASE LOST?");
this.host.logWithHostAndPartition(Level.FINE, partitionId, "Http " + se.getHttpStatusCode());
this.host.logWithHostAndPartition(Level.FINER, partitionId, "WAS LEASE LOST?");
this.host.logWithHostAndPartition(Level.FINER, partitionId, "Http " + se.getHttpStatusCode());
if (se.getExtendedErrorInformation() != null)
{
this.host.logWithHostAndPartition(Level.FINE, partitionId, "Http " + se.getExtendedErrorInformation().getErrorCode() + " :: " + se.getExtendedErrorInformation().getErrorMessage());
this.host.logWithHostAndPartition(Level.FINER, partitionId, "Http " + se.getExtendedErrorInformation().getErrorCode() + " :: " + se.getExtendedErrorInformation().getErrorMessage());
}
if ((se.getHttpStatusCode() == 409) || // conflict
(se.getHttpStatusCode() == 412)) // precondition failed
Expand All @@ -603,8 +603,8 @@ private boolean wasLeaseLost(StorageException se, String partitionId)
if (extendedErrorInfo != null)
{
String errorCode = extendedErrorInfo.getErrorCode();
this.host.logWithHostAndPartition(Level.FINE, partitionId, "Error code: " + errorCode);
this.host.logWithHostAndPartition(Level.FINE, partitionId, "Error message: " + extendedErrorInfo.getErrorMessage());
this.host.logWithHostAndPartition(Level.FINER, partitionId, "Error code: " + errorCode);
this.host.logWithHostAndPartition(Level.FINER, partitionId, "Error message: " + extendedErrorInfo.getErrorMessage());
if ((errorCode.compareTo(StorageErrorCodeStrings.LEASE_LOST) == 0) ||
(errorCode.compareTo(StorageErrorCodeStrings.LEASE_ID_MISMATCH_WITH_LEASE_OPERATION) == 0) ||
(errorCode.compareTo(StorageErrorCodeStrings.LEASE_ID_MISMATCH_WITH_BLOB_OPERATION) == 0) ||
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,15 @@ void specializedStartPump()
private void openClients() throws ServiceBusException, IOException, InterruptedException, ExecutionException
{
// Create new client
this.host.logWithHostAndPartition(Level.FINE, this.partitionContext, "Opening EH client");
this.host.logWithHostAndPartition(Level.FINER, this.partitionContext, "Opening EH client");
this.internalOperationFuture = EventHubClient.createFromConnectionString(this.host.getEventHubConnectionString());
this.eventHubClient = (EventHubClient) this.internalOperationFuture.get();
this.internalOperationFuture = null;

// Create new receiver and set options
Object startAt = this.partitionContext.getInitialOffset();
long epoch = this.lease.getEpoch();
this.host.logWithHostAndPartition(Level.FINE, this.partitionContext, "Opening EH receiver with epoch " + epoch + " at location " + startAt);
this.host.logWithHostAndPartition(Level.FINER, this.partitionContext, "Opening EH receiver with epoch " + epoch + " at location " + startAt);
if (startAt instanceof String)
{
this.internalOperationFuture = this.eventHubClient.createEpochReceiver(this.partitionContext.getConsumerGroupName(), this.partitionContext.getPartitionId(),
Expand Down Expand Up @@ -130,7 +130,7 @@ else if (startAt instanceof Instant)
this.partitionReceiver.setReceiveTimeout(this.host.getEventProcessorOptions().getReceiveTimeOut());
this.internalOperationFuture = null;

this.host.logWithHostAndPartition(Level.FINE, this.partitionContext, "EH client and receiver creation finished");
this.host.logWithHostAndPartition(Level.FINER, this.partitionContext, "EH client and receiver creation finished");
}

private void cleanUpClients() // swallows all exceptions
Expand All @@ -146,14 +146,14 @@ private void cleanUpClients() // swallows all exceptions
this.partitionReceiver.setReceiveHandler(null);
}

this.host.logWithHostAndPartition(Level.FINE, this.partitionContext, "Closing EH receiver");
this.host.logWithHostAndPartition(Level.FINER, this.partitionContext, "Closing EH receiver");
this.partitionReceiver.close();
this.partitionReceiver = null;
}

if (this.eventHubClient != null)
{
this.host.logWithHostAndPartition(Level.FINE, this.partitionContext, "Closing EH client");
this.host.logWithHostAndPartition(Level.FINER, this.partitionContext, "Closing EH client");
this.eventHubClient.close();
this.eventHubClient = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,8 +268,6 @@ public EventProcessorHost(
ILeaseManager leaseManager,
ExecutorService executorService)
{
EventProcessorHost.TRACE_LOGGER.setLevel(Level.SEVERE);

if ((hostName == null) || hostName.isEmpty())
{
throw new IllegalArgumentException("hostName argument must not be null or empty string");
Expand Down Expand Up @@ -376,7 +374,7 @@ public EventProcessorHost(

this.partitionManager = new PartitionManager(this);

logWithHost(Level.INFO, "New EventProcessorHost created");
logWithHost(Level.FINE, "New EventProcessorHost created");
}

/**
Expand Down Expand Up @@ -505,7 +503,7 @@ public Future<?> registerEventProcessorFactory(IEventProcessorFactory<?> factory
}
}

logWithHost(Level.INFO, "Starting event processing");
logWithHost(Level.FINE, "Starting event processing");
this.processorFactory = factory;
this.processorOptions = processorOptions;
return EventProcessorHost.executorService.submit(() -> this.partitionManager.initialize());
Expand All @@ -519,7 +517,7 @@ public Future<?> registerEventProcessorFactory(IEventProcessorFactory<?> factory
*/
public void unregisterEventProcessor() throws InterruptedException, ExecutionException
{
logWithHost(Level.INFO, "Stopping event processing");
logWithHost(Level.FINE, "Stopping event processing");

if (this.partitionManager != null)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,18 +100,18 @@ Object getInitialOffset() throws InterruptedException, ExecutionException
{
// No checkpoint was ever stored. Use the initialOffsetProvider instead.
Function<String, Object> initialOffsetProvider = this.host.getEventProcessorOptions().getInitialOffsetProvider();
this.host.logWithHostAndPartition(Level.FINE, this.partitionId, "Calling user-provided initial offset provider");
this.host.logWithHostAndPartition(Level.FINER, this.partitionId, "Calling user-provided initial offset provider");
startAt = initialOffsetProvider.apply(this.partitionId);
if (startAt instanceof String)
{
this.offset = (String)startAt;
this.sequenceNumber = 0; // TODO we use sequenceNumber to check for regression of offset, 0 could be a problem until it gets updated from an event
this.host.logWithHostAndPartition(Level.FINE, this.partitionId, "Initial offset provided: " + this.offset + "//" + this.sequenceNumber);
this.host.logWithHostAndPartition(Level.FINER, this.partitionId, "Initial offset provided: " + this.offset + "//" + this.sequenceNumber);
}
else if (startAt instanceof Instant)
{
// can't set offset/sequenceNumber
this.host.logWithHostAndPartition(Level.FINE, this.partitionId, "Initial timestamp provided: " + (Instant)startAt);
this.host.logWithHostAndPartition(Level.FINER, this.partitionId, "Initial timestamp provided: " + (Instant)startAt);
}
else
{
Expand All @@ -124,7 +124,7 @@ else if (startAt instanceof Instant)
this.offset = startingCheckpoint.getOffset();
startAt = this.offset;
this.sequenceNumber = startingCheckpoint.getSequenceNumber();
this.host.logWithHostAndPartition(Level.FINE, this.partitionId, "Retrieved starting offset " + this.offset + "//" + this.sequenceNumber);
this.host.logWithHostAndPartition(Level.FINER, this.partitionId, "Retrieved starting offset " + this.offset + "//" + this.sequenceNumber);
}

return startAt;
Expand Down Expand Up @@ -169,7 +169,7 @@ public void checkpoint(EventData event) throws IllegalArgumentException, Interru

private void persistCheckpoint(Checkpoint persistThis) throws IllegalArgumentException, InterruptedException, ExecutionException
{
this.host.logWithHostAndPartition(Level.FINE, persistThis.getPartitionId(), "Saving checkpoint: " +
this.host.logWithHostAndPartition(Level.FINER, persistThis.getPartitionId(), "Saving checkpoint: " +
persistThis.getOffset() + "//" + persistThis.getSequenceNumber());

Checkpoint inStoreCheckpoint = this.host.getCheckpointManager().getCheckpoint(persistThis.getPartitionId()).get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,10 @@ Iterable<String> getPartitionIds() throws IllegalEntityException
throw new EPHConfigurationException(errorMessage, exception);
}

this.host.logWithHost(Level.INFO, "Eventhub " + this.host.getEventHubPath() + " count of partitions: " + this.partitionIds.size());
this.host.logWithHost(Level.FINE, "Eventhub " + this.host.getEventHubPath() + " count of partitions: " + this.partitionIds.size());
for (String id : this.partitionIds)
{
this.host.logWithHost(Level.FINE, "Found partition with id: " + id);
this.host.logWithHost(Level.FINER, "Found partition with id: " + id);
}
}

Expand Down Expand Up @@ -183,7 +183,7 @@ private Void runAndCleanUp()
try
{
runLoop();
this.host.logWithHost(Level.INFO, "Partition manager main loop exited normally, shutting down");
this.host.logWithHost(Level.FINE, "Partition manager main loop exited normally, shutting down");
}
catch (ExceptionWithAction e)
{
Expand Down Expand Up @@ -213,7 +213,7 @@ private Void runAndCleanUp()
}

// Cleanup
this.host.logWithHost(Level.INFO, "Shutting down all pumps");
this.host.logWithHost(Level.FINE, "Shutting down all pumps");
Iterable<Future<?>> pumpRemovals = this.pump.removeAllPumps(CloseReason.Shutdown);

// All of the shutdown threads have been launched, we can shut down the executor now.
Expand Down Expand Up @@ -244,7 +244,7 @@ private Void runAndCleanUp()
}
}

this.host.logWithHost(Level.INFO, "Partition manager exiting");
this.host.logWithHost(Level.FINE, "Partition manager exiting");

return null;
}
Expand Down Expand Up @@ -400,7 +400,7 @@ else if (possibleLease.getOwner().compareTo(this.host.getHostName()) == 0)
{
if (leaseManager.acquireLease(stealee).get())
{
this.host.logWithHostAndPartition(Level.INFO, stealee.getPartitionId(), "Stole lease");
this.host.logWithHostAndPartition(Level.FINE, stealee.getPartitionId(), "Stole lease");
allLeases.put(stealee.getPartitionId(), stealee);
ourLeasesCount++;
}
Expand All @@ -423,7 +423,7 @@ else if (possibleLease.getOwner().compareTo(this.host.getHostName()) == 0)
for (String partitionId : allLeases.keySet())
{
Lease updatedLease = allLeases.get(partitionId);
this.host.logWithHost(Level.FINE, "Lease on partition " + updatedLease.getPartitionId() + " owned by " + updatedLease.getOwner()); // DEBUG
this.host.logWithHost(Level.FINER, "Lease on partition " + updatedLease.getPartitionId() + " owned by " + updatedLease.getOwner()); // DEBUG
if (updatedLease.getOwner().compareTo(this.host.getHostName()) == 0)
{
this.pump.addPump(partitionId, updatedLease);
Expand Down Expand Up @@ -489,7 +489,7 @@ private Iterable<Lease> whichLeasesToSteal(ArrayList<Lease> stealableLeases, int
if (l.getOwner().compareTo(biggestOwner) == 0)
{
stealTheseLeases.add(l);
this.host.logWithHost(Level.FINE, "Proposed to steal lease for partition " + l.getPartitionId() + " from " + biggestOwner);
this.host.logWithHost(Level.FINER, "Proposed to steal lease for partition " + l.getPartitionId() + " from " + biggestOwner);
break;
}
}
Expand Down Expand Up @@ -529,9 +529,9 @@ private HashMap<String, Integer> countLeasesByOwner(Iterable<Lease> leases)
}
for (String owner : counts.keySet())
{
this.host.log(Level.FINE, "host " + owner + " owns " + counts.get(owner) + " leases");
this.host.log(Level.FINER, "host " + owner + " owns " + counts.get(owner) + " leases");
}
this.host.log(Level.FINE, "total hosts in sorted list: " + counts.size());
this.host.log(Level.FINER, "total hosts in sorted list: " + counts.size());

return counts;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ void shutdown(CloseReason reason)
}
this.pumpStatus = PartitionPumpStatus.PP_CLOSING;
}
this.host.logWithHostAndPartition(Level.INFO, this.partitionContext, "pump shutdown for reason " + reason.toString());
this.host.logWithHostAndPartition(Level.FINE, this.partitionContext, "pump shutdown for reason " + reason.toString());

specializedShutdown(reason);

Expand Down Expand Up @@ -157,7 +157,7 @@ protected void onEvents(Iterable<EventData> events)
}
if (last != null)
{
this.host.logWithHostAndPartition(Level.FINE, this.partitionContext, "Updating offset in partition context with end of batch " +
this.host.logWithHostAndPartition(Level.FINER, this.partitionContext, "Updating offset in partition context with end of batch " +
last.getSystemProperties().getOffset() + "//" + last.getSystemProperties().getSequenceNumber());
this.partitionContext.setOffsetAndSequenceNumber(last);
}
Expand Down
Loading