Skip to content

Commit

Permalink
Twin DP sub fix (#323)
Browse files Browse the repository at this point in the history
  • Loading branch information
varunpuranik authored and myagley committed Sep 19, 2018
1 parent 2937124 commit 8b1fb67
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ public class TwinReceivingLinkHandler : ReceivingLinkHandler
{
public const string TwinPatch = "PATCH";
public const string TwinGet = "GET";
public const string TwinPut = "PUT";
public const string TwinDelete = "DELETE";

public TwinReceivingLinkHandler(IReceivingAmqpLink link, Uri requestUri, IDictionary<string, string> boundVariables,
IMessageConverter<AmqpMessage> messageConverter)
Expand Down Expand Up @@ -51,14 +53,39 @@ protected override async Task OnMessageReceived(AmqpMessage amqpMessage)
Events.InvalidCorrelationId(this);
return;
}

await this.DeviceListener.SendGetTwinRequest(correlationId);
Events.ProcessedTwinGetRequest(this);
break;

case TwinPatch:
EdgeMessage reportedPropertiesMessage = new EdgeMessage.Builder(amqpMessage.GetPayloadBytes()).Build();
await this.DeviceListener.UpdateReportedPropertiesAsync(reportedPropertiesMessage, correlationId);
Events.ProcessedTwinReportedPropertiesUpdate(this);
break;

case TwinPut:
if (string.IsNullOrWhiteSpace(correlationId))
{
Events.InvalidCorrelationId(this);
return;
}

await this.DeviceListener.AddDesiredPropertyUpdatesSubscription(correlationId);
Events.ProcessedDesiredPropertyUpdatesSubscriptionRequest(this, correlationId);
break;

case TwinDelete:
if (string.IsNullOrWhiteSpace(correlationId))
{
Events.InvalidCorrelationId(this);
return;
}

await this.DeviceListener.RemoveDesiredPropertyUpdatesSubscription(correlationId);
Events.ProcessedDesiredPropertyUpdatesSubscriptionRemovalRequest(this, correlationId);
break;

default:
Events.InvalidOperation(this, operation);
break;
Expand All @@ -74,7 +101,9 @@ enum EventIds
{
InvalidOperation = IdStart,
ProcessedTwinGetRequest,
ProcessedTwinReportedPropertiesUpdate
ProcessedTwinReportedPropertiesUpdate,
ProcessedDesiredPropertyUpdatesSubscriptionRequest,
ProcessedDesiredPropertyUpdatesSubscriptionRemovalRequest
}

public static void InvalidOperation(TwinReceivingLinkHandler handler)
Expand All @@ -101,6 +130,16 @@ public static void InvalidCorrelationId(TwinReceivingLinkHandler handler)
{
Log.LogWarning((int)EventIds.InvalidOperation, $"Cannot process message on link {handler.LinkUri} because no correlation ID was specified");
}

public static void ProcessedDesiredPropertyUpdatesSubscriptionRequest(TwinReceivingLinkHandler handler, string correlationId)
{
Log.LogDebug((int)EventIds.ProcessedDesiredPropertyUpdatesSubscriptionRequest, $"Processed Twin desired properties subscription for {handler.ClientId} on request {correlationId}");
}

public static void ProcessedDesiredPropertyUpdatesSubscriptionRemovalRequest(TwinReceivingLinkHandler handler, string correlationId)
{
Log.LogDebug((int)EventIds.ProcessedDesiredPropertyUpdatesSubscriptionRemovalRequest, $"Processed removing Twin desired properties subscription for {handler.ClientId} on request {correlationId}");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,38 @@ public async Task ProcessMessageFeedbackAsync(string messageId, FeedbackStatus f

public Task RemoveSubscription(DeviceSubscription subscription) => this.edgeHub.RemoveSubscription(this.Identity.Id, subscription);

public async Task AddDesiredPropertyUpdatesSubscription(string correlationId)
{
await this.edgeHub.AddSubscription(this.Identity.Id, DeviceSubscription.DesiredPropertyUpdates);
if (!string.IsNullOrWhiteSpace(correlationId))
{
IMessage responseMessage = new EdgeMessage.Builder(new byte[0])
.SetSystemProperties(new Dictionary<string, string>
{
[SystemProperties.CorrelationId] = correlationId,
[SystemProperties.StatusCode] = ((int)HttpStatusCode.OK).ToString()
})
.Build();
await this.SendTwinUpdate(responseMessage);
}
}

public async Task RemoveDesiredPropertyUpdatesSubscription(string correlationId)
{
await this.edgeHub.RemoveSubscription(this.Identity.Id, DeviceSubscription.DesiredPropertyUpdates);
if (!string.IsNullOrWhiteSpace(correlationId))
{
IMessage responseMessage = new EdgeMessage.Builder(new byte[0])
.SetSystemProperties(new Dictionary<string, string>
{
[SystemProperties.CorrelationId] = correlationId,
[SystemProperties.StatusCode] = ((int)HttpStatusCode.OK).ToString()
})
.Build();
await this.SendTwinUpdate(responseMessage);
}
}

public async Task SendGetTwinRequest(string correlationId)
{
try
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,9 @@ public interface IDeviceListener
Task AddSubscription(DeviceSubscription subscription);

Task RemoveSubscription(DeviceSubscription subscription);

Task AddDesiredPropertyUpdatesSubscription(string correlationId);

Task RemoveDesiredPropertyUpdatesSubscription(string correlationId);
}
}

0 comments on commit 8b1fb67

Please sign in to comment.