Skip to content

Commit

Permalink
Allow publishing cloud events directly
Browse files Browse the repository at this point in the history
Signed-off-by: Sun Zhongfeng <suraciii@outlook.com>
  • Loading branch information
suraciii committed Apr 25, 2022
1 parent 452ccad commit 7125a03
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 7 deletions.
68 changes: 68 additions & 0 deletions src/Dapr.Client/CloudEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// ------------------------------------------------------------------------
// Copyright 2021 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------

using System;
using System.Text.Json.Serialization;
using Dapr.Client;

namespace Dapr
{
/// <summary>
/// Represents a CloudEvent without data.
/// </summary>
public class CloudEvent
{
/// <summary>
/// CloudEvent 'source' attribute.
/// </summary>
[JsonPropertyName("source")]
public Uri Source { get; init; }

/// <summary>
/// CloudEvent 'type' attribute.
/// </summary>
[JsonPropertyName("type")]
public string Type { get; init; }

/// <summary>
/// CloudEvent 'subject' attribute.
/// </summary>
[JsonPropertyName("subject")]
public string Subject { get; init; }
}

/// <summary>
/// Represents a CloudEvent with typed data.
/// </summary>
public class CloudEvent<TData> : CloudEvent
{
/// <summary>
/// Initialize a new instance of the <see cref="CloudEvent{TData}"/> class.
/// </summary>
public CloudEvent(TData data)
{
Data = data;
}

/// <summary>
/// CloudEvent 'data' content.
/// </summary>
public TData Data { get; }

/// <summary>
/// Gets event data.
/// </summary>
[JsonPropertyName("datacontenttype")]
public string DataContentType { get; } = Constants.ContentTypeApplicationJson;
}
}
1 change: 1 addition & 0 deletions src/Dapr.Client/Constants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,6 @@ internal class Constants
{
public const string ContentTypeApplicationJson = MediaTypeNames.Application.Json;
public const string ContentTypeApplicationGrpc = "application/grpc";
public const string ContentTypeCloudEvent = "application/cloudevents+json";
}
}
14 changes: 7 additions & 7 deletions src/Dapr.Client/DaprClientGrpc.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public override Task PublishEventAsync<TData>(
ArgumentVerifier.ThrowIfNull(data, nameof(data));

var content = TypeConverters.ToJsonByteString(data, this.JsonSerializerOptions);
return MakePublishRequest(pubsubName, topicName, content, null, cancellationToken);
return MakePublishRequest(pubsubName, topicName, content, null, data is CloudEvent ? Constants.ContentTypeCloudEvent : null, cancellationToken);
}

public override Task PublishEventAsync<TData>(
Expand All @@ -94,7 +94,7 @@ public override Task PublishEventAsync<TData>(
ArgumentVerifier.ThrowIfNull(metadata, nameof(metadata));

var content = TypeConverters.ToJsonByteString(data, this.JsonSerializerOptions);
return MakePublishRequest(pubsubName, topicName, content, metadata, cancellationToken);
return MakePublishRequest(pubsubName, topicName, content, metadata, data is CloudEvent ? Constants.ContentTypeCloudEvent : null, cancellationToken);
}

/// <inheritdoc/>
Expand All @@ -105,7 +105,7 @@ public override Task PublishEventAsync(
{
ArgumentVerifier.ThrowIfNullOrEmpty(pubsubName, nameof(pubsubName));
ArgumentVerifier.ThrowIfNullOrEmpty(topicName, nameof(topicName));
return MakePublishRequest(pubsubName, topicName, null, null, cancellationToken);
return MakePublishRequest(pubsubName, topicName, null, null, null, cancellationToken);
}

public override Task PublishEventAsync(
Expand All @@ -117,17 +117,17 @@ public override Task PublishEventAsync(
ArgumentVerifier.ThrowIfNullOrEmpty(pubsubName, nameof(pubsubName));
ArgumentVerifier.ThrowIfNullOrEmpty(topicName, nameof(topicName));
ArgumentVerifier.ThrowIfNull(metadata, nameof(metadata));
return MakePublishRequest(pubsubName, topicName, null, metadata, cancellationToken);
return MakePublishRequest(pubsubName, topicName, null, metadata, null, cancellationToken);
}

private async Task MakePublishRequest(
string pubsubName,
string topicName,
ByteString content,
Dictionary<string, string> metadata,
string dataContentType,
CancellationToken cancellationToken)
{
// Create PublishEventEnvelope
var envelope = new Autogenerated.PublishEventRequest()
{
PubsubName = pubsubName,
Expand All @@ -137,8 +137,8 @@ private async Task MakePublishRequest(
if (content != null)
{
envelope.Data = content;
envelope.DataContentType = Constants.ContentTypeApplicationJson;
}
envelope.DataContentType = dataContentType ?? Constants.ContentTypeApplicationJson;
}

if (metadata != null)
{
Expand Down
58 changes: 58 additions & 0 deletions test/Dapr.Client.Test/PublishEventApiTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,64 @@ public async Task PublishEventAsync_CanPublishTopicWithNoContent_WithMetadata()
envelope.Metadata["key2"].Should().Be("value2");
}

[Fact]
public async Task PublishEventAsync_CanPublishCloudEventWithData()
{
await using var client = TestClient.CreateForDaprClient();

var publishData = new PublishData() { PublishObjectParameter = "testparam" };
var cloudEvent = new CloudEvent<PublishData>(publishData)
{
Source = new Uri("urn:testsource"),
Type = "testtype",
Subject = "testsubject",
};
var request = await client.CaptureGrpcRequestAsync(async daprClient =>
{
await daprClient.PublishEventAsync<CloudEvent<PublishData>>(TestPubsubName, "test", cloudEvent);
});

request.Dismiss();

var envelope = await request.GetRequestEnvelopeAsync<PublishEventRequest>();
var jsonFromRequest = envelope.Data.ToStringUtf8();

envelope.DataContentType.Should().Be("application/cloudevents+json");
envelope.PubsubName.Should().Be(TestPubsubName);
envelope.Topic.Should().Be("test");
jsonFromRequest.Should().Be(JsonSerializer.Serialize(cloudEvent, client.InnerClient.JsonSerializerOptions));
envelope.Metadata.Count.Should().Be(0);
}

[Fact]
public async Task PublishEventAsync_CanPublishCloudEventWithNoContent()
{
await using var client = TestClient.CreateForDaprClient();

var publishData = new PublishData() { PublishObjectParameter = "testparam" };
var cloudEvent = new CloudEvent
{
Source = new Uri("urn:testsource"),
Type = "testtype",
Subject = "testsubject",
};
var request = await client.CaptureGrpcRequestAsync(async daprClient =>
{
await daprClient.PublishEventAsync<CloudEvent>(TestPubsubName, "test", cloudEvent);
});

request.Dismiss();

var envelope = await request.GetRequestEnvelopeAsync<PublishEventRequest>();
var jsonFromRequest = envelope.Data.ToStringUtf8();

envelope.DataContentType.Should().Be("application/cloudevents+json");
envelope.PubsubName.Should().Be(TestPubsubName);
envelope.Topic.Should().Be("test");
jsonFromRequest.Should().Be(JsonSerializer.Serialize(cloudEvent, client.InnerClient.JsonSerializerOptions));
envelope.Metadata.Count.Should().Be(0);
}

[Fact]
public async Task PublishEventAsync_WithCancelledToken()
{
Expand Down

0 comments on commit 7125a03

Please sign in to comment.