diff --git a/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/AzureStorageCheckpointLeaseManager.java b/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/AzureStorageCheckpointLeaseManager.java index 4b454a315..f3ca4780c 100644 --- a/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/AzureStorageCheckpointLeaseManager.java +++ b/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/AzureStorageCheckpointLeaseManager.java @@ -183,7 +183,7 @@ private Void updateCheckpointSync(Checkpoint checkpoint) throws Exception { // Need to fetch the most current lease data so that we can update it correctly. AzureBlobLease lease = getLeaseSync(checkpoint.getPartitionId()); - this.host.logWithHostAndPartition(Level.FINE, checkpoint.getPartitionId(), "Checkpointing at " + checkpoint.getOffset() + " // " + checkpoint.getSequenceNumber()); + this.host.logWithHostAndPartition(Level.FINER, checkpoint.getPartitionId(), "Checkpointing at " + checkpoint.getOffset() + " // " + checkpoint.getSequenceNumber()); lease.setOffset(checkpoint.getOffset()); lease.setSequenceNumber(checkpoint.getSequenceNumber()); updateLeaseSync(lease); @@ -200,7 +200,7 @@ private Void deleteCheckpointSync(String partitionId) throws Exception { // "Delete" a checkpoint by changing the offset to null, so first we need to fetch the most current lease AzureBlobLease lease = getLeaseSync(partitionId); - this.host.logWithHostAndPartition(Level.FINE, partitionId, "Deleting checkpoint for " + partitionId); + this.host.logWithHostAndPartition(Level.FINER, partitionId, "Deleting checkpoint for " + partitionId); lease.setOffset(null); lease.setSequenceNumber(0L); updateLeaseSync(lease); @@ -334,7 +334,7 @@ private AzureBlobLease createLeaseIfNotExistsSync(String partitionId) throws URI { CloudBlockBlob leaseBlob = this.consumerGroupDirectory.getBlockBlobReference(partitionId); returnLease = new AzureBlobLease(partitionId, leaseBlob); - this.host.logWithHostAndPartition(Level.INFO, partitionId, + this.host.logWithHostAndPartition(Level.FINE, partitionId, "CreateLeaseIfNotExist - leaseContainerName: " + this.storageContainerName + " consumerGroupName: " + this.host.getConsumerGroupName() + "storageBlobPrefix: " + this.storageBlobPrefix); uploadLease(returnLease, leaseBlob, AccessCondition.generateIfNoneMatchCondition("*"), UploadActivity.Create); @@ -347,7 +347,7 @@ private AzureBlobLease createLeaseIfNotExistsSync(String partitionId) throws URI (extendedErrorInfo.getErrorCode().compareTo(StorageErrorCodeStrings.LEASE_ID_MISSING) == 0))) // occurs when somebody else already has leased the blob { // The blob already exists. - this.host.logWithHostAndPartition(Level.INFO, partitionId, "Lease already exists"); + this.host.logWithHostAndPartition(Level.FINE, partitionId, "Lease already exists"); returnLease = getLeaseSync(partitionId); } else @@ -371,7 +371,7 @@ public Future deleteLease(Lease lease) private Void deleteLeaseSync(AzureBlobLease lease) throws StorageException { - this.host.logWithHostAndPartition(Level.INFO, lease.getPartitionId(), "Deleting lease"); + this.host.logWithHostAndPartition(Level.FINE, lease.getPartitionId(), "Deleting lease"); lease.getBlob().deleteIfExists(); return null; } @@ -384,7 +384,7 @@ public Future acquireLease(Lease lease) private Boolean acquireLeaseSync(AzureBlobLease lease) throws Exception { - this.host.logWithHostAndPartition(Level.INFO, lease.getPartitionId(), "Acquiring lease"); + this.host.logWithHostAndPartition(Level.FINE, lease.getPartitionId(), "Acquiring lease"); CloudBlockBlob leaseBlob = lease.getBlob(); boolean retval = true; @@ -399,12 +399,12 @@ private Boolean acquireLeaseSync(AzureBlobLease lease) throws Exception leaseBlob.downloadAttributes(); if (leaseBlob.getProperties().getLeaseState() == LeaseState.LEASED) { - this.host.logWithHostAndPartition(Level.FINE, lease.getPartitionId(), "changeLease"); + this.host.logWithHostAndPartition(Level.FINER, lease.getPartitionId(), "changeLease"); newToken = leaseBlob.changeLease(newLeaseId, AccessCondition.generateLeaseCondition(lease.getToken())); } else { - this.host.logWithHostAndPartition(Level.FINE, lease.getPartitionId(), "acquireLease"); + this.host.logWithHostAndPartition(Level.FINER, lease.getPartitionId(), "acquireLease"); newToken = leaseBlob.acquireLease(AzureStorageCheckpointLeaseManager.leaseDurationInSeconds, newLeaseId); } lease.setToken(newToken); @@ -435,7 +435,7 @@ public Future renewLease(Lease lease) private Boolean renewLeaseSync(AzureBlobLease lease) throws Exception { - this.host.logWithHostAndPartition(Level.INFO, lease.getPartitionId(), "Renewing lease"); + this.host.logWithHostAndPartition(Level.FINE, lease.getPartitionId(), "Renewing lease"); CloudBlockBlob leaseBlob = lease.getBlob(); boolean retval = true; @@ -467,7 +467,7 @@ public Future releaseLease(Lease lease) private Boolean releaseLeaseSync(AzureBlobLease lease) throws Exception { - this.host.logWithHostAndPartition(Level.INFO, lease.getPartitionId(), "Releasing lease"); + this.host.logWithHostAndPartition(Level.FINE, lease.getPartitionId(), "Releasing lease"); CloudBlockBlob leaseBlob = lease.getBlob(); boolean retval = true; @@ -508,7 +508,7 @@ public Boolean updateLeaseSync(AzureBlobLease lease) throws Exception return false; } - this.host.logWithHostAndPartition(Level.INFO, lease.getPartitionId(), "Updating lease"); + this.host.logWithHostAndPartition(Level.FINE, lease.getPartitionId(), "Updating lease"); String token = lease.getToken(); if ((token == null) || (token.length() == 0)) @@ -590,11 +590,11 @@ else if (lease.getOffset() != null) private boolean wasLeaseLost(StorageException se, String partitionId) { boolean retval = false; - this.host.logWithHostAndPartition(Level.FINE, partitionId, "WAS LEASE LOST?"); - this.host.logWithHostAndPartition(Level.FINE, partitionId, "Http " + se.getHttpStatusCode()); + this.host.logWithHostAndPartition(Level.FINER, partitionId, "WAS LEASE LOST?"); + this.host.logWithHostAndPartition(Level.FINER, partitionId, "Http " + se.getHttpStatusCode()); if (se.getExtendedErrorInformation() != null) { - this.host.logWithHostAndPartition(Level.FINE, partitionId, "Http " + se.getExtendedErrorInformation().getErrorCode() + " :: " + se.getExtendedErrorInformation().getErrorMessage()); + this.host.logWithHostAndPartition(Level.FINER, partitionId, "Http " + se.getExtendedErrorInformation().getErrorCode() + " :: " + se.getExtendedErrorInformation().getErrorMessage()); } if ((se.getHttpStatusCode() == 409) || // conflict (se.getHttpStatusCode() == 412)) // precondition failed @@ -603,8 +603,8 @@ private boolean wasLeaseLost(StorageException se, String partitionId) if (extendedErrorInfo != null) { String errorCode = extendedErrorInfo.getErrorCode(); - this.host.logWithHostAndPartition(Level.FINE, partitionId, "Error code: " + errorCode); - this.host.logWithHostAndPartition(Level.FINE, partitionId, "Error message: " + extendedErrorInfo.getErrorMessage()); + this.host.logWithHostAndPartition(Level.FINER, partitionId, "Error code: " + errorCode); + this.host.logWithHostAndPartition(Level.FINER, partitionId, "Error message: " + extendedErrorInfo.getErrorMessage()); if ((errorCode.compareTo(StorageErrorCodeStrings.LEASE_LOST) == 0) || (errorCode.compareTo(StorageErrorCodeStrings.LEASE_ID_MISMATCH_WITH_LEASE_OPERATION) == 0) || (errorCode.compareTo(StorageErrorCodeStrings.LEASE_ID_MISMATCH_WITH_BLOB_OPERATION) == 0) || diff --git a/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/EventHubPartitionPump.java b/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/EventHubPartitionPump.java index 138460702..920d02628 100644 --- a/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/EventHubPartitionPump.java +++ b/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/EventHubPartitionPump.java @@ -91,7 +91,7 @@ void specializedStartPump() private void openClients() throws ServiceBusException, IOException, InterruptedException, ExecutionException { // Create new client - this.host.logWithHostAndPartition(Level.FINE, this.partitionContext, "Opening EH client"); + this.host.logWithHostAndPartition(Level.FINER, this.partitionContext, "Opening EH client"); this.internalOperationFuture = EventHubClient.createFromConnectionString(this.host.getEventHubConnectionString()); this.eventHubClient = (EventHubClient) this.internalOperationFuture.get(); this.internalOperationFuture = null; @@ -99,7 +99,7 @@ private void openClients() throws ServiceBusException, IOException, InterruptedE // Create new receiver and set options Object startAt = this.partitionContext.getInitialOffset(); long epoch = this.lease.getEpoch(); - this.host.logWithHostAndPartition(Level.FINE, this.partitionContext, "Opening EH receiver with epoch " + epoch + " at location " + startAt); + this.host.logWithHostAndPartition(Level.FINER, this.partitionContext, "Opening EH receiver with epoch " + epoch + " at location " + startAt); if (startAt instanceof String) { this.internalOperationFuture = this.eventHubClient.createEpochReceiver(this.partitionContext.getConsumerGroupName(), this.partitionContext.getPartitionId(), @@ -130,7 +130,7 @@ else if (startAt instanceof Instant) this.partitionReceiver.setReceiveTimeout(this.host.getEventProcessorOptions().getReceiveTimeOut()); this.internalOperationFuture = null; - this.host.logWithHostAndPartition(Level.FINE, this.partitionContext, "EH client and receiver creation finished"); + this.host.logWithHostAndPartition(Level.FINER, this.partitionContext, "EH client and receiver creation finished"); } private void cleanUpClients() // swallows all exceptions @@ -146,14 +146,14 @@ private void cleanUpClients() // swallows all exceptions this.partitionReceiver.setReceiveHandler(null); } - this.host.logWithHostAndPartition(Level.FINE, this.partitionContext, "Closing EH receiver"); + this.host.logWithHostAndPartition(Level.FINER, this.partitionContext, "Closing EH receiver"); this.partitionReceiver.close(); this.partitionReceiver = null; } if (this.eventHubClient != null) { - this.host.logWithHostAndPartition(Level.FINE, this.partitionContext, "Closing EH client"); + this.host.logWithHostAndPartition(Level.FINER, this.partitionContext, "Closing EH client"); this.eventHubClient.close(); this.eventHubClient = null; } diff --git a/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/EventProcessorHost.java b/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/EventProcessorHost.java index 379a80a5e..50cf18da8 100644 --- a/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/EventProcessorHost.java +++ b/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/EventProcessorHost.java @@ -268,8 +268,6 @@ public EventProcessorHost( ILeaseManager leaseManager, ExecutorService executorService) { - EventProcessorHost.TRACE_LOGGER.setLevel(Level.SEVERE); - if ((hostName == null) || hostName.isEmpty()) { throw new IllegalArgumentException("hostName argument must not be null or empty string"); @@ -376,7 +374,7 @@ public EventProcessorHost( this.partitionManager = new PartitionManager(this); - logWithHost(Level.INFO, "New EventProcessorHost created"); + logWithHost(Level.FINE, "New EventProcessorHost created"); } /** @@ -505,7 +503,7 @@ public Future registerEventProcessorFactory(IEventProcessorFactory factory } } - logWithHost(Level.INFO, "Starting event processing"); + logWithHost(Level.FINE, "Starting event processing"); this.processorFactory = factory; this.processorOptions = processorOptions; return EventProcessorHost.executorService.submit(() -> this.partitionManager.initialize()); @@ -519,7 +517,7 @@ public Future registerEventProcessorFactory(IEventProcessorFactory factory */ public void unregisterEventProcessor() throws InterruptedException, ExecutionException { - logWithHost(Level.INFO, "Stopping event processing"); + logWithHost(Level.FINE, "Stopping event processing"); if (this.partitionManager != null) { diff --git a/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PartitionContext.java b/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PartitionContext.java index 641ed395d..e56bf7d68 100644 --- a/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PartitionContext.java +++ b/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PartitionContext.java @@ -100,18 +100,18 @@ Object getInitialOffset() throws InterruptedException, ExecutionException { // No checkpoint was ever stored. Use the initialOffsetProvider instead. Function initialOffsetProvider = this.host.getEventProcessorOptions().getInitialOffsetProvider(); - this.host.logWithHostAndPartition(Level.FINE, this.partitionId, "Calling user-provided initial offset provider"); + this.host.logWithHostAndPartition(Level.FINER, this.partitionId, "Calling user-provided initial offset provider"); startAt = initialOffsetProvider.apply(this.partitionId); if (startAt instanceof String) { this.offset = (String)startAt; this.sequenceNumber = 0; // TODO we use sequenceNumber to check for regression of offset, 0 could be a problem until it gets updated from an event - this.host.logWithHostAndPartition(Level.FINE, this.partitionId, "Initial offset provided: " + this.offset + "//" + this.sequenceNumber); + this.host.logWithHostAndPartition(Level.FINER, this.partitionId, "Initial offset provided: " + this.offset + "//" + this.sequenceNumber); } else if (startAt instanceof Instant) { // can't set offset/sequenceNumber - this.host.logWithHostAndPartition(Level.FINE, this.partitionId, "Initial timestamp provided: " + (Instant)startAt); + this.host.logWithHostAndPartition(Level.FINER, this.partitionId, "Initial timestamp provided: " + (Instant)startAt); } else { @@ -124,7 +124,7 @@ else if (startAt instanceof Instant) this.offset = startingCheckpoint.getOffset(); startAt = this.offset; this.sequenceNumber = startingCheckpoint.getSequenceNumber(); - this.host.logWithHostAndPartition(Level.FINE, this.partitionId, "Retrieved starting offset " + this.offset + "//" + this.sequenceNumber); + this.host.logWithHostAndPartition(Level.FINER, this.partitionId, "Retrieved starting offset " + this.offset + "//" + this.sequenceNumber); } return startAt; @@ -169,7 +169,7 @@ public void checkpoint(EventData event) throws IllegalArgumentException, Interru private void persistCheckpoint(Checkpoint persistThis) throws IllegalArgumentException, InterruptedException, ExecutionException { - this.host.logWithHostAndPartition(Level.FINE, persistThis.getPartitionId(), "Saving checkpoint: " + + this.host.logWithHostAndPartition(Level.FINER, persistThis.getPartitionId(), "Saving checkpoint: " + persistThis.getOffset() + "//" + persistThis.getSequenceNumber()); Checkpoint inStoreCheckpoint = this.host.getCheckpointManager().getCheckpoint(persistThis.getPartitionId()).get(); diff --git a/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PartitionManager.java b/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PartitionManager.java index cf476c49a..cb02db8c2 100644 --- a/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PartitionManager.java +++ b/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PartitionManager.java @@ -119,10 +119,10 @@ Iterable getPartitionIds() throws IllegalEntityException throw new EPHConfigurationException(errorMessage, exception); } - this.host.logWithHost(Level.INFO, "Eventhub " + this.host.getEventHubPath() + " count of partitions: " + this.partitionIds.size()); + this.host.logWithHost(Level.FINE, "Eventhub " + this.host.getEventHubPath() + " count of partitions: " + this.partitionIds.size()); for (String id : this.partitionIds) { - this.host.logWithHost(Level.FINE, "Found partition with id: " + id); + this.host.logWithHost(Level.FINER, "Found partition with id: " + id); } } @@ -183,7 +183,7 @@ private Void runAndCleanUp() try { runLoop(); - this.host.logWithHost(Level.INFO, "Partition manager main loop exited normally, shutting down"); + this.host.logWithHost(Level.FINE, "Partition manager main loop exited normally, shutting down"); } catch (ExceptionWithAction e) { @@ -213,7 +213,7 @@ private Void runAndCleanUp() } // Cleanup - this.host.logWithHost(Level.INFO, "Shutting down all pumps"); + this.host.logWithHost(Level.FINE, "Shutting down all pumps"); Iterable> pumpRemovals = this.pump.removeAllPumps(CloseReason.Shutdown); // All of the shutdown threads have been launched, we can shut down the executor now. @@ -244,7 +244,7 @@ private Void runAndCleanUp() } } - this.host.logWithHost(Level.INFO, "Partition manager exiting"); + this.host.logWithHost(Level.FINE, "Partition manager exiting"); return null; } @@ -400,7 +400,7 @@ else if (possibleLease.getOwner().compareTo(this.host.getHostName()) == 0) { if (leaseManager.acquireLease(stealee).get()) { - this.host.logWithHostAndPartition(Level.INFO, stealee.getPartitionId(), "Stole lease"); + this.host.logWithHostAndPartition(Level.FINE, stealee.getPartitionId(), "Stole lease"); allLeases.put(stealee.getPartitionId(), stealee); ourLeasesCount++; } @@ -423,7 +423,7 @@ else if (possibleLease.getOwner().compareTo(this.host.getHostName()) == 0) for (String partitionId : allLeases.keySet()) { Lease updatedLease = allLeases.get(partitionId); - this.host.logWithHost(Level.FINE, "Lease on partition " + updatedLease.getPartitionId() + " owned by " + updatedLease.getOwner()); // DEBUG + this.host.logWithHost(Level.FINER, "Lease on partition " + updatedLease.getPartitionId() + " owned by " + updatedLease.getOwner()); // DEBUG if (updatedLease.getOwner().compareTo(this.host.getHostName()) == 0) { this.pump.addPump(partitionId, updatedLease); @@ -489,7 +489,7 @@ private Iterable whichLeasesToSteal(ArrayList stealableLeases, int if (l.getOwner().compareTo(biggestOwner) == 0) { stealTheseLeases.add(l); - this.host.logWithHost(Level.FINE, "Proposed to steal lease for partition " + l.getPartitionId() + " from " + biggestOwner); + this.host.logWithHost(Level.FINER, "Proposed to steal lease for partition " + l.getPartitionId() + " from " + biggestOwner); break; } } @@ -529,9 +529,9 @@ private HashMap countLeasesByOwner(Iterable leases) } for (String owner : counts.keySet()) { - this.host.log(Level.FINE, "host " + owner + " owns " + counts.get(owner) + " leases"); + this.host.log(Level.FINER, "host " + owner + " owns " + counts.get(owner) + " leases"); } - this.host.log(Level.FINE, "total hosts in sorted list: " + counts.size()); + this.host.log(Level.FINER, "total hosts in sorted list: " + counts.size()); return counts; } diff --git a/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PartitionPump.java b/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PartitionPump.java index da3d00cba..f2659c817 100644 --- a/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PartitionPump.java +++ b/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PartitionPump.java @@ -98,7 +98,7 @@ void shutdown(CloseReason reason) } this.pumpStatus = PartitionPumpStatus.PP_CLOSING; } - this.host.logWithHostAndPartition(Level.INFO, this.partitionContext, "pump shutdown for reason " + reason.toString()); + this.host.logWithHostAndPartition(Level.FINE, this.partitionContext, "pump shutdown for reason " + reason.toString()); specializedShutdown(reason); @@ -157,7 +157,7 @@ protected void onEvents(Iterable events) } if (last != null) { - this.host.logWithHostAndPartition(Level.FINE, this.partitionContext, "Updating offset in partition context with end of batch " + + this.host.logWithHostAndPartition(Level.FINER, this.partitionContext, "Updating offset in partition context with end of batch " + last.getSystemProperties().getOffset() + "//" + last.getSystemProperties().getSequenceNumber()); this.partitionContext.setOffsetAndSequenceNumber(last); } diff --git a/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/Pump.java b/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/Pump.java index 7697f61d9..cd054c783 100644 --- a/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/Pump.java +++ b/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/Pump.java @@ -44,7 +44,7 @@ public void addPump(String partitionId, Lease lease) throws Exception else { // Pump is working, just replace the lease. - this.host.logWithHostAndPartition(Level.FINE, partitionId, "updating lease for pump"); + this.host.logWithHostAndPartition(Level.FINER, partitionId, "updating lease for pump"); capturedPump.setLease(lease); } } @@ -60,7 +60,7 @@ private void createNewPump(String partitionId, Lease lease) throws Exception PartitionPump newPartitionPump = new EventHubPartitionPump(this.host, this, lease); EventProcessorHost.getExecutorService().submit(() -> newPartitionPump.startPump()); this.pumpStates.put(partitionId, newPartitionPump); // do the put after start, if the start fails then put doesn't happen - this.host.logWithHostAndPartition(Level.INFO, partitionId, "created new pump"); + this.host.logWithHostAndPartition(Level.FINE, partitionId, "created new pump"); } public Future removePump(String partitionId, final CloseReason reason) @@ -69,17 +69,17 @@ public Future removePump(String partitionId, final CloseReason reason) PartitionPump capturedPump = this.pumpStates.get(partitionId); if (capturedPump != null) { - this.host.logWithHostAndPartition(Level.INFO, partitionId, "closing pump for reason " + reason.toString()); + this.host.logWithHostAndPartition(Level.FINE, partitionId, "closing pump for reason " + reason.toString()); retval = EventProcessorHost.getExecutorService().submit(() -> capturedPump.shutdown(reason)); - this.host.logWithHostAndPartition(Level.INFO, partitionId, "removing pump"); + this.host.logWithHostAndPartition(Level.FINE, partitionId, "removing pump"); this.pumpStates.remove(partitionId); } else { // PartitionManager main loop tries to remove pump for every partition that the host does not own, just to be sure. // Not finding a pump for a partition is normal and expected most of the time. - this.host.logWithHostAndPartition(Level.FINE, partitionId, "no pump found to remove for partition " + partitionId); + this.host.logWithHostAndPartition(Level.FINER, partitionId, "no pump found to remove for partition " + partitionId); } return retval; }