Skip to content

Commit

Permalink
move activity construction to RedisStreamsService
Browse files Browse the repository at this point in the history
  • Loading branch information
mcsdodo committed Jul 30, 2024
1 parent 8b2cc53 commit 64ea7f3
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 17 deletions.
1 change: 1 addition & 0 deletions Common.Redis/Common.Redis.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="OpenTelemetry.Api" Version="1.9.0" />
<PackageReference Include="StackExchange.Redis" Version="2.8.0" />
</ItemGroup>

Expand Down
47 changes: 43 additions & 4 deletions Common.Redis/RedisStreamsService.cs
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Text.Json;
using System.Text.RegularExpressions;
using Microsoft.Extensions.Logging;
using OpenTelemetry;
using OpenTelemetry.Context.Propagation;
using StackExchange.Redis;

namespace Common.Redis;

public class RedisStreamsService : IRedisStreamsService
{
private const string ValueRedisEntryName = "value";
private const string CreatedAtRedisEntryName = "createdAtDateTimeOffset";
private const string MessageEntryKey = "value";
private const string TelemetryContextEntryKey = "ctx";
private const string CreatedAtEntryKey = "createdAtDateTimeOffset";
private const int DefaultMaxLength = 10_000;

private readonly TimeProvider _dateTimeProvider;
Expand All @@ -28,18 +33,31 @@ public RedisStreamsService(
_database = connectionMultiplexer.Value.GetDatabase();
}

private record RedisStreamActivityContext
{
public string Context { get; set; }
}

public async Task<string?> StreamAddAsync(string streamName, string message, int? streamLength = DefaultMaxLength)
{
try
{
var utcNow = _dateTimeProvider.GetUtcNow().ToString();

using var activity = new ActivitySource("Redis.Producer").StartActivity("redis-stream-write", ActivityKind.Producer);
var carrier = new RedisStreamActivityContext();
Propagators.DefaultTextMapPropagator.Inject(
new PropagationContext(activity!.Context, Baggage.Current),
carrier,
(context, _, value) => context.Context = value);

return await _database.StreamAddAsync(
streamName,
streamPairs:
[
new NameValueEntry(ValueRedisEntryName, message),
new NameValueEntry(CreatedAtRedisEntryName, utcNow)
new NameValueEntry(MessageEntryKey, message),
new NameValueEntry(TelemetryContextEntryKey, JsonSerializer.Serialize(carrier)),
new NameValueEntry(CreatedAtEntryKey, utcNow)
],
maxLength: streamLength,
useApproximateMaxLength: true);
Expand All @@ -63,6 +81,22 @@ public RedisStreamsService(
}
}

private bool TryGetContext(StreamEntry streamEntry, out PropagationContext context)
{
var streamEntryDictionary =
streamEntry.Values.ToDictionary(x => x.Name.ToString(), x => x.Value.ToString());

if (streamEntryDictionary.TryGetValue(TelemetryContextEntryKey, out var serializedCtx))
{
var ctx = JsonSerializer.Deserialize<RedisStreamActivityContext>(serializedCtx);
context = Propagators.DefaultTextMapPropagator.Extract(default, ctx,
(entry, _) => [entry!.Context]);
return true;
}

return false;
}

public async Task<ICollection<(string streamEntryId, string message)>> StreamReadGroupAsync(
string streamName,
string consumerGroupName,
Expand All @@ -87,7 +121,12 @@ public RedisStreamsService(
if (!TryGetMessage(streamEntry, out var message) || message is null)
continue;

TryGetContext(streamEntry, out var parentContext);
Baggage.Current = parentContext.Baggage;
using var activity = new ActivitySource("Redis.Consumer").StartActivity("redis-stream-read",
ActivityKind.Consumer, parentContext.ActivityContext);
messages.Add(new(streamEntry.Id!, message));
await StreamAcknowledgeAsync(streamName, consumerGroupName, streamEntry.Id!);
}

return messages;
Expand Down
4 changes: 1 addition & 3 deletions KafkaWriter.Api/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ void CacheRedisMessage(DateTime now, IRedisCacheService redisCacheService, ILogg
Content = $@"It's {now}",
CreatedOn = now
};
logger.LogInformation($"Producing to redis stream {message.Content}");
logger.LogInformation($"Creating redis cache entry {message.Content}");
using var activity = activitySource.StartActivity("redis-set", ActivityKind.Producer);
AddActivityToMessage(activity, message);
var serializedMessage = JsonSerializer.Serialize(message);
Expand All @@ -118,8 +118,6 @@ async Task StreamRedisMessage(DateTime now, IRedisStreamsService redisStreamsSer
CreatedOn = now
};
logger.LogInformation($"Producing to redis stream {message.Content}");
using var activity = activitySource.StartActivity("redis-stream-write", ActivityKind.Producer);
AddActivityToMessage(activity, message);
var serializedMessage = JsonSerializer.Serialize(message);
await redisStreamsService.StreamAddAsync(options.Value.Redis.StreamName, serializedMessage, 1);
}
Expand Down
8 changes: 0 additions & 8 deletions RedisStreamReader.Worker/Worker.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
using System.Diagnostics;
using System.Text.Json;
using Common.Redis;
using Microsoft.Extensions.Options;
Expand All @@ -12,7 +11,6 @@ public class Worker : BackgroundService
private readonly IRedisStreamsService _streamsService;
private readonly ILogger<Worker> _logger;
private readonly Connections _options;
private readonly ActivitySource _activitySource = new("Redis.Consumer");

public Worker(
IRedisStreamsService streamsService,
Expand Down Expand Up @@ -45,12 +43,6 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
continue;
}

// Extract context from message and start activity using it
var parentContext = Propagators.DefaultTextMapPropagator.Extract(default, message, (message, key) => [message.PropagationContext]);
Baggage.Current = parentContext.Baggage;

// Start the activity earlier and set the context when we have access to it? ¯\_(ツ)_/¯
using var activity = _activitySource.StartActivity("redis-stream-read", ActivityKind.Consumer, parentContext.ActivityContext);
await _streamsService.StreamAcknowledgeAsync(_options.Redis.StreamName,
_options.Redis.ConsumerGroupName, streamEntry.streamEntryId);
}
Expand Down
4 changes: 2 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ services:
Connections__Redis__ConnectionString: 'redis:6379,abortConnect=false'
OTEL_EXPORTER_OTLP_ENDPOINT: http://otel-collector:4317
depends_on: [ kafka, redis ]
# ports:
# - "5052:8080"
ports:
- "5052:8080"

kafkareader.worker:
image: kafkareader.worker
Expand Down

0 comments on commit 64ea7f3

Please sign in to comment.