Skip to content

Commit

Permalink
Combine OTel and Sweeper into Examples (#3245)
Browse files Browse the repository at this point in the history
* feature: add the tracer to the message pump

* feature: use tracer to begin span for pump

* fix: ensure we log on a quit

* fix: remove warnings

* feature: configure the instrumentation options of the Dispatcher

* fix: not showing telemetry

* fix: error paths must end spans, use status to indicate error; non-error no message paths must end spans too.

* fix: the quit and none messages have no topic, but the OTel standards needs them to...so we pass the routing key when creating them.

* fix: internalbus not empty, has spurious MT_NONE

* fix: internal bus was locking empty message returned when bus empty

* fix: move of SubscriptonName requires serialization options

* fix: ensure that you can serialize our types

* fix: use message type to find correct span

* feature: check for spans on empty and quit

* feature: tests for span for MT_UNACCEPTABLE message

* chore: should use channel name not string primitive

* feature: support errors being reported with the routing key; create a span for the whole pump

* chore: remove warnings
  • Loading branch information
iancooper authored Sep 2, 2024
1 parent 726edc7 commit 6ce26a8
Show file tree
Hide file tree
Showing 113 changed files with 1,807 additions and 408 deletions.
Binary file modified samples/WebAPI/WebAPI_Dapper/GreetingsWeb/Greetings.db
Binary file not shown.
27 changes: 20 additions & 7 deletions samples/WebAPI/WebAPI_Dapper/GreetingsWeb/Startup.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System;
using System.Diagnostics;
using DbMaker;
using GreetingsApp.Handlers;
using GreetingsApp.Policies;
Expand All @@ -9,20 +8,20 @@
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.OpenApi.Models;
using OpenTelemetry;
using OpenTelemetry.Exporter;
using OpenTelemetry.Logs;
using OpenTelemetry.Metrics;
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;
using Paramore.Brighter;
using Paramore.Brighter.Extensions.DependencyInjection;
using Paramore.Brighter.Observability;
using Paramore.Brighter.Extensions.Diagnostics;
using Paramore.Darker.AspNetCore;
using Paramore.Darker.Policies;
using Paramore.Darker.QueryLogging;
using TransportMaker;
using ExportProcessorType = OpenTelemetry.ExportProcessorType;

namespace GreetingsWeb;

Expand Down Expand Up @@ -132,8 +131,20 @@ private void ConfigureDarker(IServiceCollection services)

private void ConfigureObservability(IServiceCollection services)
{
var brighterTracer = new BrighterTracer(TimeProvider.System);
services.AddSingleton<IAmABrighterTracer>(brighterTracer);
services.AddLogging(loggingBuilder =>
{
loggingBuilder.AddConsole();
loggingBuilder.AddOpenTelemetry(options =>
{
options.IncludeScopes = true;
options.AddOtlpExporter((exporterOptions, _) =>
{
exporterOptions.Protocol = OtlpExportProtocol.Grpc;
})
.SetResourceBuilder(ResourceBuilder.CreateDefault().AddService("GreetingsWeb"))
.IncludeScopes = true;
});
});

services.AddOpenTelemetry()
.ConfigureResource(builder =>
Expand All @@ -145,7 +156,7 @@ private void ConfigureObservability(IServiceCollection services)
}).WithTracing(builder =>
{
builder
.AddSource(brighterTracer.ActivitySource.Name)
.AddBrighterInstrumentation()
.AddSource("RabbitMQ.Client.*")
.SetSampler(new AlwaysOnSampler())
.AddAspNetCoreInstrumentation()
Expand All @@ -161,4 +172,6 @@ private void ConfigureObservability(IServiceCollection services)
.AddOtlpExporter()
);
}


}
13 changes: 12 additions & 1 deletion samples/WebAPI/WebAPI_Dapper/Greetings_Sweeper/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using GreetingsApp.Requests;
using Microsoft.AspNetCore.Diagnostics.HealthChecks;
using OpenTelemetry.Exporter;
using OpenTelemetry.Logs;
using OpenTelemetry.Metrics;
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;
Expand All @@ -21,7 +22,17 @@
builder.Services.AddSingleton<IAmABrighterTracer>(brighterTracer);

