Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EES-5826 - use channels to separate Public API query response from the writing of queries for analytics #5678

Open
wants to merge 1 commit into
base: dev
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
EES-5826 - reponding to PR comments. Using Channels to queue up reque…
…sts for writing queries for analytics, rather than not awaiting the writing process.
  • Loading branch information
duncan-at-hiveit committed Mar 11, 2025
commit f2a8898e31f2cac04477bc01d563481ecb1dbf11

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
using GovUk.Education.ExploreEducationStatistics.Public.Data.Api.Services;
using GovUk.Education.ExploreEducationStatistics.Public.Data.Api.Services.Interfaces;
using Microsoft.Extensions.Logging;
using Moq;
using Snapshooter.Xunit;

namespace GovUk.Education.ExploreEducationStatistics.Public.Data.Api.Tests.Services;

public abstract class QueryAnalyticsWriterTests
{
public class ReportDataSetVersionQueryTests : QueryAnalyticsWriterTests
{
private const string SnapshotPrefix = $"{nameof(QueryAnalyticsWriterTests)}.{nameof(ReportDataSetVersionQueryTests)}";

[Fact]
public async Task Success()
{
var pathResolver = new TestAnalyticsPathResolver();
var service = BuildService(pathResolver);

await service.ReportDataSetVersionQuery(new CaptureDataSetVersionQueryRequest(
DataSetId: new Guid("acb97c97-89c9-4b74-88e7-39c27f6bab63"),
DataSetVersionId: new Guid("0da7c640-80a8-44e2-8028-fc529bcedcb1"),
DataSetVersion: "2.3.1",
DataSetTitle: "Data Set Title",
Query: DataSetQueryRequestTestData.NestedQuery1,
ResultsCount: 55,
TotalRowsCount: 5100,
StartTime: DateTime.Parse("2025-02-20T12:00:00.000Z"),
EndTime: DateTime.Parse("2025-02-20T12:00:10.234Z")));

await service.ReportDataSetVersionQuery(new CaptureDataSetVersionQueryRequest(
DataSetId: new Guid("72e16c8c-2dc0-4063-bdf6-ee52bd127ebe"),
DataSetVersionId: new Guid("bb68fd95-1231-498c-8858-b061d739ae17"),
DataSetVersion: "3.0.0",
DataSetTitle: "Data Set Title 2",
Query: DataSetQueryRequestTestData.NestedQuery2,
ResultsCount: 120,
TotalRowsCount: 10000,
StartTime: DateTime.Parse("2025-02-20T01:00:00.000Z"),
EndTime: DateTime.Parse("2025-02-20T01:00:10.999Z")));

var files = Directory
.GetFiles(pathResolver.PublicApiQueriesDirectoryPath())
.Order()
.ToList();

Assert.Equal(2, files.Count);

var file1Contents = await File.ReadAllTextAsync(files[0]);
Snapshot.Match(
currentResult: file1Contents,
snapshotName: $"{SnapshotPrefix}.{nameof(Success)}.Query1");

var file2Contents = await File.ReadAllTextAsync(files[1]);
Snapshot.Match(
currentResult: file2Contents,
snapshotName: $"{SnapshotPrefix}.{nameof(Success)}.Query2");
}

private static QueryAnalyticsWriter BuildService(IAnalyticsPathResolver pathResolver)
{
return new(pathResolver, Mock.Of<ILogger<QueryAnalyticsWriter>>());
}
}
}
Original file line number Diff line number Diff line change
@@ -39,7 +39,7 @@ internal class DataSetQueryService(
IParquetIndicatorRepository indicatorRepository,
IParquetLocationRepository locationRepository,
IParquetTimePeriodRepository timePeriodRepository,
IAnalyticsService analyticsService)
IQueryAnalyticsChannel queryAnalyticsChannel)
: IDataSetQueryService
{
private static readonly Dictionary<string, string> ReservedSorts = new()
@@ -149,21 +149,18 @@ private async Task<Either<ActionResult, DataSetQueryPaginatedResultsViewModel>>
query: query,
cancellationToken: cancellationToken,
baseCriteriaPath: baseCriteriaPath)
.OnSuccessDo(results =>
{
// Deliberately do not await this operation as we do not want to
// delay the return of the query to the end user.
_ = analyticsService.ReportDataSetVersionQuery(
dataSetId: dataSetVersion.DataSetId,
dataSetVersionId: dataSetVersion.Id,
semVersion: dataSetVersion.SemVersion().ToString(),
dataSetTitle: dataSetVersion.DataSet.Title,
query: query,
resultsCount: results.Results.Count,
totalRowsCount: results.Paging.TotalResults,
startTime: startTime,
endTime: DateTime.UtcNow);
});
.OnSuccessDo(results => queryAnalyticsChannel.WriteQuery(
request: new CaptureDataSetVersionQueryRequest(
DataSetId: dataSetVersion.DataSetId,
DataSetVersionId: dataSetVersion.Id,
DataSetVersion: dataSetVersion.SemVersion().ToString(),
DataSetTitle: dataSetVersion.DataSet.Title,
Query: query,
ResultsCount: results.Results.Count,
TotalRowsCount: results.Paging.TotalResults,
StartTime: startTime,
EndTime: DateTime.UtcNow),
cancellationToken: cancellationToken));
}

