diff --git a/cli/service/resources/default-reaction-providers.yaml b/cli/service/resources/default-reaction-providers.yaml index bab341b8..3d739418 100644 --- a/cli/service/resources/default-reaction-providers.yaml +++ b/cli/service/resources/default-reaction-providers.yaml @@ -144,18 +144,25 @@ kind: ReactionProvider name: StorageQueue spec: services: - storage_queue: + reaction: image: reaction-storage-queue config_schema: type: object properties: - StorageConnectionString: + connectionString: type: string - QueueName: + endpoint: + type: string + queueName: + type: string + format: type: string + enum: + - "packed" + - "unpacked" + default: "packed" required: - - StorageConnectionString - - QueueName + - queueName --- apiVersion: v1 kind: ReactionProvider diff --git a/control-planes/mgmt_api/src/api/v1/models/providers.rs b/control-planes/mgmt_api/src/api/v1/models/providers.rs index ee6a6eda..24557d07 100644 --- a/control-planes/mgmt_api/src/api/v1/models/providers.rs +++ b/control-planes/mgmt_api/src/api/v1/models/providers.rs @@ -82,6 +82,7 @@ pub struct JsonSchemaDto { pub items: Option>, // For array types #[serde(skip_serializing_if = "Option::is_none")] + #[serde(rename = "enum")] pub enum_values: Option>, #[serde(skip_serializing_if = "Option::is_none")] diff --git a/control-planes/mgmt_api/src/domain/models.rs b/control-planes/mgmt_api/src/domain/models.rs index 93eda097..1a861789 100644 --- a/control-planes/mgmt_api/src/domain/models.rs +++ b/control-planes/mgmt_api/src/domain/models.rs @@ -339,6 +339,7 @@ pub struct JsonSchema { pub items: Option>, // For array types #[serde(skip_serializing_if = "Option::is_none")] + #[serde(rename = "enum")] pub enum_values: Option>, #[serde(skip_serializing_if = "Option::is_none")] diff --git a/reactions/azure/storagequeue-reaction/Dockerfile b/reactions/azure/storagequeue-reaction/Dockerfile index f44a4cff..d7f567f7 100644 --- a/reactions/azure/storagequeue-reaction/Dockerfile +++ b/reactions/azure/storagequeue-reaction/Dockerfile @@ -1,37 +1,11 @@ -# Copyright 2024 The Drasi Authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -#See https://aka.ms/containerfastmode to understand how Visual Studio uses this Dockerfile to build your images for faster debugging. - -FROM --platform=$TARGETPLATFORM mcr.microsoft.com/dotnet/aspnet@sha256:22414f335b79654fb42257326e7f17f18edf8f912578fc33a55c5dd609bd022e AS base -# mcr.microsoft.com/dotnet/aspnet:6.0 -WORKDIR /app -EXPOSE 80 - -FROM --platform=$BUILDPLATFORM mcr.microsoft.com/dotnet/sdk@sha256:6df1177e48b55272316d08f19cb383483af82aca5cdc67a76c414bc200847624 AS build -# mcr.microsoft.com/dotnet/sdk:6.0 +FROM mcr.microsoft.com/dotnet/sdk:8.0-cbl-mariner2.0 AS build +ARG BUILD_CONFIGURATION=Release WORKDIR /src -COPY ["storagequeue-reaction.csproj", "."] -RUN dotnet restore "./storagequeue-reaction.csproj" COPY . . -WORKDIR "/src/." -RUN dotnet build "storagequeue-reaction.csproj" -c Release -o /app/build - -FROM build AS publish -RUN dotnet publish "storagequeue-reaction.csproj" -c Release -o /app/publish /p:UseAppHost=false +RUN dotnet restore +RUN dotnet build -c $BUILD_CONFIGURATION -o /app/build -FROM base AS final +FROM mcr.microsoft.com/dotnet/aspnet:8.0-cbl-mariner2.0 AS final WORKDIR /app -COPY --from=publish /app/publish . -ENTRYPOINT ["dotnet", "storagequeue-reaction.dll"] \ No newline at end of file +COPY --from=build /app/build . +ENTRYPOINT ["/app/storagequeue-reaction"] \ No newline at end of file diff --git a/reactions/azure/storagequeue-reaction/Makefile b/reactions/azure/storagequeue-reaction/Makefile index 64df7b25..eef4ef09 100644 --- a/reactions/azure/storagequeue-reaction/Makefile +++ b/reactions/azure/storagequeue-reaction/Makefile @@ -1,4 +1,4 @@ -.PHONY: default docker-build kind-load +.PHONY: default docker-build kind-load generate-types CLUSTER_NAME ?= kind IMAGE_PREFIX ?= drasi-project @@ -8,13 +8,18 @@ DOCKERX_OPTS ?= --load --cache-to type=inline,mode=max default: docker-build docker-build: - docker buildx build . -t $(IMAGE_PREFIX)/reaction-storagequeue:$(DOCKER_TAG_VERSION) $(DOCKERX_OPTS) + docker buildx build . -t $(IMAGE_PREFIX)/reaction-storage-queue:$(DOCKER_TAG_VERSION) $(DOCKERX_OPTS) kind-load: - kind load docker-image $(IMAGE_PREFIX)/reaction-storagequeue:$(DOCKER_TAG_VERSION) --name $(CLUSTER_NAME) + kind load docker-image $(IMAGE_PREFIX)/reaction-storage-queue:$(DOCKER_TAG_VERSION) --name $(CLUSTER_NAME) test: @echo "No tests to run yet" lint-check: - @echo "No lint checks to run yet" \ No newline at end of file + @echo "No lint checks to run yet" + +generate-types: + npm run clean --prefix ../../../typespec + npm run build ./output-unpacked --prefix ../../../typespec + quicktype --src-lang schema -l cs -o Models/Unpacked.generated.cs ../../../typespec/output-unpacked/_generated/@typespec/json-schema/*.yaml --framework SystemTextJson --namespace Drasi.Reactions.StorageQueue.Models.Unpacked \ No newline at end of file diff --git a/reactions/azure/storagequeue-reaction/Models/ChangeNotification.cs b/reactions/azure/storagequeue-reaction/Models/ChangeNotification.cs deleted file mode 100644 index 8d05a7da..00000000 --- a/reactions/azure/storagequeue-reaction/Models/ChangeNotification.cs +++ /dev/null @@ -1,53 +0,0 @@ -// Copyright 2024 The Drasi Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -using System.Text.Json.Serialization; - -namespace StorageQueueReaction.Models -{ - public class ChangeNotification - { - [JsonPropertyName("op")] - public string Op { get; set; } - - [JsonPropertyName("ts_ms")] - public long TimestampMilliseconds { get; set; } - - [JsonPropertyName("schema")] - public string Schema { get; set; } - - [JsonPropertyName("payload")] - public ChangePayload Payload { get; set; } - } - - public class ChangePayload - { - [JsonPropertyName("source")] - public ChangeSource Source { get; set; } - - [JsonPropertyName("before")] - public object? Before { get; set; } - - [JsonPropertyName("after")] - public object? After { get; set; } - } - - public class ChangeSource - { - [JsonPropertyName("db")] - public string Db { get; set; } - [JsonPropertyName("table")] - public string Table { get; set; } - } -} diff --git a/reactions/azure/storagequeue-reaction/Models/Unpacked.cs b/reactions/azure/storagequeue-reaction/Models/Unpacked.cs new file mode 100644 index 00000000..5ef775dd --- /dev/null +++ b/reactions/azure/storagequeue-reaction/Models/Unpacked.cs @@ -0,0 +1,8 @@ +using System.Text.Json; + +namespace Drasi.Reactions.StorageQueue.Models.Unpacked; + +public static class ModelOptions +{ + public static JsonSerializerOptions JsonOptions => Converter.Settings; +} \ No newline at end of file diff --git a/reactions/azure/storagequeue-reaction/Models/Unpacked.generated.cs b/reactions/azure/storagequeue-reaction/Models/Unpacked.generated.cs new file mode 100644 index 00000000..2cfdb293 --- /dev/null +++ b/reactions/azure/storagequeue-reaction/Models/Unpacked.generated.cs @@ -0,0 +1,470 @@ +// +// +// To parse this JSON data, add NuGet 'System.Text.Json' then do one of these: +// +// using Drasi.Reactions.StorageQueue.Models.Unpacked; +// +// var changeNotification = ChangeNotification.FromJson(jsonString); +// var changePayload = ChangePayload.FromJson(jsonString); +// var changeSource = ChangeSource.FromJson(jsonString); +// var controlPayload = ControlPayload.FromJson(jsonString); +// var controlSignalNotification = ControlSignalNotification.FromJson(jsonString); +// var notification = Notification.FromJson(jsonString); +// var op = Op.FromJson(jsonString); +// var versions = Versions.FromJson(jsonString); +#nullable enable +#pragma warning disable CS8618 +#pragma warning disable CS8601 +#pragma warning disable CS8603 + +namespace Drasi.Reactions.StorageQueue.Models.Unpacked +{ + using System; + using System.Collections.Generic; + + using System.Text.Json; + using System.Text.Json.Serialization; + using System.Globalization; + + public partial class ChangeNotification + { + [JsonPropertyName("op")] + public ChangeNotificationOp Op { get; set; } + + [JsonPropertyName("payload")] + public PayloadClass Payload { get; set; } + + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + [JsonPropertyName("metadata")] + public Dictionary Metadata { get; set; } + + [JsonPropertyName("ts_ms")] + public long TsMs { get; set; } + } + + public partial class PayloadClass + { + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + [JsonPropertyName("after")] + public Dictionary After { get; set; } + + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + [JsonPropertyName("before")] + public Dictionary Before { get; set; } + + [JsonPropertyName("source")] + public SourceClass Source { get; set; } + } + + public partial class SourceClass + { + /// + /// The ID of the query that the change originated from + /// + [JsonPropertyName("queryId")] + public string QueryId { get; set; } + + [JsonPropertyName("ts_ms")] + public long TsMs { get; set; } + } + + public partial class ChangePayload + { + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + [JsonPropertyName("after")] + public Dictionary After { get; set; } + + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + [JsonPropertyName("before")] + public Dictionary Before { get; set; } + + [JsonPropertyName("source")] + public SourceClass Source { get; set; } + } + + public partial class ChangeSource + { + /// + /// The ID of the query that the change originated from + /// + [JsonPropertyName("queryId")] + public string QueryId { get; set; } + + [JsonPropertyName("ts_ms")] + public long TsMs { get; set; } + } + + public partial class ControlPayload + { + [JsonPropertyName("kind")] + public string Kind { get; set; } + + [JsonPropertyName("source")] + public SourceClass Source { get; set; } + } + + public partial class ControlSignalNotification + { + [JsonPropertyName("op")] + public ControlSignalNotificationOp Op { get; set; } + + [JsonPropertyName("payload")] + public ControlSignalNotificationPayload Payload { get; set; } + + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + [JsonPropertyName("metadata")] + public Dictionary Metadata { get; set; } + + [JsonPropertyName("ts_ms")] + public long TsMs { get; set; } + } + + public partial class ControlSignalNotificationPayload + { + [JsonPropertyName("kind")] + public string Kind { get; set; } + + [JsonPropertyName("source")] + public SourceClass Source { get; set; } + } + + public partial class Notification + { + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + [JsonPropertyName("metadata")] + public Dictionary Metadata { get; set; } + + [JsonPropertyName("op")] + public OpEnum Op { get; set; } + + [JsonPropertyName("ts_ms")] + public long TsMs { get; set; } + } + + public enum ChangeNotificationOp { D, I, U }; + + public enum ControlSignalNotificationOp { X }; + + public enum OpEnum { D, I, U, X }; + + public enum VersionsEnum { V1 }; + + public partial class ChangeNotification + { + public static ChangeNotification FromJson(string json) => JsonSerializer.Deserialize(json, Drasi.Reactions.StorageQueue.Models.Unpacked.Converter.Settings); + } + + public partial class ChangePayload + { + public static ChangePayload FromJson(string json) => JsonSerializer.Deserialize(json, Drasi.Reactions.StorageQueue.Models.Unpacked.Converter.Settings); + } + + public partial class ChangeSource + { + public static ChangeSource FromJson(string json) => JsonSerializer.Deserialize(json, Drasi.Reactions.StorageQueue.Models.Unpacked.Converter.Settings); + } + + public partial class ControlPayload + { + public static ControlPayload FromJson(string json) => JsonSerializer.Deserialize(json, Drasi.Reactions.StorageQueue.Models.Unpacked.Converter.Settings); + } + + public partial class ControlSignalNotification + { + public static ControlSignalNotification FromJson(string json) => JsonSerializer.Deserialize(json, Drasi.Reactions.StorageQueue.Models.Unpacked.Converter.Settings); + } + + public partial class Notification + { + public static Notification FromJson(string json) => JsonSerializer.Deserialize(json, Drasi.Reactions.StorageQueue.Models.Unpacked.Converter.Settings); + } + + public class Op + { + public static OpEnum FromJson(string json) => JsonSerializer.Deserialize(json, Drasi.Reactions.StorageQueue.Models.Unpacked.Converter.Settings); + } + + public class Versions + { + public static VersionsEnum FromJson(string json) => JsonSerializer.Deserialize(json, Drasi.Reactions.StorageQueue.Models.Unpacked.Converter.Settings); + } + + public static class Serialize + { + public static string ToJson(this ChangeNotification self) => JsonSerializer.Serialize(self, Drasi.Reactions.StorageQueue.Models.Unpacked.Converter.Settings); + public static string ToJson(this ChangePayload self) => JsonSerializer.Serialize(self, Drasi.Reactions.StorageQueue.Models.Unpacked.Converter.Settings); + public static string ToJson(this ChangeSource self) => JsonSerializer.Serialize(self, Drasi.Reactions.StorageQueue.Models.Unpacked.Converter.Settings); + public static string ToJson(this ControlPayload self) => JsonSerializer.Serialize(self, Drasi.Reactions.StorageQueue.Models.Unpacked.Converter.Settings); + public static string ToJson(this ControlSignalNotification self) => JsonSerializer.Serialize(self, Drasi.Reactions.StorageQueue.Models.Unpacked.Converter.Settings); + public static string ToJson(this Notification self) => JsonSerializer.Serialize(self, Drasi.Reactions.StorageQueue.Models.Unpacked.Converter.Settings); + public static string ToJson(this OpEnum self) => JsonSerializer.Serialize(self, Drasi.Reactions.StorageQueue.Models.Unpacked.Converter.Settings); + public static string ToJson(this VersionsEnum self) => JsonSerializer.Serialize(self, Drasi.Reactions.StorageQueue.Models.Unpacked.Converter.Settings); + } + + internal static class Converter + { + public static readonly JsonSerializerOptions Settings = new(JsonSerializerDefaults.General) + { + Converters = + { + ChangeNotificationOpConverter.Singleton, + ControlSignalNotificationOpConverter.Singleton, + OpEnumConverter.Singleton, + VersionsEnumConverter.Singleton, + new DateOnlyConverter(), + new TimeOnlyConverter(), + IsoDateTimeOffsetConverter.Singleton + }, + }; + } + + internal class ChangeNotificationOpConverter : JsonConverter + { + public override bool CanConvert(Type t) => t == typeof(ChangeNotificationOp); + + public override ChangeNotificationOp Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + var value = reader.GetString(); + switch (value) + { + case "d": + return ChangeNotificationOp.D; + case "i": + return ChangeNotificationOp.I; + case "u": + return ChangeNotificationOp.U; + } + throw new Exception("Cannot unmarshal type ChangeNotificationOp"); + } + + public override void Write(Utf8JsonWriter writer, ChangeNotificationOp value, JsonSerializerOptions options) + { + switch (value) + { + case ChangeNotificationOp.D: + JsonSerializer.Serialize(writer, "d", options); + return; + case ChangeNotificationOp.I: + JsonSerializer.Serialize(writer, "i", options); + return; + case ChangeNotificationOp.U: + JsonSerializer.Serialize(writer, "u", options); + return; + } + throw new Exception("Cannot marshal type ChangeNotificationOp"); + } + + public static readonly ChangeNotificationOpConverter Singleton = new ChangeNotificationOpConverter(); + } + + internal class ControlSignalNotificationOpConverter : JsonConverter + { + public override bool CanConvert(Type t) => t == typeof(ControlSignalNotificationOp); + + public override ControlSignalNotificationOp Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + var value = reader.GetString(); + if (value == "x") + { + return ControlSignalNotificationOp.X; + } + throw new Exception("Cannot unmarshal type ControlSignalNotificationOp"); + } + + public override void Write(Utf8JsonWriter writer, ControlSignalNotificationOp value, JsonSerializerOptions options) + { + if (value == ControlSignalNotificationOp.X) + { + JsonSerializer.Serialize(writer, "x", options); + return; + } + throw new Exception("Cannot marshal type ControlSignalNotificationOp"); + } + + public static readonly ControlSignalNotificationOpConverter Singleton = new ControlSignalNotificationOpConverter(); + } + + internal class OpEnumConverter : JsonConverter + { + public override bool CanConvert(Type t) => t == typeof(OpEnum); + + public override OpEnum Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + var value = reader.GetString(); + switch (value) + { + case "d": + return OpEnum.D; + case "i": + return OpEnum.I; + case "u": + return OpEnum.U; + case "x": + return OpEnum.X; + } + throw new Exception("Cannot unmarshal type OpEnum"); + } + + public override void Write(Utf8JsonWriter writer, OpEnum value, JsonSerializerOptions options) + { + switch (value) + { + case OpEnum.D: + JsonSerializer.Serialize(writer, "d", options); + return; + case OpEnum.I: + JsonSerializer.Serialize(writer, "i", options); + return; + case OpEnum.U: + JsonSerializer.Serialize(writer, "u", options); + return; + case OpEnum.X: + JsonSerializer.Serialize(writer, "x", options); + return; + } + throw new Exception("Cannot marshal type OpEnum"); + } + + public static readonly OpEnumConverter Singleton = new OpEnumConverter(); + } + + internal class VersionsEnumConverter : JsonConverter + { + public override bool CanConvert(Type t) => t == typeof(VersionsEnum); + + public override VersionsEnum Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + var value = reader.GetString(); + if (value == "v1") + { + return VersionsEnum.V1; + } + throw new Exception("Cannot unmarshal type VersionsEnum"); + } + + public override void Write(Utf8JsonWriter writer, VersionsEnum value, JsonSerializerOptions options) + { + if (value == VersionsEnum.V1) + { + JsonSerializer.Serialize(writer, "v1", options); + return; + } + throw new Exception("Cannot marshal type VersionsEnum"); + } + + public static readonly VersionsEnumConverter Singleton = new VersionsEnumConverter(); + } + + public class DateOnlyConverter : JsonConverter + { + private readonly string serializationFormat; + public DateOnlyConverter() : this(null) { } + + public DateOnlyConverter(string? serializationFormat) + { + this.serializationFormat = serializationFormat ?? "yyyy-MM-dd"; + } + + public override DateOnly Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + var value = reader.GetString(); + return DateOnly.Parse(value!); + } + + public override void Write(Utf8JsonWriter writer, DateOnly value, JsonSerializerOptions options) + => writer.WriteStringValue(value.ToString(serializationFormat)); + } + + public class TimeOnlyConverter : JsonConverter + { + private readonly string serializationFormat; + + public TimeOnlyConverter() : this(null) { } + + public TimeOnlyConverter(string? serializationFormat) + { + this.serializationFormat = serializationFormat ?? "HH:mm:ss.fff"; + } + + public override TimeOnly Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + var value = reader.GetString(); + return TimeOnly.Parse(value!); + } + + public override void Write(Utf8JsonWriter writer, TimeOnly value, JsonSerializerOptions options) + => writer.WriteStringValue(value.ToString(serializationFormat)); + } + + internal class IsoDateTimeOffsetConverter : JsonConverter + { + public override bool CanConvert(Type t) => t == typeof(DateTimeOffset); + + private const string DefaultDateTimeFormat = "yyyy'-'MM'-'dd'T'HH':'mm':'ss.FFFFFFFK"; + + private DateTimeStyles _dateTimeStyles = DateTimeStyles.RoundtripKind; + private string? _dateTimeFormat; + private CultureInfo? _culture; + + public DateTimeStyles DateTimeStyles + { + get => _dateTimeStyles; + set => _dateTimeStyles = value; + } + + public string? DateTimeFormat + { + get => _dateTimeFormat ?? string.Empty; + set => _dateTimeFormat = (string.IsNullOrEmpty(value)) ? null : value; + } + + public CultureInfo Culture + { + get => _culture ?? CultureInfo.CurrentCulture; + set => _culture = value; + } + + public override void Write(Utf8JsonWriter writer, DateTimeOffset value, JsonSerializerOptions options) + { + string text; + + + if ((_dateTimeStyles & DateTimeStyles.AdjustToUniversal) == DateTimeStyles.AdjustToUniversal + || (_dateTimeStyles & DateTimeStyles.AssumeUniversal) == DateTimeStyles.AssumeUniversal) + { + value = value.ToUniversalTime(); + } + + text = value.ToString(_dateTimeFormat ?? DefaultDateTimeFormat, Culture); + + writer.WriteStringValue(text); + } + + public override DateTimeOffset Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + string? dateText = reader.GetString(); + + if (string.IsNullOrEmpty(dateText) == false) + { + if (!string.IsNullOrEmpty(_dateTimeFormat)) + { + return DateTimeOffset.ParseExact(dateText, _dateTimeFormat, Culture, _dateTimeStyles); + } + else + { + return DateTimeOffset.Parse(dateText, Culture, _dateTimeStyles); + } + } + else + { + return default(DateTimeOffset); + } + } + + + public static readonly IsoDateTimeOffsetConverter Singleton = new IsoDateTimeOffsetConverter(); + } +} +#pragma warning restore CS8618 +#pragma warning restore CS8601 +#pragma warning restore CS8603 diff --git a/reactions/azure/storagequeue-reaction/Program.cs b/reactions/azure/storagequeue-reaction/Program.cs index 4cca5345..ed563b9a 100644 --- a/reactions/azure/storagequeue-reaction/Program.cs +++ b/reactions/azure/storagequeue-reaction/Program.cs @@ -1,106 +1,64 @@ -// Copyright 2024 The Drasi Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -using Dapr.Client; -using Microsoft.AspNetCore.Builder; -using Microsoft.AspNetCore.Mvc; -using Newtonsoft.Json.Linq; -using Newtonsoft.Json; -using Microsoft.Azure.WebJobs.Extensions.Http; -using StorageQueueReaction.Services; -using StorageQueueReaction.Models; -using System.Text.Json; -using System; +// Copyright 2024 The Drasi Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Azure; +using Azure.Identity; +using Azure.Storage; using Azure.Storage.Queues; -using Azure.Storage.Queues.Models; - -var builder = WebApplication.CreateBuilder(args); -var configuration = BuildConfiguration(); - -string connectionString = configuration.GetValue("StorageConnectionString"); -string queueName = configuration.GetValue("QueueName"); -var pubsubName = configuration.GetValue("PubsubName", "drasi-pubsub"); -var configDirectory = configuration.GetValue("QueryConfigPath", "/etc/queries"); - - -builder.Services.AddDaprClient(); -builder.Services.AddControllers(); -builder.Services.AddSingleton(); - - -var queueServiceClient = new QueueServiceClient(connectionString); -var queueClient = queueServiceClient.GetQueueClient(queueName); - -var app = builder.Build(); - -app.UseCors(); -app.UseRouting(); -app.UseCloudEvents(); - -app.UseEndpoints(endpoints => -{ - endpoints.MapSubscribeHandler(); - var ep = endpoints.MapPost("event", ProcessEvent); - - foreach (var qpath in Directory.GetFiles(configDirectory)) +using Drasi.Reaction.SDK; +using Drasi.Reactions.StorageQueue.Services; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; + +var reaction = new ReactionBuilder() + .UseChangeEventHandler() + .UseControlEventHandler() + .ConfigureServices((services) => { - var queryId = Path.GetFileName(qpath); - ep.WithTopic(pubsubName, queryId + "-results"); - } -}); - -app.Run("http://0.0.0.0:80"); - -static IConfiguration BuildConfiguration() + services.AddSingleton(); + services.AddSingleton(sp => + { + var config = sp.GetRequiredService(); + var connectionString = config.GetValue("connectionString"); + var endpoint = config.GetValue("endpoint"); + var queueName = config.GetValue("queueName"); + + QueueServiceClient queueServiceClient; + if (!String.IsNullOrEmpty(connectionString)) + { + Console.WriteLine("Using connection string"); + queueServiceClient = new QueueServiceClient(connectionString); + } + else + { + Console.WriteLine("Using DefaultAzureCredential authentication"); + if (String.IsNullOrEmpty(endpoint)) + { + throw new Exception("Either connection string or endpoint must be provided"); + } + queueServiceClient = new QueueServiceClient(new Uri(endpoint), new DefaultAzureCredential()); + } + + return queueServiceClient.GetQueueClient(queueName); + }); + }) + .Build(); + +if (!await reaction.Services.GetRequiredService().ExistsAsync()) { - return new ConfigurationBuilder() - .SetBasePath(Directory.GetCurrentDirectory()) - .AddJsonFile("appsettings.json", optional: true, reloadOnChange: true) - .AddEnvironmentVariables() - .Build(); + Reaction.TerminateWithError("queue does not exist"); } +await reaction.StartAsync(); - - -async Task ProcessEvent(HttpContext context) -{ - try - { - var changeFormatter = context.RequestServices.GetRequiredService(); - var data = await JsonDocument.ParseAsync(context.Request.Body); - - Console.WriteLine("Got event: " + data.RootElement.GetRawText()); - - var evt = data.RootElement; - - var kind = evt.GetProperty("kind").GetString(); - if (kind == "control") - { - return; - } - - if (evt.GetProperty("addedResults").GetArrayLength() == 0 && evt.GetProperty("updatedResults").GetArrayLength() == 0 && evt.GetProperty("deletedResults").GetArrayLength() == 0) - { - return; - } - await queueClient.SendMessageAsync(data.RootElement.GetRawText()); - context.Response.StatusCode = 200; - } - catch (Exception ex) - { - Console.WriteLine($"Error processing event: {ex.Message}"); - throw; - } -} \ No newline at end of file diff --git a/reactions/azure/storagequeue-reaction/Properties/launchSettings.json b/reactions/azure/storagequeue-reaction/Properties/launchSettings.json deleted file mode 100644 index a7c90faa..00000000 --- a/reactions/azure/storagequeue-reaction/Properties/launchSettings.json +++ /dev/null @@ -1,37 +0,0 @@ -{ - "iisSettings": { - "windowsAuthentication": false, - "anonymousAuthentication": true, - "iisExpress": { - "applicationUrl": "http://localhost:35143", - "sslPort": 44305 - } - }, - "profiles": { - "http": { - "commandName": "Project", - "dotnetRunMessages": true, - "launchBrowser": true, - "applicationUrl": "http://localhost:5114", - "environmentVariables": { - "ASPNETCORE_ENVIRONMENT": "Development" - } - }, - "https": { - "commandName": "Project", - "dotnetRunMessages": true, - "launchBrowser": true, - "applicationUrl": "https://localhost:7251;http://localhost:5114", - "environmentVariables": { - "ASPNETCORE_ENVIRONMENT": "Development" - } - }, - "IIS Express": { - "commandName": "IISExpress", - "launchBrowser": true, - "environmentVariables": { - "ASPNETCORE_ENVIRONMENT": "Development" - } - } - } -} diff --git a/reactions/azure/storagequeue-reaction/README.md b/reactions/azure/storagequeue-reaction/README.md new file mode 100644 index 00000000..5baf4a46 --- /dev/null +++ b/reactions/azure/storagequeue-reaction/README.md @@ -0,0 +1,115 @@ +# Drasi: Azure Storage Queue Reaction + +The Azure Storage Queue Reaction enqueues messages on [Azure Storage Queues](https://learn.microsoft.com/en-us/azure/storage/queues/storage-queues-introduction) in response to changes to the result set of a Drasi Continuous Query. The output format can either be the packed format of the raw query output or an unpacked format, where a single message represents one change to the result set. + +## Getting started + +The reaction takes the following configuration properties: + +| Property | Description | +|-|-| +| endpoint | Endpoint of the Storage Account queue service, in the form https://{account-name}.queue.core.windows.net, if not using connection string| +| connectionString | Connection String of Azure Storage Account, if using connection string based authentication. | +| queueName | Name of Queue. It should already exist on your storage account. | +| format | The output format for the messages that are enqueued. The can either be `packed` for the raw query output or `unpacked` for a message per result set change. | + +### Example + +```yaml +kind: Reaction +apiVersion: v1 +name: my-reaction +spec: + kind: StorageQueue + properties: + connectionString: + queueName: + format: + queries: + query1: + query2: +``` + +## Output formats + +### Packed Format + +The packed format produces one message per source change that includes all changes to the result set and looks as follows: + +```json +{ + "kind":"change", + "queryId": "query1", + "sequence": 2, + "sourceTimeMs": 0, + "addedResults": [ + { "id": 10, "temperature": 22 } + ], + "updatedResults":[{ + "before": { "id": 11, "temperature": 25 }, + "after": { "id": 11, "temperature": 27 } + }], + "deletedResults":[ + { "id": 12, "temperature": 30 } + ] +} +``` + + +### Unpacked Format + +The Unpacked format flattens all the changed result set items into one message per item and looks as follows: + +```json +{ + "op": "i", + "ts_ms": 0, + "payload": { + "source": { + "queryId": "query1", + "ts_ms": 0 + }, + "after": { + "id": 10, + "temperature": 22 + } + } +} +``` +```json +{ + "op": "u", + "ts_ms": 0, + "payload": { + "source": { + "queryId": "query1", + "ts_ms": 0 + }, + "before": { + "id": 11, + "temperature": 25 + }, + "after": { + "id": 11, + "temperature": 27 + } + } +} +``` +```json +{ + "op": "d", + "ts_ms": 0, + "payload": { + "source": { + "queryId": "query1", + "ts_ms": 0 + }, + "before": { + "id": 12, + "temperature": 30 + } + } +} +``` + diff --git a/reactions/azure/storagequeue-reaction/Services/ChangeFormatter.cs b/reactions/azure/storagequeue-reaction/Services/ChangeFormatter.cs index a1ec1ef5..9b887532 100644 --- a/reactions/azure/storagequeue-reaction/Services/ChangeFormatter.cs +++ b/reactions/azure/storagequeue-reaction/Services/ChangeFormatter.cs @@ -1,108 +1,88 @@ -// Copyright 2024 The Drasi Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - +// Copyright 2024 The Drasi Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +  -using StorageQueueReaction.Models; using System.Text.Json; +using Drasi.Reaction.SDK.Models.QueryOutput; +using Drasi.Reactions.StorageQueue.Models.Unpacked; -namespace StorageQueueReaction.Services +namespace Drasi.Reactions.StorageQueue.Services { public class ChangeFormatter : IChangeFormatter { - public IEnumerable FormatAdd(string queryId, JsonElement.ArrayEnumerator input) + public IEnumerable Format(ChangeEvent evt) { var result = new List(); - foreach (var inputItem in input) + foreach (var inputItem in evt.AddedResults) { var outputItem = new ChangeNotification { - Op = "i", - TimestampMilliseconds = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), - Schema = "", - Payload = new ChangePayload() + Op = ChangeNotificationOp.I, + TsMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), + Payload = new PayloadClass() { - Source = new ChangeSource() + Source = new SourceClass() { - Db = "Drasi", - Table = queryId + QueryId = evt.QueryId, + TsMs = evt.SourceTimeMs }, - Before = null, After = inputItem } }; - result.Add(outputItem); } - return result; - } - - public IEnumerable FormatUpdate(string queryId, JsonElement.ArrayEnumerator input) - { - var result = new List(); - foreach (var inputItem in input) + foreach (var inputItem in evt.UpdatedResults) { var outputItem = new ChangeNotification { - Op = "u", - TimestampMilliseconds = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), - Schema = "", - Payload = new ChangePayload() + Op = ChangeNotificationOp.U, + TsMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), + Payload = new PayloadClass() { - Source = new ChangeSource() + Source = new SourceClass() { - Db = "Drasi", - Table = queryId + QueryId = evt.QueryId, + TsMs = evt.SourceTimeMs }, - Before = inputItem.GetProperty("before"), - After = inputItem.GetProperty("after") + Before = inputItem.Before, + After = inputItem.After } }; - result.Add(outputItem); } - return result; - } - - public IEnumerable FormatDelete(string queryId, JsonElement.ArrayEnumerator input) - { - var result = new List(); - foreach (var inputItem in input) + foreach (var inputItem in evt.DeletedResults) { var outputItem = new ChangeNotification { - Op = "d", - TimestampMilliseconds = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), - Schema = "", - Payload = new ChangePayload() + Op = ChangeNotificationOp.D, + TsMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), + Payload = new PayloadClass() { - Source = new ChangeSource() + Source = new SourceClass() { - Db = "Drasi", - Table = queryId + QueryId = evt.QueryId, + TsMs = evt.SourceTimeMs }, - Before = inputItem, - After = null + Before = inputItem } }; - result.Add(outputItem); } return result; - } - + } } } diff --git a/reactions/azure/storagequeue-reaction/Services/ChangeHandler.cs b/reactions/azure/storagequeue-reaction/Services/ChangeHandler.cs new file mode 100644 index 00000000..6706778c --- /dev/null +++ b/reactions/azure/storagequeue-reaction/Services/ChangeHandler.cs @@ -0,0 +1,67 @@ +// Copyright 2024 The Drasi Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +namespace Drasi.Reactions.StorageQueue.Services; + +using System; +using System.Threading.Tasks; +using Azure.Storage.Queues; +using Drasi.Reaction.SDK; +using Drasi.Reaction.SDK.Models.QueryOutput; +using Drasi.Reactions.StorageQueue.Models.Unpacked; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Logging; + +public class ChangeHandler : IChangeEventHandler +{ + private readonly QueueClient _queueClient; + private readonly OutputFormat _format; + private readonly IChangeFormatter _formatter; + private readonly ILogger _logger; + + public ChangeHandler(QueueClient queueClient, IConfiguration config, IChangeFormatter changeFormatter, ILogger logger) + { + _queueClient = queueClient; + _format = Enum.Parse(config.GetValue("format", "packed") ?? "packed", true); + _logger = logger; + _formatter = changeFormatter; + } + + public async Task HandleChange(ChangeEvent evt, object? queryConfig) + { + switch (_format) + { + case OutputFormat.Packed: + var resp = await _queueClient.SendMessageAsync(evt.ToJson()); + _logger.LogInformation($"Sent message to queue: {resp.Value.MessageId}"); + break; + case OutputFormat.Unpacked: + var notifications = _formatter.Format(evt); + foreach (var notification in notifications) + { + var dzresp = await _queueClient.SendMessageAsync(notification.ToJson()); + _logger.LogInformation($"Sent message to queue: {dzresp.Value.MessageId}"); + } + break; + default: + throw new NotSupportedException("Invalid output format"); + } + } +} + +enum OutputFormat +{ + Packed, + Unpacked +} \ No newline at end of file diff --git a/reactions/azure/storagequeue-reaction/Services/ControlSignalHandler.cs b/reactions/azure/storagequeue-reaction/Services/ControlSignalHandler.cs new file mode 100644 index 00000000..ef90ad4a --- /dev/null +++ b/reactions/azure/storagequeue-reaction/Services/ControlSignalHandler.cs @@ -0,0 +1,70 @@ +// Copyright 2024 The Drasi Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +namespace Drasi.Reactions.StorageQueue.Services; + +using System; +using System.Text.Json; +using System.Threading.Tasks; +using Azure.Storage.Queues; +using Drasi.Reaction.SDK; +using Drasi.Reaction.SDK.Models.QueryOutput; +using Drasi.Reactions.StorageQueue.Models.Unpacked; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Logging; + +public class ControlSignalHandler : IControlEventHandler +{ + private readonly QueueClient _queueClient; + private readonly OutputFormat _format; + private readonly ILogger _logger; + + public ControlSignalHandler(QueueClient queueClient, IConfiguration config, ILogger logger) + { + _queueClient = queueClient; + _format = Enum.Parse(config.GetValue("Format", "packed") ?? "packed", true); + _logger = logger; + } + + public async Task HandleControlSignal(ControlEvent evt, object? queryConfig) + { + switch (_format) + { + case OutputFormat.Packed: + var resp = await _queueClient.SendMessageAsync(evt.ToJson()); + _logger.LogInformation($"Sent message to queue: {resp.Value.MessageId}"); + break; + case OutputFormat.Unpacked: + var notification = new ControlSignalNotification + { + Op = ControlSignalNotificationOp.X, + TsMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), + Payload = new ControlSignalNotificationPayload() + { + Kind = JsonSerializer.Serialize(evt.ControlSignal.Kind, Reaction.SDK.Models.QueryOutput.ModelOptions.JsonOptions).Trim('"'), + Source = new SourceClass() + { + QueryId = evt.QueryId, + TsMs = evt.SourceTimeMs + }, + } + }; + var dzresp = await _queueClient.SendMessageAsync(notification.ToJson()); + _logger.LogInformation($"Sent message to queue: {dzresp.Value.MessageId}"); + break; + default: + throw new NotSupportedException("Invalid output format"); + } + } +} diff --git a/reactions/azure/storagequeue-reaction/Services/IChangeFormatter.cs b/reactions/azure/storagequeue-reaction/Services/IChangeFormatter.cs index 4dce8106..3bdb5ea8 100644 --- a/reactions/azure/storagequeue-reaction/Services/IChangeFormatter.cs +++ b/reactions/azure/storagequeue-reaction/Services/IChangeFormatter.cs @@ -12,16 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -using StorageQueueReaction.Models; -using Newtonsoft.Json.Linq; -using System.Text.Json; - -namespace StorageQueueReaction.Services -{ - public interface IChangeFormatter - { - IEnumerable FormatAdd(string queryId, JsonElement.ArrayEnumerator input); - IEnumerable FormatDelete(string queryId, JsonElement.ArrayEnumerator input); - IEnumerable FormatUpdate(string queryId, JsonElement.ArrayEnumerator input); - } -} +using Drasi.Reaction.SDK.Models.QueryOutput; +using Drasi.Reactions.StorageQueue.Models.Unpacked; + +namespace Drasi.Reactions.StorageQueue.Services +{ + public interface IChangeFormatter + { + IEnumerable Format(ChangeEvent evt); + } +} diff --git a/reactions/azure/storagequeue-reaction/storagequeue-reaction.csproj b/reactions/azure/storagequeue-reaction/storagequeue-reaction.csproj index 1d86624b..39379086 100644 --- a/reactions/azure/storagequeue-reaction/storagequeue-reaction.csproj +++ b/reactions/azure/storagequeue-reaction/storagequeue-reaction.csproj @@ -1,22 +1,19 @@ - + - net6.0 + Exe + net8.0 enable enable - StorageQueueReaction + Drasi.Reactions.StorageQueue Linux . - - - - - - - - + + + + diff --git a/reactions/azure/storagequeue-reaction/storagequeue-reaction.sln b/reactions/azure/storagequeue-reaction/storagequeue-reaction.sln new file mode 100644 index 00000000..fa219f0d --- /dev/null +++ b/reactions/azure/storagequeue-reaction/storagequeue-reaction.sln @@ -0,0 +1,25 @@ + +Microsoft Visual Studio Solution File, Format Version 12.00 +# Visual Studio Version 17 +VisualStudioVersion = 17.5.002.0 +MinimumVisualStudioVersion = 10.0.40219.1 +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "storagequeue-reaction", "storagequeue-reaction.csproj", "{78FB5720-8464-43CB-B127-AFAF850D894B}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + Release|Any CPU = Release|Any CPU + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {78FB5720-8464-43CB-B127-AFAF850D894B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {78FB5720-8464-43CB-B127-AFAF850D894B}.Debug|Any CPU.Build.0 = Debug|Any CPU + {78FB5720-8464-43CB-B127-AFAF850D894B}.Release|Any CPU.ActiveCfg = Release|Any CPU + {78FB5720-8464-43CB-B127-AFAF850D894B}.Release|Any CPU.Build.0 = Release|Any CPU + EndGlobalSection + GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE + EndGlobalSection + GlobalSection(ExtensibilityGlobals) = postSolution + SolutionGuid = {1B51A758-1F38-4966-AB17-39BDB917A4F9} + EndGlobalSection +EndGlobal diff --git a/typespec/output-unpacked/.gitignore b/typespec/output-unpacked/.gitignore new file mode 100644 index 00000000..94f0f0d9 --- /dev/null +++ b/typespec/output-unpacked/.gitignore @@ -0,0 +1 @@ +_generated \ No newline at end of file diff --git a/typespec/output-unpacked/main.tsp b/typespec/output-unpacked/main.tsp new file mode 100644 index 00000000..51af21f8 --- /dev/null +++ b/typespec/output-unpacked/main.tsp @@ -0,0 +1,63 @@ +import "@typespec/json-schema"; +import "@typespec/versioning"; + +using TypeSpec.JsonSchema; +using TypeSpec.Versioning; + +@jsonSchema +@versioned(Drasi.Unpacked.Versions) +namespace Drasi.Unpacked; + +@extension("title", "Versions") +enum Versions { + v1, +} + +@extension("title", "Op") +enum Op { + Insert: "i", + Update: "u", + Delete: "d", + ControlSignal: "x", +}; + +@discriminator("op") +@extension("title", "Notification") +model Notification { + "op": Op; + ts_ms: int64; + metadata?: Record +} + + +@extension("title", "ChangeNotification") +model ChangeNotification extends Notification { + "op": Op.Insert | Op.Update | Op.Delete; + payload: ChangePayload; +} + +@extension("title", "ControlSignalNotification") +model ControlSignalNotification extends Notification { + "op": Op.ControlSignal; + payload: ControlPayload; +} + +@extension("title", "ChangePayload") +model ChangePayload { + source: ChangeSource; + before?: Record; + after?: Record; +} + +@extension("title", "ControlPayload") +model ControlPayload { + source: ChangeSource; + kind: string; +} + +@extension("title", "ChangeSource") +model ChangeSource { + @doc("The ID of the query that the change originated from") + queryId: string; + ts_ms: int64; +} diff --git a/typespec/output-unpacked/tspconfig.yaml b/typespec/output-unpacked/tspconfig.yaml new file mode 100644 index 00000000..92f2cf81 --- /dev/null +++ b/typespec/output-unpacked/tspconfig.yaml @@ -0,0 +1,8 @@ +emit: + - "@typespec/json-schema" +options: + "@typespec/json-schema": + file-type: "yaml" + int64-strategy: "number" + +output-dir: "{project-root}/_generated" \ No newline at end of file