builder.Logging.ClearProviders();
builder.Logging.AddConsole().AddDebug();
builder.Logging.AddConsole();
builder.Logging.AddOpenTelemetry(otel =>
{
otel.SetResourceBuilder(ResourceBuilder.CreateDefault().AddService("Greetings Sweeper"))
.AddOtlpExporter(options =>
{
options.Protocol = OtlpExportProtocol.Grpc;
})
.IncludeScopes = true;
});

builder.Services.AddOpenTelemetry()
.ConfigureResource(builder =>
{
Expand Down
8 changes: 6 additions & 2 deletions samples/WebAPI/WebAPI_Dapper/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,13 @@ We also add an Inbox here. The Inbox can be used to de-duplicate messages. In me

## Telemetry

The apps use OpenTelemetry to provide observability. This is configured in the `Program.cs` or `Startup.cs` file. Docker files are supported for an Open Telemetry Collector that exports telemetry to Jaeger and metrics to Prometheus. There is a supported config file for the Open Telemetry collector at the root.
The apps use OpenTelemetry to provide observability. This is configured in the `Program.cs` or `Startup.cs` file.

You can view the telemetry in the [Jaeger UI](http://localhost:16686)
Docker files are supported for an Open Telemetry Collector that exports telemetry to Jaeger and metrics to Prometheus. There is a supported config file for the Open Telemetry collector at the root. You can view the telemetry in the [Jaeger UI](http://localhost:16686)

Alternatively you can use [.NET Aspire](https://learn.microsoft.com/en-us/dotnet/aspire/fundamentals/dashboard/overview?tabs=bash) dashbaord to view the telemetry, which removes the need for running the collector and Jaeger. You can run it using Docker with:

`docker run --rm -it -p 18888:18888 -p 4317:18889 -d --name aspire-dashboard -e DOTNET_DASHBOARD_UNSECURED_ALLOW_ANONYMOUS='true' mcr.microsoft.com/dotnet/aspire-dashboard:8.0.0`

### Configuration

Expand Down
20 changes: 17 additions & 3 deletions samples/WebAPI/WebAPI_Dapper/SalutationAnalytics/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using OpenTelemetry.Exporter;
using OpenTelemetry.Logs;
using OpenTelemetry.Metrics;
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;
using Paramore.Brighter;
using Paramore.Brighter.Extensions.DependencyInjection;
using Paramore.Brighter.Extensions.Diagnostics;
using Paramore.Brighter.MsSql;
using Paramore.Brighter.MySql;
using Paramore.Brighter.Observability;
Expand Down Expand Up @@ -139,8 +141,20 @@ static void ConfigureBrighter(HostBuilderContext hostContext, IServiceCollection

static void ConfigureObservability(IServiceCollection services)
{
var brighterTracer = new BrighterTracer(TimeProvider.System);
services.AddSingleton<IAmABrighterTracer>(brighterTracer);
services.AddLogging(loggingBuilder =>
{
loggingBuilder.AddConsole();
loggingBuilder.AddOpenTelemetry(options =>
{
options.IncludeScopes = true;
options.AddOtlpExporter((exporterOptions, processorOptions) =>
{
exporterOptions.Protocol = OtlpExportProtocol.Grpc;
})
.SetResourceBuilder(ResourceBuilder.CreateDefault().AddService("Salutation Analytics"))
.IncludeScopes = true;
});
});

services.AddOpenTelemetry()
.ConfigureResource(builder =>
Expand All @@ -152,7 +166,7 @@ static void ConfigureObservability(IServiceCollection services)
}).WithTracing(builder =>
{
builder
.AddSource(brighterTracer.ActivitySource.Name)
.AddBrighterInstrumentation()
.AddSource("RabbitMQ.Client.*")
.SetSampler(new AlwaysOnSampler())
.AddAspNetCoreInstrumentation()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
"profiles": {
"Development": {
"commandName": "Project",
"dotnetRunMessages": "true",
"launchBrowser": true,
"launchUrl": "swagger",
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development",
"BRIGHTER_GREETINGS_DATABASE": "Sqlite",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\..\..\src\Paramore.Brighter.Extensions.Diagnostics\Paramore.Brighter.Extensions.Diagnostics.csproj" />
<ProjectReference Include="..\..\..\..\src\Paramore.Brighter.ServiceActivator.Extensions.DependencyInjection\Paramore.Brighter.ServiceActivator.Extensions.DependencyInjection.csproj" />
<ProjectReference Include="..\..\..\..\src\Paramore.Brighter.ServiceActivator.Extensions.Hosting\Paramore.Brighter.ServiceActivator.Extensions.Hosting.csproj" />
<ProjectReference Include="..\..\..\..\src\Paramore.Brighter.ServiceActivator\Paramore.Brighter.ServiceActivator.csproj"/>
Expand Down
12 changes: 11 additions & 1 deletion samples/WebAPI/WebAPI_Dapper/Salutation_Sweeper/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using SalutationApp.Requests;
using Microsoft.AspNetCore.Diagnostics.HealthChecks;
using OpenTelemetry.Exporter;
using OpenTelemetry.Logs;
using OpenTelemetry.Metrics;
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;
Expand All @@ -20,7 +21,16 @@
builder.Services.AddSingleton<IAmABrighterTracer>(brighterTracer);

builder.Logging.ClearProviders();
builder.Logging.AddConsole().AddDebug();
builder.Logging.AddConsole();
builder.Logging.AddOpenTelemetry(otel =>
{
otel.SetResourceBuilder(ResourceBuilder.CreateDefault().AddService("Greetings Sweeper"))
.AddOtlpExporter(options =>
{
options.Protocol = OtlpExportProtocol.Grpc;
})
.IncludeScopes = true;
});
builder.Services.AddOpenTelemetry()
.ConfigureResource(builder =>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ public static IBrighterBuilder AddBrighter(
var options = new BrighterOptions();
configure?.Invoke(options);
services.TryAddSingleton<IBrighterOptions>(options);
services.TryAddSingleton<IAmABrighterTracer>(new BrighterTracer());

return BrighterHandlerBuilder(services, options);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,25 @@
using OpenTelemetry.Trace;
using System;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using OpenTelemetry.Trace;
using Paramore.Brighter.Observability;

namespace Paramore.Brighter.Extensions.Diagnostics;

public static class BrighterTracerBuilderExtensions
{
public static TracerProviderBuilder AddBrighterInstrumentation(this TracerProviderBuilder builder)
=> builder.AddSource("Paramore.Brighter");
{
builder.ConfigureServices(services =>
{
var brighterTracer = new BrighterTracer(TimeProvider.System);
services.TryAddSingleton<IAmABrighterTracer>(brighterTracer);
builder.AddSource(brighterTracer.ActivitySource.Name);
});

return builder;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ public IAmAChannel CreateChannel(Subscription subscription)
EnsureQueue();
return new Channel(
subscription.ChannelName.ToValidSQSQueueName(),
subscription.ChannelName.ToValidSQSQueueName(),
subscription.RoutingKey.ToValidSNSTopicName(),
_messageConsumerFactory.Create(subscription),
subscription.BufferSize
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ public IAmAChannel CreateChannel(Subscription subscription)
_azureServiceBusConsumerFactory.Create(azureServiceBusSubscription);

return new Channel(
channelName: subscription.ChannelName,
channelName: subscription.ChannelName,
routingKey: subscription.RoutingKey,
messageConsumer: messageConsumer,
maxQueueLength: subscription.BufferSize
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public IAmAChannel CreateChannel(Subscription subscription)

return new Channel(
subscription.ChannelName,
subscription.RoutingKey,
_kafkaMessageConsumerFactory.Create(subscription),
subscription.BufferSize);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ public IAmAChannel CreateChannel(Subscription subscription)

s_logger.LogDebug("MsSqlInputChannelFactory: create input channel {ChannelName} for topic {Topic}", subscription.ChannelName, subscription.RoutingKey);
return new Channel(
subscription.ChannelName,
subscription.ChannelName,
subscription.RoutingKey,
_msSqlMessageConsumerFactory.Create(subscription),
subscription.BufferSize);
}
Expand Down
7 changes: 4 additions & 3 deletions src/Paramore.Brighter.MessagingGateway.RMQ/ChannelFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,10 @@ public IAmAChannel CreateChannel(Subscription subscription)
var messageConsumer = _messageConsumerFactory.Create(rmqSubscription);

return new Channel(
channelName:subscription.ChannelName,
messageConsumer:messageConsumer,
maxQueueLength:subscription.BufferSize
channelName: subscription.ChannelName,
routingKey: subscription.RoutingKey,
messageConsumer: messageConsumer,
maxQueueLength: subscription.BufferSize
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public IAmAChannel CreateChannel(Subscription subscription)

return new Channel(
subscription.ChannelName,
subscription.RoutingKey,
_messageConsumerFactory.Create(subscription),
subscription.BufferSize
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ public static IBrighterBuilder AddServiceActivator(
configure?.Invoke(options);
services.TryAddSingleton<IBrighterOptions>(options);
services.TryAddSingleton<IServiceActivatorOptions>(options);
services.TryAddSingleton<BrighterTracer>();

services.TryAdd(new ServiceDescriptor(typeof(IDispatcher),
(serviceProvider) => (IDispatcher)BuildDispatcher(serviceProvider),
Expand Down Expand Up @@ -70,17 +69,21 @@ private static Dispatcher BuildDispatcher(IServiceProvider serviceProvider)
var requestContextFactory = serviceProvider.GetService<IAmARequestContextFactory>();

var dispatcherBuilder = DispatchBuilder
.With()
.StartNew()
.CommandProcessorFactory(providerFactory, requestContextFactory);

var messageMapperRegistry = ServiceCollectionExtensions.MessageMapperRegistry(serviceProvider);
var messageTransformFactory = ServiceCollectionExtensions.TransformFactory(serviceProvider);
var messageTransformFactoryAsync = ServiceCollectionExtensions.TransformFactoryAsync(serviceProvider);

var tracer = serviceProvider.GetService<IAmABrighterTracer>();

return dispatcherBuilder
.MessageMappers(messageMapperRegistry, messageMapperRegistry, messageTransformFactory, messageTransformFactoryAsync)
.ChannelFactory(options.DefaultChannelFactory)
.Subscriptions(options.Subscriptions).Build();
.Subscriptions(options.Subscriptions)
.ConfigureInstrumentation(tracer, options.InstrumentationOptions)
.Build();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public BrighterServiceActivatorHealthCheck(IDispatcher dispatcher)

public Task<HealthCheckResult> CheckHealthAsync(HealthCheckContext context, CancellationToken cancellationToken = new())
{
var expectedConsumers = ((Dispatcher)_dispatcher).Connections.Sum(c => c.NoOfPerformers);
var expectedConsumers = ((Dispatcher)_dispatcher).Subscriptions.Sum(c => c.NoOfPerformers);
var activeConsumers = _dispatcher.Consumers.Count();

if (expectedConsumers != activeConsumers)
Expand All @@ -33,12 +33,12 @@ public BrighterServiceActivatorHealthCheck(IDispatcher dispatcher)

private string GenerateUnhealthyMessage()
{
var config = ((Dispatcher)_dispatcher).Connections;
var config = ((Dispatcher)_dispatcher).Subscriptions;

var unhealthyHosts = new List<string>();
foreach (var cfg in config)
{
var sub = _dispatcher.Consumers.Where(c => c.SubscriptionName == cfg.Name).ToArray();
var sub = _dispatcher.Consumers.Where(c => c.Subscription.Name == cfg.Name).ToArray();
if (sub.Length != cfg?.NoOfPerformers)
unhealthyHosts.Add($"{cfg.Name} has {sub.Count()} of {cfg.NoOfPerformers} expected consumers");
}
Expand Down
Loading

0 comments on commit 6ce26a8

Please sign in to comment.