private async Task<Either<ActionResult, DataSetQueryPaginatedResultsViewModel>> RunQuery(

This file was deleted.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not something for you to change - but just whilst I have your attention :-)
It would be usual for a there to be two interfaces, one as the "Reporter" where components can send their data, and another as a "Consumer" that would take the data out and process it. This cleans up the intentions of components and prevents any possibility of anything else reading from the channel when they should only ever be writing. The container of course resolves to the same instance.

Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace GovUk.Education.ExploreEducationStatistics.Public.Data.Api.Services.Interfaces;

public interface IQueryAnalyticsChannel
{
Task WriteQuery(CaptureDataSetVersionQueryRequest request, CancellationToken cancellationToken);

ValueTask<CaptureDataSetVersionQueryRequest> ReadQuery(CancellationToken cancellationToken);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace GovUk.Education.ExploreEducationStatistics.Public.Data.Api.Services.Interfaces;

public interface IQueryAnalyticsWriter
{
Task ReportDataSetVersionQuery(CaptureDataSetVersionQueryRequest request);
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unless you think it provides added clarity, I wouldn't put Channel in the name since that is an implementation detail. You can think if this more as the QueryAnalyticsReporter or Manager or Handler? ie it's just the place where all other components can go to send/report their analytics.

Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using System.Threading.Channels;
using GovUk.Education.ExploreEducationStatistics.Public.Data.Api.Services.Interfaces;

namespace GovUk.Education.ExploreEducationStatistics.Public.Data.Api.Services;

public class QueryAnalyticsChannel : IQueryAnalyticsChannel
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unit tests for this class?

{
private readonly Channel<CaptureDataSetVersionQueryRequest> _channel =
Channel.CreateUnbounded<CaptureDataSetVersionQueryRequest>();

public async Task WriteQuery(CaptureDataSetVersionQueryRequest request, CancellationToken cancellationToken)
{
await _channel.Writer.WriteAsync(request, cancellationToken);
}

public ValueTask<CaptureDataSetVersionQueryRequest> ReadQuery(CancellationToken cancellationToken)
{
return _channel.Reader.ReadAsync(cancellationToken);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using System.Threading.Channels;
using GovUk.Education.ExploreEducationStatistics.Public.Data.Api.Services.Interfaces;

namespace GovUk.Education.ExploreEducationStatistics.Public.Data.Api.Services;

public class QueryAnalyticsConsumer(
IQueryAnalyticsChannel channel,
IQueryAnalyticsWriter queryAnalyticsWriter
) : BackgroundService
{
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be unit tested?

{
while (!stoppingToken.IsCancellationRequested)
{
var message = await channel.ReadQuery(stoppingToken);
await queryAnalyticsWriter.ReportDataSetVersionQuery(message);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this throws, I'm guessing this background service will stop writing the files? Probably want to guard against that - and log errors?

}
}
}
Original file line number Diff line number Diff line change
@@ -7,38 +7,21 @@

namespace GovUk.Education.ExploreEducationStatistics.Public.Data.Api.Services;

public class AnalyticsService(
public class QueryAnalyticsWriter(
IAnalyticsPathResolver analyticsPathResolver,
ILogger<AnalyticsService> logger) : IAnalyticsService
ILogger<QueryAnalyticsWriter> logger) : IQueryAnalyticsWriter
{
public async Task ReportDataSetVersionQuery(
Guid dataSetId,
Guid dataSetVersionId,
string semVersion,
string dataSetTitle,
DataSetQueryRequest query,
int resultsCount,
int totalRowsCount,
DateTime startTime,
DateTime endTime)
public async Task ReportDataSetVersionQuery(CaptureDataSetVersionQueryRequest request)
{
logger.LogInformation(
"Capturing query for analytics for data set {DataSetTitle}",
dataSetTitle);

var request = new CaptureDataSetVersionQueryRequest(
DataSetId: dataSetId,
DataSetVersionId: dataSetVersionId,
DataSetVersion: semVersion,
DataSetTitle: dataSetTitle,
ResultsCount: resultsCount,
TotalRowsCount: totalRowsCount,
StartTime: startTime,
EndTime: endTime,
Query: DataSetQueryNormalisationUtil.NormaliseQuery(query));
request.DataSetTitle);

var serialisedRequest = JsonSerializationUtils.Serialize(
obj: request,
obj: request with
{
Query = DataSetQueryNormalisationUtil.NormaliseQuery(request.Query)
},
formatting: Formatting.Indented,
orderedProperties: true,
camelCase: true);
@@ -47,15 +30,15 @@ public async Task ReportDataSetVersionQuery(

Directory.CreateDirectory(directory);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would catch and log errors here - permissions, disk full, all sorts of things that could go wrong.


var filename = $"{DateTime.UtcNow:yyyyMMdd-HHmmss-fff}_{dataSetVersionId}.json";
var filename = $"{DateTime.UtcNow:yyyyMMdd-HHmmss-fff}_{request.DataSetVersionId}.json";

await File.WriteAllTextAsync(
Path.Combine(directory, filename),
contents: serialisedRequest);
}
}

internal record CaptureDataSetVersionQueryRequest(
public record CaptureDataSetVersionQueryRequest(
Guid DataSetId,
Guid DataSetVersionId,
string DataSetVersion,
Original file line number Diff line number Diff line change
@@ -256,12 +256,14 @@ public void ConfigureServices(IServiceCollection services)

if (_analyticsOptions.Enabled)
{
services.AddScoped<IAnalyticsService, AnalyticsService>();
services.AddSingleton<IQueryAnalyticsChannel, QueryAnalyticsChannel>();
services.AddHostedService<QueryAnalyticsConsumer>();
services.AddSingleton<IAnalyticsPathResolver, AnalyticsPathResolver>();
services.AddSingleton<IQueryAnalyticsWriter, QueryAnalyticsWriter>();
}
else
{
services.AddScoped<IAnalyticsService, NoOpAnalyticsService>();
services.AddSingleton<IQueryAnalyticsChannel, NoopQueryAnalyticsChannel>();
}
}

@@ -373,13 +375,16 @@ private static void ApplyCustomMigrations(IApplicationBuilder app)
migrations.ForEach(migration => migration.Apply());
}

private class NoOpAnalyticsService : IAnalyticsService
private class NoopQueryAnalyticsChannel : IQueryAnalyticsChannel
{
public Task ReportDataSetVersionQuery(Guid dataSetId, Guid dataSetVersionId, string semVersion,
string dataSetTitle,
DataSetQueryRequest query, int resultsCount, int totalRowsCount, DateTime startTime, DateTime endTime)
public Task WriteQuery(CaptureDataSetVersionQueryRequest request, CancellationToken cancellationToken)
{
return Task.CompletedTask;
return Task.CompletedTask;
}

public ValueTask<CaptureDataSetVersionQueryRequest> ReadQuery(CancellationToken cancellationToken)
{
return default;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this potentially lead to a spinning process? If something was consuming it, it would be pouring out a stream of values? Perhaps add :
await Task.Delay(Timeout.Infinite, cancellationToken);

}
}
}