Skip to content

Commit

Permalink
temp changes; may need to debug eventuous pg projector (#47)
Browse files Browse the repository at this point in the history
  • Loading branch information
erikshafer authored Jun 23, 2024
1 parent 29402a4 commit e922a48
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 4 deletions.
18 changes: 18 additions & 0 deletions src/Catalog/Catalog.Api/Infrastructure/Postgres.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using Npgsql;

namespace Catalog.Api.Infrastructure;

public static class Postgres
{
public static NpgsqlDataSource ConfigurePostgres(IConfiguration configuration)
{
var config = configuration.GetSection("Postgres").Get<PostgresSettings>();
return new NpgsqlDataSourceBuilder(config!.ConnectionString).Build();
}
}

public record PostgresSettings
{
public string ConnectionString { get; init; } = null!;
public string Schema { get; init; } = null!;
}
30 changes: 30 additions & 0 deletions src/Catalog/Catalog.Api/Queries/Products/ProductPgProjector.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
using Catalog.Products;
using Eventuous.Postgresql.Projections;
using Npgsql;

namespace Catalog.Api.Queries.Products;

public class ProductPgProjector : PostgresProjector
{
public ProductPgProjector(NpgsqlDataSource dataSource) : base(dataSource)
{
const string insertSql =
"""
insert into catalog.product_drafts
(product_id, sku, brand_name, created_at)
values (@product_id, @sku, @brand_name, @created_at)
""";

On<ProductEvents.V1.ProductDrafted>(
(connection, ctx) =>
Project(
connection,
insertSql,
new NpgsqlParameter("@product_id", ctx.Stream.GetId()),
new NpgsqlParameter("@sku", ctx.Message.Sku),
new NpgsqlParameter("@brand_name", ctx.Message.Brand),
new NpgsqlParameter("@created_at", ctx.Message.CreatedAt)
)
);
}
}
16 changes: 12 additions & 4 deletions src/Catalog/Catalog.Api/Registrations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
using OpenTelemetry.Metrics;
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;
using ThirdParty.BouncyCastle.Utilities.IO.Pem;

#pragma warning disable CS0618 // Type or member is obsolete

Expand All @@ -29,7 +30,6 @@ namespace Catalog.Api;
public static class Registrations
{
private const string OTelServiceName = "catalog";
private const string PostgresSchemaName = "catalog";

public static void AddEventuous(this IServiceCollection services, IConfiguration configuration)
{
Expand Down Expand Up @@ -58,14 +58,15 @@ public static void AddEventuous(this IServiceCollection services, IConfiguration
services.AddSingleton<Catalog.Offers.Services.IsUserAuthorized>(id => new ValueTask<bool>(true));

// event store related
services
.AddEventuousPostgres(configuration["Postgres:ConnectionString"]!, PostgresSchemaName)
.AddCheckpointStore<PostgresCheckpointStore>();

// subscriptions: checkpoint stores
services.AddSingleton(Mongo.ConfigureMongo(configuration));
services.AddCheckpointStore<MongoCheckpointStore>();

// services.AddEventuousPostgres(); // for PG ES
// services.AddSingleton(Postgres.ConfigurePostgres(configuration));
// services.AddCheckpointStore<PostgresCheckpointStore>();

// subscriptions: projections
services.AddSubscription<AllStreamSubscription, AllStreamSubscriptionOptions>(
"ProductsProjections",
Expand All @@ -88,6 +89,13 @@ public static void AddEventuous(this IServiceCollection services, IConfiguration
.AddEventHandler<OfferStateProjection>()
.WithPartitioningByStream(2));

// services.AddSubscription<AllStreamSubscription, AllStreamSubscriptionOptions>(
// "ProductDraftsProjections",
// builder => builder
// .UseCheckpointStore<PostgresCheckpointStore>()
// .AddEventHandler<ProductPgProjector>()
// .WithPartitioningByStream(2));

// subscriptions: persistent subscriptions
// TODO: Add persistent subscription for integration points and other use cases

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
create table if not exists catalog.product_drafts
(
product_id varchar(256) not null primary key,
sku varchar(32) not null,
brand_name varchar(128),
created_at timestamp
);
1 change: 1 addition & 0 deletions src/Catalog/Catalog.Api/Scripts/create_schema_catalog.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
CREATE SCHEMA IF NOT EXISTS catalog;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
CREATE SCHEMA IF NOT EXISTS eventuous;
5 changes: 5 additions & 0 deletions src/Catalog/Catalog.Api/appsettings.Development.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,10 @@
"User": "mongoadmin",
"Password": "secret",
"Database": "Catalog"
},
"Postgres": {
"ConnectionString": "Server=127.0.0.1;Port=5432;Database=postgres;User Id=postgres;Password=Password123!;Include Error Detail=true;",
"Schema": "catalog",
"InitializeDatabase": true
}
}

0 comments on commit e922a48

Please sign in to comment.