Skip to content

Commit

Permalink
fix(client): Correct verbosity of correlation warnings and fix the ca…
Browse files Browse the repository at this point in the history
…llback context map (#1357)

* Fixes for issue #1349 

* Correcting ConcurrentHashmap and messageId
* Correcting adding isEmpty to addToWaitingQueue and will not add a callback if the correlationid is empty or if the callback itself is null
* This version of code doesn't generate nulls so we've add isEmpty checks everywhere.
  • Loading branch information
jamdavi authored Sep 15, 2021
1 parent c34ec39 commit 020ac96
Showing 1 changed file with 30 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -229,11 +229,11 @@ public void onMessageSent(Message message, String deviceId, Throwable e)

try
{
String messageId = message.getCorrelationId();
if (messageId != null && correlationCallbacks.containsKey(messageId))
String correlationId = message.getCorrelationId();
if (!correlationId.isEmpty())
{
Object context = correlationCallbackContexts.get(messageId);
correlationCallbacks.get(messageId).onRequestAcknowledged(packet, context, e);
Object context = correlationCallbackContexts.get(correlationId);
correlationCallbacks.get(correlationId).onRequestAcknowledged(packet, context, e);
}
}
catch (Exception ex)
Expand All @@ -245,11 +245,11 @@ public void onMessageSent(Message message, String deviceId, Throwable e)
{
try
{
String messageId = message.getCorrelationId();
if (messageId != null && correlationCallbacks.containsKey(messageId))
String correlationId = message.getCorrelationId();
if (!correlationId.isEmpty())
{
Object context = correlationCallbackContexts.get(messageId);
correlationCallbacks.get(messageId).onUnknownMessageAcknowledged(message, context, e);
Object context = correlationCallbackContexts.get(correlationId);
correlationCallbacks.get(correlationId).onUnknownMessageAcknowledged(message, context, e);
}
}
catch (Exception ex)
Expand Down Expand Up @@ -281,15 +281,11 @@ else if (message != null)
{
if (message != null)
{
String messageId = message.getCorrelationId();
if (messageId != null && correlationCallbacks.containsKey(messageId))
{
Object context = correlationCallbackContexts.get(messageId);
correlationCallbacks.get(messageId).onResponseReceived(message, context, e);
}
else
String correlationId = message.getCorrelationId();
if (!correlationId.isEmpty())
{
log.warn("A message was received with a null correlation id.");
Object context = correlationCallbackContexts.get(correlationId);
correlationCallbacks.get(correlationId).onResponseReceived(message, context, e);
}
}
}
Expand Down Expand Up @@ -612,12 +608,12 @@ public void sendMessages()

try
{
String messageId = message.getCorrelationId();
String correlationId = message.getCorrelationId();

if (messageId != null && correlationCallbacks.containsKey(messageId))
if (!correlationId.isEmpty())
{
Object context = correlationCallbackContexts.get(messageId);
correlationCallbacks.get(messageId).onRequestSent(message, packet, context);
Object context = correlationCallbackContexts.get(correlationId);
correlationCallbacks.get(correlationId).onRequestSent(message, packet, context);
}
}
catch (Exception e)
Expand Down Expand Up @@ -1723,15 +1719,23 @@ private void addToWaitingQueue(IotHubTransportPacket packet)
{
try
{
if (packet != null && packet.getMessage() != null && packet.getMessage().getCorrelatingMessageCallback() != null)
if (packet != null)
{
Message message = packet.getMessage();
String messageId = message.getCorrelationId();
if (!correlationCallbacks.containsKey(messageId))
if (message != null)
{
correlationCallbacks.put(messageId, message.getCorrelatingMessageCallback());
correlationCallbackContexts.put(messageId, message.getCorrelatingMessageCallbackContext());
correlationCallbacks.get(messageId).onRequestQueued(message, packet, correlationCallbackContexts.get(messageId));
String correlationId = message.getCorrelationId();
CorrelatingMessageCallback correlationCallback = message.getCorrelatingMessageCallback();
if (!correlationId.isEmpty() && correlationCallback != null)
{
correlationCallbacks.put(correlationId, correlationCallback);
Object correlationCallbackContext = message.getCorrelatingMessageCallbackContext();
if (correlationCallbackContext != null)
{
correlationCallbackContexts.put(correlationId, correlationCallbackContext);
}
correlationCallback.onRequestQueued(message, packet, correlationCallbackContext);
}
}
}
}
Expand Down

0 comments on commit 020ac96

Please sign in to comment.