Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Return bad request on too many surrogate ids #3838

Merged
merged 4 commits into from
May 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
using Azure;
using EnsureThat;
using MediatR;
using Microsoft.Data.SqlClient;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Primitives;
using Microsoft.Health.Core.Features.Audit;
Expand All @@ -22,6 +23,7 @@
using Microsoft.Health.Fhir.Core.Features.Operations;
using Microsoft.Health.Fhir.Core.Features.Operations.Import;
using Microsoft.Health.JobManagement;
using Microsoft.Health.SqlServer.Features.Storage;
using Newtonsoft.Json;

namespace Microsoft.Health.Fhir.SqlServer.Features.Operations.Import
Expand All @@ -31,6 +33,7 @@ public class ImportProcessingJob : IJob
{
private const string CancelledErrorMessage = "Import processing job is canceled.";
internal const string DefaultCallerAgent = "Microsoft.Health.Fhir.Server";
internal const string SurrogateIdsErrorMessage = "Unable to generate internal IDs. If the lastUpdated meta element is provided as input, reduce the number of resources with the same up to millisecond lastUpdated below 10,000.";

private readonly IMediator _mediator;
private readonly IQueueClient _queueClient;
Expand Down Expand Up @@ -152,6 +155,12 @@ public async Task<string> ExecuteAsync(JobInfo jobInfo, CancellationToken cancel
var error = new ImportJobErrorResult() { ErrorMessage = CancelledErrorMessage };
throw new JobExecutionException(canceledEx.Message, error, canceledEx);
}
catch (SqlException ex) when (ex.Number == SqlErrorCodes.Conflict)
{
_logger.LogJobInformation(ex, jobInfo, "Exceeded retries on conflicts. Most likely reason - too many input resources with the same last updated.");
var error = new ImportJobErrorResult() { ErrorMessage = SurrogateIdsErrorMessage, HttpStatusCode = HttpStatusCode.BadRequest, ErrorDetails = ex.ToString() };
throw new JobExecutionException(ex.Message, error, ex);
}
catch (Exception ex)
{
_logger.LogJobError(ex, jobInfo, "Critical error in import processing job.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ internal async Task<IReadOnlyList<string>> ImportResourcesAsync(IReadOnlyList<Im
if (sqlEx != null && sqlEx.Number == SqlErrorCodes.Conflict && retries++ < 30)
{
_logger.LogWarning(e, $"Error on {nameof(ImportResourcesInternalAsync)} retries={{Retries}}", retries);
await Task.Delay(5000, cancellationToken);
await Task.Delay(1000, cancellationToken);
continue;
}

Expand Down Expand Up @@ -444,10 +444,10 @@ List<string> GetErrors(IEnumerable<ImportResource> dups, IEnumerable<ImportResou
}
else if (importMode == ImportMode.IncrementalLoad)
{
// dedup by last updated - keep all versions when version and lastUpdated exist on record.
// dedup by last updated - for resources sharing the same last updated value keep a single version, preferring the largest version number.
var inputsDedupped = resources
.GroupBy(_ => _.ResourceWrapper.ToResourceDateKey(_model.GetResourceTypeId, ignoreVersion: !_.KeepVersion || !_.KeepLastUpdated))
.Select(_ => _.First())
.GroupBy(_ => _.ResourceWrapper.ToResourceDateKey(_model.GetResourceTypeId, ignoreVersion: true))
.Select(_ => _.OrderByDescending(_ => _.ResourceWrapper.Version).First())
.ToList();

await HandleIncrementalVersionedImport(inputsDedupped, useReplicasForReads);
Expand All @@ -459,8 +459,8 @@ List<string> GetErrors(IEnumerable<ImportResource> dups, IEnumerable<ImportResou

async Task HandleIncrementalVersionedImport(List<ImportResource> inputs, bool useReplicasForReads)
{
// Dedup by version via ToResourceKey - only keep first occurrence of a version in this batch.
var inputsWithVersion = inputs.Where(_ => _.KeepVersion).GroupBy(_ => _.ResourceWrapper.ToResourceKey()).Select(_ => _.First()).ToList();
// Dedup by version via ToResourceKey - prefer latest dates.
var inputsWithVersion = inputs.Where(_ => _.KeepVersion).GroupBy(_ => _.ResourceWrapper.ToResourceKey()).Select(_ => _.OrderByDescending(_ => _.ResourceWrapper.LastModified.DateTime).First()).ToList();

// Search the db for versions that match the import resources with version so we can filter duplicates from the import.
var currentInDb = (await GetAsync(inputsWithVersion.Select(_ => _.ResourceWrapper.ToResourceKey()).ToList(), cancellationToken)).ToDictionary(_ => _.ToResourceKey(), _ => _);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -21,6 +22,7 @@
using Microsoft.Health.Fhir.Core.Features.Operations;
using Microsoft.Health.Fhir.Core.Features.Operations.Import;
using Microsoft.Health.Fhir.Core.Features.Operations.Import.Models;
using Microsoft.Health.Fhir.SqlServer.Features.Operations.Import;
using Microsoft.Health.Fhir.Tests.Common;
using Microsoft.Health.Fhir.Tests.Common.FixtureParameters;
using Microsoft.Health.Fhir.Tests.E2E.Common;
Expand Down Expand Up @@ -161,6 +163,25 @@ private object ExecuteSql(string sql)
return (checkLocation, id);
}

[Fact]
public async Task GivenIncrementalLoad_80KSurrogateIds_BadRequestIsReturned()
{
    // Build an NDJSON payload of 80,001 patients that all share one lastUpdated value,
    // exceeding the per-millisecond surrogate-id budget so the import must be rejected.
    var payload = new StringBuilder();
    for (var created = 0; created < 80001; created++)
    {
        var patientId = Guid.NewGuid().ToString("N");
        payload.Append(CreateTestPatient(patientId, DateTimeOffset.Parse("1900-01-01Z00:00"))); // make sure this date is not used by other tests.
    }

    var uploaded = await ImportTestHelper.UploadFileAsync(payload.ToString(), _fixture.StorageAccount);
    var importRequest = CreateImportRequest(uploaded.location, ImportMode.IncrementalLoad);
    var statusLocation = await ImportTestHelper.CreateImportTaskAsync(_client, importRequest);

    var response = await ImportWaitAsync(statusLocation, false);

    // The job must surface a 400 carrying the surrogate-id error text.
    Assert.Equal(HttpStatusCode.BadRequest, response.StatusCode);
    Assert.Contains(ImportProcessingJob.SurrogateIdsErrorMessage, await response.Content.ReadAsStringAsync());
}

[Fact]
public async Task GivenIncrementalLoad_MultipleInputVersionsOutOfOrderSomeNotExplicit_ResourceNotExisting_NoGap()
{
Expand Down Expand Up @@ -262,21 +283,76 @@ public async Task GivenIncrementalLoad_MultipleNonSequentialInputVersions_Resour
}

[Fact]
public async Task GivenIncrementalLoad_SameLastUpdated_DifferentVersions_ResourceExisting()
{
    var resourceId = Guid.NewGuid().ToString("N");

    // Seed the store with version 2 dated 2002.
    var seedNdJson = PrepareResource(resourceId, "2", "2002");
    var seedLocation = (await ImportTestHelper.UploadFileAsync(seedNdJson, _fixture.StorageAccount)).location;
    await ImportCheckAsync(CreateImportRequest(seedLocation, ImportMode.IncrementalLoad), null);

    var current = await _client.ReadAsync<Patient>(ResourceType.Patient, resourceId);
    Assert.Equal("2", current.Resource.Meta.VersionId);
    Assert.Equal(GetLastUpdated("2002"), current.Resource.Meta.LastUpdated);

    // same date but different versions
    var versionOne = PrepareResource(resourceId, "1", "2003");
    var versionThree = PrepareResource(resourceId, "3", "2003");
    var conflictLocation = (await ImportTestHelper.UploadFileAsync(versionOne + versionThree, _fixture.StorageAccount)).location;
    await ImportCheckAsync(CreateImportRequest(conflictLocation, ImportMode.IncrementalLoad), null, 1);

    // The higher version wins for the shared date (one input is reported as an error),
    // and the previously seeded version stays readable via version read.
    current = await _client.ReadAsync<Patient>(ResourceType.Patient, resourceId);
    Assert.Equal("3", current.Resource.Meta.VersionId);
    Assert.Equal(GetLastUpdated("2003"), current.Resource.Meta.LastUpdated);

    var seeded = await _client.VReadAsync<Patient>(ResourceType.Patient, resourceId, "2");
    Assert.Equal(GetLastUpdated("2002"), seeded.Resource.Meta.LastUpdated);
}

[Fact]
public async Task GivenIncrementalLoad_SameVersion_DifferentLastUpdated_ResourceExisting()
{
    var resourceId = Guid.NewGuid().ToString("N");

    // Seed the store with version 2 dated 2002.
    var seedNdJson = PrepareResource(resourceId, "2", "2002");
    var seedLocation = (await ImportTestHelper.UploadFileAsync(seedNdJson, _fixture.StorageAccount)).location;
    await ImportCheckAsync(CreateImportRequest(seedLocation, ImportMode.IncrementalLoad), null);

    var current = await _client.ReadAsync<Patient>(ResourceType.Patient, resourceId);
    Assert.Equal("2", current.Resource.Meta.VersionId);

    // same version but different dates
    var earlierDate = PrepareResource(resourceId, "3", "2001");
    var laterDate = PrepareResource(resourceId, "3", "2003");
    var conflictLocation = (await ImportTestHelper.UploadFileAsync(earlierDate + laterDate, _fixture.StorageAccount)).location;
    await ImportCheckAsync(CreateImportRequest(conflictLocation, ImportMode.IncrementalLoad), null, 1);

    // The latest date wins for the duplicated version (one input is reported as an error),
    // and the previously seeded version stays readable via version read.
    current = await _client.ReadAsync<Patient>(ResourceType.Patient, resourceId);
    Assert.Equal("3", current.Resource.Meta.VersionId);
    Assert.Equal(GetLastUpdated("2003"), current.Resource.Meta.LastUpdated);
    var seeded = await _client.VReadAsync<Patient>(ResourceType.Patient, resourceId, "2");
    Assert.Equal(GetLastUpdated("2002"), seeded.Resource.Meta.LastUpdated);
}

[Fact]
public async Task GivenIncrementalLoad_MultipleInputVersions_ResourceExisting()
{
var id = Guid.NewGuid().ToString("N");

// set existing
var ndJson2 = PrepareResource(id, "2", "2002");
(Uri location, string _) = await ImportTestHelper.UploadFileAsync(ndJson2, _fixture.StorageAccount);
var location = (await ImportTestHelper.UploadFileAsync(ndJson2, _fixture.StorageAccount)).location;
var request = CreateImportRequest(location, ImportMode.IncrementalLoad);
await ImportCheckAsync(request, null);

// set input
var ndJson = PrepareResource(id, "1", "2001");
//// keep ndJson2 as is
var ndJson3 = PrepareResource(id, "3", "2003");
(Uri location2, string _) = await ImportTestHelper.UploadFileAsync(ndJson + ndJson2 + ndJson3, _fixture.StorageAccount);
var location2 = (await ImportTestHelper.UploadFileAsync(ndJson + ndJson2 + ndJson3, _fixture.StorageAccount)).location;
var request2 = CreateImportRequest(location2, ImportMode.IncrementalLoad);
await ImportCheckAsync(request2, null, 0);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
using Hl7.Fhir.Model;
using Hl7.Fhir.Serialization;
using MediatR;
using Microsoft.Data.SqlClient;
using Microsoft.Health.Abstractions.Exceptions;
using Microsoft.Health.Abstractions.Features.Transactions;
using Microsoft.Health.Fhir.Core;
Expand Down Expand Up @@ -68,6 +69,50 @@ public FhirStorageTests(FhirStorageTestsFixture fixture)

protected Mediator Mediator { get; }

// Verifies merge retry behavior on SQL conflicts: an INSERT trigger re-inserts every row,
// producing duplicate-key errors (treated as conflicts) until the EventLog has recorded
// 'requestedExceptions' MergeResources errors. Below the retry limit the upsert eventually
// succeeds; above it the conflict SqlException escapes to the caller.
[Theory]
[InlineData(5)] // should succeed: failures stop within the retry limit
[InlineData(35)] // should fail: failures outlast the retry limit
[FhirStorageTestsFixtureArgumentSets(DataStore.SqlServer)]
public async Task RetriesOnConflict(int requestedExceptions)
{
try
{
// Reset the error log so the trigger's failure count starts from zero.
await _fixture.SqlHelper.ExecuteSqlCmd("TRUNCATE TABLE EventLog");
await _fixture.SqlHelper.ExecuteSqlCmd(@$"
CREATE TRIGGER Resource_Trigger ON Resource FOR INSERT
AS
IF (SELECT count(*) FROM EventLog WHERE Process = 'MergeResources' AND Status = 'Error') < {requestedExceptions}
INSERT INTO Resource SELECT * FROM inserted -- this will cause dup key exception which is treated as a conflict
");

var patient = (Patient)Samples.GetJsonSample("Patient").ToPoco();
patient.Id = Guid.NewGuid().ToString();
try
{
await Mediator.UpsertResourceAsync(patient.ToResourceElement());
if (requestedExceptions > 30)
{
// More requested failures than retries: the upsert must not succeed.
Assert.Fail("This point should not be reached");
}
}
catch (SqlException e)
{
if (requestedExceptions > 30)
{
Assert.Contains("Resource has been recently updated or added", e.Message);
}
else
{
// Below the retry limit a conflict exception is unexpected; rethrow to fail the test.
throw;
}
}
}
finally
{
// Always remove the trigger so later tests are unaffected.
await _fixture.SqlHelper.ExecuteSqlCmd("IF object_id('Resource_Trigger') IS NOT NULL DROP TRIGGER Resource_Trigger");
}
}

[Fact]
[FhirStorageTestsFixtureArgumentSets(DataStore.SqlServer)]
public async Task TimeTravel()
Expand Down
Loading