diff --git a/src/Catalog/Catalog.Api/Infrastructure/Postgres.cs b/src/Catalog/Catalog.Api/Infrastructure/Postgres.cs new file mode 100644 index 0000000..4fb7244 --- /dev/null +++ b/src/Catalog/Catalog.Api/Infrastructure/Postgres.cs @@ -0,0 +1,18 @@ +using Npgsql; + +namespace Catalog.Api.Infrastructure; + +public static class Postgres +{ + public static NpgsqlDataSource ConfigurePostgres(IConfiguration configuration) + { + var config = configuration.GetSection("Postgres").Get(); + return new NpgsqlDataSourceBuilder(config!.ConnectionString).Build(); + } +} + +public record PostgresSettings +{ + public string ConnectionString { get; init; } = null!; + public string Schema { get; init; } = null!; +} diff --git a/src/Catalog/Catalog.Api/Queries/Products/ProductPgProjector.cs b/src/Catalog/Catalog.Api/Queries/Products/ProductPgProjector.cs new file mode 100644 index 0000000..6d9a915 --- /dev/null +++ b/src/Catalog/Catalog.Api/Queries/Products/ProductPgProjector.cs @@ -0,0 +1,30 @@ +using Catalog.Products; +using Eventuous.Postgresql.Projections; +using Npgsql; + +namespace Catalog.Api.Queries.Products; + +public class ProductPgProjector : PostgresProjector +{ + public ProductPgProjector(NpgsqlDataSource dataSource) : base(dataSource) + { + const string insertSql = + """ + insert into catalog.product_drafts + (product_id, sku, brand_name, created_at) + values (@product_id, @sku, @brand_name, @created_at) + """; + + On( + (connection, ctx) => + Project( + connection, + insertSql, + new NpgsqlParameter("@product_id", ctx.Stream.GetId()), + new NpgsqlParameter("@sku", ctx.Message.Sku), + new NpgsqlParameter("@brand_name", ctx.Message.Brand), + new NpgsqlParameter("@created_at", ctx.Message.CreatedAt) + ) + ); + } +} diff --git a/src/Catalog/Catalog.Api/Registrations.cs b/src/Catalog/Catalog.Api/Registrations.cs index 38a3b14..a98e7cf 100644 --- a/src/Catalog/Catalog.Api/Registrations.cs +++ b/src/Catalog/Catalog.Api/Registrations.cs @@ -21,6 +21,7 @@ using OpenTelemetry.Metrics; using OpenTelemetry.Resources; using OpenTelemetry.Trace; +using ThirdParty.BouncyCastle.Utilities.IO.Pem; #pragma warning disable CS0618 // Type or member is obsolete @@ -29,7 +30,6 @@ namespace Catalog.Api; public static class Registrations { private const string OTelServiceName = "catalog"; - private const string PostgresSchemaName = "catalog"; public static void AddEventuous(this IServiceCollection services, IConfiguration configuration) { @@ -58,14 +58,15 @@ public static void AddEventuous(this IServiceCollection services, IConfiguration services.AddSingleton(id => new ValueTask(true)); // event store related - services - .AddEventuousPostgres(configuration["Postgres:ConnectionString"]!, PostgresSchemaName) - .AddCheckpointStore(); // subscriptions: checkpoint stores services.AddSingleton(Mongo.ConfigureMongo(configuration)); services.AddCheckpointStore(); + // services.AddEventuousPostgres(); // for PG ES + // services.AddSingleton(Postgres.ConfigurePostgres(configuration)); + // services.AddCheckpointStore(); + // subscriptions: projections services.AddSubscription( "ProductsProjections", @@ -88,6 +89,13 @@ public static void AddEventuous(this IServiceCollection services, IConfiguration .AddEventHandler() .WithPartitioningByStream(2)); + // services.AddSubscription( + // "ProductDraftsProjections", + // builder => builder + // .UseCheckpointStore() + // .AddEventHandler() + // .WithPartitioningByStream(2)); + // subscriptions: persistent subscriptions // TODO: Add persistent subscription for integration points and other use cases diff --git a/src/Catalog/Catalog.Api/Scripts/create_catalog_product_drafts.sql b/src/Catalog/Catalog.Api/Scripts/create_catalog_product_drafts.sql new file mode 100644 index 0000000..805c73d --- /dev/null +++ b/src/Catalog/Catalog.Api/Scripts/create_catalog_product_drafts.sql @@ -0,0 +1,7 @@ +create table if not exists catalog.product_drafts +( + product_id varchar(256) not null primary key, + sku varchar(32) not null, + brand_name varchar(128), + created_at timestamp +); diff --git a/src/Catalog/Catalog.Api/Scripts/create_schema_catalog.sql b/src/Catalog/Catalog.Api/Scripts/create_schema_catalog.sql new file mode 100644 index 0000000..6a3a570 --- /dev/null +++ b/src/Catalog/Catalog.Api/Scripts/create_schema_catalog.sql @@ -0,0 +1 @@ +CREATE SCHEMA IF NOT EXISTS catalog; diff --git a/src/Catalog/Catalog.Api/Scripts/create_schema_eventuous.sql b/src/Catalog/Catalog.Api/Scripts/create_schema_eventuous.sql new file mode 100644 index 0000000..b808856 --- /dev/null +++ b/src/Catalog/Catalog.Api/Scripts/create_schema_eventuous.sql @@ -0,0 +1 @@ +CREATE SCHEMA IF NOT EXISTS eventuous; diff --git a/src/Catalog/Catalog.Api/appsettings.Development.json b/src/Catalog/Catalog.Api/appsettings.Development.json index a63d74d..2074190 100644 --- a/src/Catalog/Catalog.Api/appsettings.Development.json +++ b/src/Catalog/Catalog.Api/appsettings.Development.json @@ -17,5 +17,10 @@ "User": "mongoadmin", "Password": "secret", "Database": "Catalog" + }, + "Postgres": { + "ConnectionString": "Server=127.0.0.1;Port=5432;Database=postgres;User Id=postgres;Password=Password123!;Include Error Detail=true;", + "Schema": "catalog", + "InitializeDatabase": true } }