From 7125a0322c81bc0f972ac3995d4fc56cb85eefa2 Mon Sep 17 00:00:00 2001 From: Sun Zhongfeng Date: Fri, 22 Apr 2022 10:15:17 +0800 Subject: [PATCH] Allow publishing cloud events directly Signed-off-by: Sun Zhongfeng --- src/Dapr.Client/CloudEvent.cs | 68 ++++++++++++++++++++ src/Dapr.Client/Constants.cs | 1 + src/Dapr.Client/DaprClientGrpc.cs | 14 ++-- test/Dapr.Client.Test/PublishEventApiTest.cs | 58 +++++++++++++++++ 4 files changed, 134 insertions(+), 7 deletions(-) create mode 100644 src/Dapr.Client/CloudEvent.cs diff --git a/src/Dapr.Client/CloudEvent.cs b/src/Dapr.Client/CloudEvent.cs new file mode 100644 index 000000000..bb0eaf464 --- /dev/null +++ b/src/Dapr.Client/CloudEvent.cs @@ -0,0 +1,68 @@ +// ------------------------------------------------------------------------ +// Copyright 2021 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using System; +using System.Text.Json.Serialization; +using Dapr.Client; + +namespace Dapr +{ + /// + /// Represents a CloudEvent without data. + /// + public class CloudEvent + { + /// + /// CloudEvent 'source' attribute. + /// + [JsonPropertyName("source")] + public Uri Source { get; init; } + + /// + /// CloudEvent 'type' attribute. + /// + [JsonPropertyName("type")] + public string Type { get; init; } + + /// + /// CloudEvent 'subject' attribute. + /// + [JsonPropertyName("subject")] + public string Subject { get; init; } + } + + /// + /// Represents a CloudEvent with typed data. + /// + public class CloudEvent : CloudEvent + { + /// + /// Initialize a new instance of the class. + /// + public CloudEvent(TData data) + { + Data = data; + } + + /// + /// CloudEvent 'data' content. + /// + public TData Data { get; } + + /// + /// Gets event data. + /// + [JsonPropertyName("datacontenttype")] + public string DataContentType { get; } = Constants.ContentTypeApplicationJson; + } +} diff --git a/src/Dapr.Client/Constants.cs b/src/Dapr.Client/Constants.cs index a7a348027..37ebbb21b 100644 --- a/src/Dapr.Client/Constants.cs +++ b/src/Dapr.Client/Constants.cs @@ -19,5 +19,6 @@ internal class Constants { public const string ContentTypeApplicationJson = MediaTypeNames.Application.Json; public const string ContentTypeApplicationGrpc = "application/grpc"; + public const string ContentTypeCloudEvent = "application/cloudevents+json"; } } diff --git a/src/Dapr.Client/DaprClientGrpc.cs b/src/Dapr.Client/DaprClientGrpc.cs index 0b463602b..2d1e5e115 100644 --- a/src/Dapr.Client/DaprClientGrpc.cs +++ b/src/Dapr.Client/DaprClientGrpc.cs @@ -78,7 +78,7 @@ public override Task PublishEventAsync( ArgumentVerifier.ThrowIfNull(data, nameof(data)); var content = TypeConverters.ToJsonByteString(data, this.JsonSerializerOptions); - return MakePublishRequest(pubsubName, topicName, content, null, cancellationToken); + return MakePublishRequest(pubsubName, topicName, content, null, data is CloudEvent ? Constants.ContentTypeCloudEvent : null, cancellationToken); } public override Task PublishEventAsync( @@ -94,7 +94,7 @@ public override Task PublishEventAsync( ArgumentVerifier.ThrowIfNull(metadata, nameof(metadata)); var content = TypeConverters.ToJsonByteString(data, this.JsonSerializerOptions); - return MakePublishRequest(pubsubName, topicName, content, metadata, cancellationToken); + return MakePublishRequest(pubsubName, topicName, content, metadata, data is CloudEvent ? Constants.ContentTypeCloudEvent : null, cancellationToken); } /// @@ -105,7 +105,7 @@ public override Task PublishEventAsync( { ArgumentVerifier.ThrowIfNullOrEmpty(pubsubName, nameof(pubsubName)); ArgumentVerifier.ThrowIfNullOrEmpty(topicName, nameof(topicName)); - return MakePublishRequest(pubsubName, topicName, null, null, cancellationToken); + return MakePublishRequest(pubsubName, topicName, null, null, null, cancellationToken); } public override Task PublishEventAsync( @@ -117,7 +117,7 @@ public override Task PublishEventAsync( ArgumentVerifier.ThrowIfNullOrEmpty(pubsubName, nameof(pubsubName)); ArgumentVerifier.ThrowIfNullOrEmpty(topicName, nameof(topicName)); ArgumentVerifier.ThrowIfNull(metadata, nameof(metadata)); - return MakePublishRequest(pubsubName, topicName, null, metadata, cancellationToken); + return MakePublishRequest(pubsubName, topicName, null, metadata, null, cancellationToken); } private async Task MakePublishRequest( @@ -125,9 +125,9 @@ private async Task MakePublishRequest( string topicName, ByteString content, Dictionary metadata, + string dataContentType, CancellationToken cancellationToken) { - // Create PublishEventEnvelope var envelope = new Autogenerated.PublishEventRequest() { PubsubName = pubsubName, @@ -137,8 +137,8 @@ private async Task MakePublishRequest( if (content != null) { envelope.Data = content; - envelope.DataContentType = Constants.ContentTypeApplicationJson; - } + envelope.DataContentType = dataContentType ?? Constants.ContentTypeApplicationJson; + } if (metadata != null) { diff --git a/test/Dapr.Client.Test/PublishEventApiTest.cs b/test/Dapr.Client.Test/PublishEventApiTest.cs index 59350c00f..b5a3be815 100644 --- a/test/Dapr.Client.Test/PublishEventApiTest.cs +++ b/test/Dapr.Client.Test/PublishEventApiTest.cs @@ -137,6 +137,64 @@ public async Task PublishEventAsync_CanPublishTopicWithNoContent_WithMetadata() envelope.Metadata["key2"].Should().Be("value2"); } + [Fact] + public async Task PublishEventAsync_CanPublishCloudEventWithData() + { + await using var client = TestClient.CreateForDaprClient(); + + var publishData = new PublishData() { PublishObjectParameter = "testparam" }; + var cloudEvent = new CloudEvent(publishData) + { + Source = new Uri("urn:testsource"), + Type = "testtype", + Subject = "testsubject", + }; + var request = await client.CaptureGrpcRequestAsync(async daprClient => + { + await daprClient.PublishEventAsync>(TestPubsubName, "test", cloudEvent); + }); + + request.Dismiss(); + + var envelope = await request.GetRequestEnvelopeAsync(); + var jsonFromRequest = envelope.Data.ToStringUtf8(); + + envelope.DataContentType.Should().Be("application/cloudevents+json"); + envelope.PubsubName.Should().Be(TestPubsubName); + envelope.Topic.Should().Be("test"); + jsonFromRequest.Should().Be(JsonSerializer.Serialize(cloudEvent, client.InnerClient.JsonSerializerOptions)); + envelope.Metadata.Count.Should().Be(0); + } + + [Fact] + public async Task PublishEventAsync_CanPublishCloudEventWithNoContent() + { + await using var client = TestClient.CreateForDaprClient(); + + var publishData = new PublishData() { PublishObjectParameter = "testparam" }; + var cloudEvent = new CloudEvent + { + Source = new Uri("urn:testsource"), + Type = "testtype", + Subject = "testsubject", + }; + var request = await client.CaptureGrpcRequestAsync(async daprClient => + { + await daprClient.PublishEventAsync(TestPubsubName, "test", cloudEvent); + }); + + request.Dismiss(); + + var envelope = await request.GetRequestEnvelopeAsync(); + var jsonFromRequest = envelope.Data.ToStringUtf8(); + + envelope.DataContentType.Should().Be("application/cloudevents+json"); + envelope.PubsubName.Should().Be(TestPubsubName); + envelope.Topic.Should().Be("test"); + jsonFromRequest.Should().Be(JsonSerializer.Serialize(cloudEvent, client.InnerClient.JsonSerializerOptions)); + envelope.Metadata.Count.Should().Be(0); + } + [Fact] public async Task PublishEventAsync_WithCancelledToken() {