diff --git a/src/Microsoft.Health.Fhir.SqlServer/Features/Operations/Import/SqlImporter.cs b/src/Microsoft.Health.Fhir.SqlServer/Features/Operations/Import/SqlImporter.cs
index 48fd1b896a..c61d065599 100644
--- a/src/Microsoft.Health.Fhir.SqlServer/Features/Operations/Import/SqlImporter.cs
+++ b/src/Microsoft.Health.Fhir.SqlServer/Features/Operations/Import/SqlImporter.cs
@@ -91,24 +91,10 @@ public async Task Import(Channel<ImportResource> input
                     errors.AddRange(resources.Where(r => !string.IsNullOrEmpty(r.ImportError)).Select(r => r.ImportError)); //// exclude resources with parsing error (ImportError != null)
                     var validResources = resources.Where(r => string.IsNullOrEmpty(r.ImportError)).ToList();
-                    var results = await _store.ImportResourcesAsync(validResources, importMode, cancellationToken);
-                    var dups = validResources.Except(results.Loaded).Except(results.Conflicts);
-                    AppendErrorsToBuffer(dups, results.Conflicts, errors);
+                    var newErrors = await _store.ImportResourcesAsync(validResources, importMode, cancellationToken);
+                    errors.AddRange(newErrors);
                     resources.Clear();
-                    return (results.Loaded.Count, resources.Sum(_ => (long)_.Length));
-                }
-
-        private void AppendErrorsToBuffer(IEnumerable<ImportResource> dups, IEnumerable<ImportResource> conflicts, List<string> importErrorBuffer)
-        {
-            foreach (var resource in dups)
-            {
-                importErrorBuffer.Add(_importErrorSerializer.Serialize(resource.Index, string.Format(Resources.FailedToImportDuplicate, resource.ResourceWrapper.ResourceId, resource.Index), resource.Offset));
-            }
-
-            foreach (var resource in conflicts)
-            {
-                importErrorBuffer.Add(_importErrorSerializer.Serialize(resource.Index, string.Format(Resources.FailedToImportConflictingVersion, resource.ResourceWrapper.ResourceId, resource.Index), resource.Offset));
-            }
+                    return (validResources.Count - newErrors.Count, resources.Sum(_ => (long)_.Length));
         }

         private async Task UploadImportErrorsAsync(IImportErrorStore importErrorStore, long succeededCount, long failedCount, string[] importErrors, long lastIndex, long processedBytes, CancellationToken cancellationToken)
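Note: with this change the importer no longer reconstructs duplicates or serializes error messages itself; ImportResourcesAsync returns ready-to-use error lines, and the loaded count is derived from them. The sketch below mirrors that accounting with simplified stand-ins (IImportStore, a trimmed-down ImportResource record, ImportMode) rather than the real Microsoft.Health.Fhir.SqlServer types, so it is illustrative only:

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public enum ImportMode { InitialLoad, IncrementalLoad }

// Simplified stand-in for the real ImportResource wrapper.
public sealed record ImportResource(long Index, long Offset, int Length, string ImportError);

public interface IImportStore
{
    // Mirrors the new contract: one serialized error line per duplicate or conflicting resource.
    Task<IReadOnlyList<string>> ImportResourcesAsync(IReadOnlyList<ImportResource> resources, ImportMode importMode, CancellationToken cancellationToken);
}

public static class ImporterSketch
{
    public static async Task<(long Loaded, long ProcessedBytes)> ImportBatchAsync(
        IImportStore store, IReadOnlyList<ImportResource> resources, ImportMode mode, List<string> errors, CancellationToken cancellationToken)
    {
        // Parsing failures carry their own error text and never reach the store.
        errors.AddRange(resources.Where(r => !string.IsNullOrEmpty(r.ImportError)).Select(r => r.ImportError));
        var valid = resources.Where(r => string.IsNullOrEmpty(r.ImportError)).ToList();

        // Duplicates and version conflicts come back as ready-to-upload error lines.
        var newErrors = await store.ImportResourcesAsync(valid, mode, cancellationToken);
        errors.AddRange(newErrors);

        // Loaded = valid resources that produced no error; bytes are summed over the whole batch.
        return (valid.Count - newErrors.Count, resources.Sum(r => (long)r.Length));
    }
}
```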
diff --git a/src/Microsoft.Health.Fhir.SqlServer/Features/Storage/SqlServerFhirDataStore.cs b/src/Microsoft.Health.Fhir.SqlServer/Features/Storage/SqlServerFhirDataStore.cs
index 26783c8529..14fb633cb7 100644
--- a/src/Microsoft.Health.Fhir.SqlServer/Features/Storage/SqlServerFhirDataStore.cs
+++ b/src/Microsoft.Health.Fhir.SqlServer/Features/Storage/SqlServerFhirDataStore.cs
@@ -58,6 +58,7 @@ internal class SqlServerFhirDataStore : IFhirDataStore, IProvideCapability
         private readonly ILogger _logger;
         private readonly SchemaInformation _schemaInformation;
         private readonly IModelInfoProvider _modelInfoProvider;
+        private readonly IImportErrorSerializer _importErrorSerializer;
         private static IgnoreInputLastUpdated _ignoreInputLastUpdated;
         private static RawResourceDeduping _rawResourceDeduping;
         private static object _flagLocker = new object();
@@ -73,7 +74,8 @@ public SqlServerFhirDataStore(
             ILogger logger,
             SchemaInformation schemaInformation,
             IModelInfoProvider modelInfoProvider,
-            RequestContextAccessor<IFhirRequestContext> requestContextAccessor)
+            RequestContextAccessor<IFhirRequestContext> requestContextAccessor,
+            IImportErrorSerializer importErrorSerializer)
         {
             _model = EnsureArg.IsNotNull(model, nameof(model));
             _searchParameterTypeMap = EnsureArg.IsNotNull(searchParameterTypeMap, nameof(searchParameterTypeMap));
@@ -87,6 +89,7 @@ public SqlServerFhirDataStore(
             _schemaInformation = EnsureArg.IsNotNull(schemaInformation, nameof(schemaInformation));
             _modelInfoProvider = EnsureArg.IsNotNull(modelInfoProvider, nameof(modelInfoProvider));
             _requestContextAccessor = EnsureArg.IsNotNull(requestContextAccessor, nameof(requestContextAccessor));
+            _importErrorSerializer = EnsureArg.IsNotNull(importErrorSerializer, nameof(importErrorSerializer));

             _memoryStreamManager = new RecyclableMemoryStreamManager();
@@ -141,7 +144,7 @@ public async Task
-        internal async Task<(IReadOnlyList<ImportResource> Loaded, IReadOnlyList<ImportResource> Conflicts)> ImportResourcesAsync(IReadOnlyList<ImportResource> resources, ImportMode importMode, CancellationToken cancellationToken)
+        internal async Task<IReadOnlyList<string>> ImportResourcesAsync(IReadOnlyList<ImportResource> resources, ImportMode importMode, CancellationToken cancellationToken)
         {
             (List<ImportResource> Loaded, List<ImportResource> Conflicts) results;
             var retries = 0;
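With the return type switched to IReadOnlyList&lt;string&gt;, the store now owns the whole partitioning of a batch: every valid input ends up loaded, flagged as a conflict, or dropped as a duplicate, and the latter two become serialized error lines (see the GetErrors helper added below). A rough sketch of that partition-to-errors step, using a hypothetical ImportItem record and a serialize delegate in place of ImportResource and IImportErrorSerializer.Serialize, with placeholder message texts rather than the real Resources.FailedToImport* strings:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for the real ImportResource.
public sealed record ImportItem(long Index, long Offset, string ResourceId);

public static class ImportErrorSketch
{
    // serialize(index, message, offset) stands in for IImportErrorSerializer.Serialize.
    public static IReadOnlyList<string> GetErrors(
        IReadOnlyList<ImportItem> input,
        IReadOnlyList<ImportItem> loaded,
        IReadOnlyList<ImportItem> conflicts,
        Func<long, string, long, string> serialize)
    {
        // Whatever was neither loaded nor flagged as a conflict was dropped as a duplicate.
        var dups = input.Except(loaded).Except(conflicts);

        var errors = new List<string>();
        errors.AddRange(dups.Select(d => serialize(d.Index, $"Duplicate resource {d.ResourceId}.", d.Offset)));
        errors.AddRange(conflicts.Select(c => serialize(c.Index, $"Conflicting version for resource {c.ResourceId}.", c.Offset)));
        return errors;
    }
}
```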
@@ -403,7 +406,25 @@ private async Task
+            List<string> GetErrors(IEnumerable<ImportResource> dups, IEnumerable<ImportResource> conflicts)
+            {
+                var errors = new List<string>();
+                foreach (var resource in dups)
+                {
+                    errors.Add(_importErrorSerializer.Serialize(resource.Index, string.Format(Resources.FailedToImportDuplicate, resource.ResourceWrapper.ResourceId, resource.Index), resource.Offset));
+                }
+
+                foreach (var resource in conflicts)
+                {
+                    errors.Add(_importErrorSerializer.Serialize(resource.Index, string.Format(Resources.FailedToImportConflictingVersion, resource.ResourceWrapper.ResourceId, resource.Index), resource.Offset));
+                }
+
+                return errors;
+            }

             async Task<(List<ImportResource> Loaded, List<ImportResource> Conflicts)> ImportResourcesInternalAsync(bool useReplicasForReads)
             {
@@ -411,60 +432,82 @@ private async Task
                 var loaded = new List<ImportResource>();
                 var conflicts = new List<ImportResource>();
                 if (importMode == ImportMode.InitialLoad)
                 {
-                    var inputDedupped = resources.GroupBy(_ => _.ResourceWrapper.ToResourceKey(true)).Select(_ => _.OrderBy(_ => _.ResourceWrapper.LastModified).First()).ToList();
-                    var current = new HashSet<ResourceKey>((await GetAsync(inputDedupped.Select(_ => _.ResourceWrapper.ToResourceKey(true)).ToList(), cancellationToken)).Select(_ => _.ToResourceKey(true)));
-                    loaded.AddRange(inputDedupped.Where(i => !current.TryGetValue(i.ResourceWrapper.ToResourceKey(true), out _)));
+                    var inputsDedupped = resources.GroupBy(_ => _.ResourceWrapper.ToResourceKey(true)).Select(_ => _.OrderBy(_ => _.ResourceWrapper.LastModified).First()).ToList();
+                    var current = new HashSet<ResourceKey>((await GetAsync(inputsDedupped.Select(_ => _.ResourceWrapper.ToResourceKey(true)).ToList(), cancellationToken)).Select(_ => _.ToResourceKey(true)));
+                    loaded.AddRange(inputsDedupped.Where(i => !current.TryGetValue(i.ResourceWrapper.ToResourceKey(true), out _)));
                     await MergeResourcesWithLastUpdatedAsync(loaded, useReplicasForReads);
                 }
                 else if (importMode == ImportMode.IncrementalLoad)
                 {
                     // dedup by last updated - keep all versions when version and lastUpdated exist on record.
-                    var inputDedupped = resources
+                    var inputsDedupped = resources
                         .GroupBy(_ => _.ResourceWrapper.ToResourceDateKey(_model.GetResourceTypeId, ignoreVersion: !_.KeepVersion || !_.KeepLastUpdated))
                         .Select(_ => _.First())
                         .ToList();

-                    await HandleIncrementalVersionedImport(inputDedupped, useReplicasForReads);
+                    await HandleIncrementalVersionedImport(inputsDedupped, useReplicasForReads);

-                    await HandleIncrementalUnversionedImport(inputDedupped, useReplicasForReads);
+                    await HandleIncrementalUnversionedImport(inputsDedupped, useReplicasForReads);
                 }

                 return (loaded, conflicts);

-                async Task HandleIncrementalVersionedImport(List<ImportResource> inputDedupped, bool useReplicasForReads)
+                async Task HandleIncrementalVersionedImport(List<ImportResource> inputs, bool useReplicasForReads)
                 {
                     // Dedup by version via ToResourceKey - only keep the first occurrence of a version in this batch.
-                    var inputDeduppedWithVersions = inputDedupped.Where(_ => _.KeepVersion).GroupBy(_ => _.ResourceWrapper.ToResourceKey()).Select(_ => _.First()).ToList();
+                    var inputsWithVersion = inputs.Where(_ => _.KeepVersion).GroupBy(_ => _.ResourceWrapper.ToResourceKey()).Select(_ => _.First()).ToList();

                     // Search the db for versions that match the import resources with version so we can filter duplicates from the import.
-                    var currentResourceKeysInDb = new HashSet<ResourceKey>((await GetAsync(inputDeduppedWithVersions.Select(_ => _.ResourceWrapper.ToResourceKey()).ToList(), cancellationToken)).Select(_ => _.ToResourceKey()));
+                    var currentInDb = (await GetAsync(inputsWithVersion.Select(_ => _.ResourceWrapper.ToResourceKey()).ToList(), cancellationToken)).ToDictionary(_ => _.ToResourceKey(), _ => _);
+
+                    // If an incoming resource is identical to the one already in the db, consider it loaded. Compare both lastUpdated and the raw resource;
+                    // if either does not match, treat the incoming resource as a conflict.
+                    var toBeLoaded = new List<ImportResource>();
+                    foreach (var resource in inputsWithVersion)
+                    {
+                        if (currentInDb.TryGetValue(resource.ResourceWrapper.ToResourceKey(), out var inDb))
+                        {
+                            if (inDb.LastModified == resource.ResourceWrapper.LastModified && inDb.RawResource.Data == resource.ResourceWrapper.RawResource.Data)
+                            {
+                                loaded.Add(resource); // exact match
+                            }
+                            else
+                            {
+                                conflicts.Add(resource); // same version but different lastUpdated or raw resource
+                            }
+                        }
+                        else
+                        {
+                            toBeLoaded.Add(resource);
+                        }
+                    }

                     // Import resource versions that don't exist in the db. Sorting is used in merge to set isHistory - don't change it without updating that method!
-                    loaded.AddRange(inputDeduppedWithVersions.Where(i => !currentResourceKeysInDb.TryGetValue(i.ResourceWrapper.ToResourceKey(), out _)).OrderBy(_ => _.ResourceWrapper.ResourceId).ThenByDescending(_ => _.ResourceWrapper.LastModified));
-                    await MergeResourcesWithLastUpdatedAsync(loaded, useReplicasForReads);
+                    await MergeResourcesWithLastUpdatedAsync(toBeLoaded.OrderBy(_ => _.ResourceWrapper.ResourceId).ThenByDescending(_ => _.ResourceWrapper.LastModified), useReplicasForReads);
+                    loaded.AddRange(toBeLoaded);
                 }
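The loop above is the core behavioral change for versioned incremental import: an incoming resource whose (type, id, version) already exists in the database is treated as already loaded only when it matches exactly (same lastUpdated and same raw JSON); anything else with that version becomes a conflict, and unseen versions are merged. A standalone sketch of that decision, using hypothetical IncomingVersioned/StoredVersion records instead of the real ImportResource/ResourceWrapper types:

```csharp
using System;
using System.Collections.Generic;

public sealed record IncomingVersioned(string ResourceType, string Id, string VersionId, DateTimeOffset LastUpdated, string RawJson);

public sealed record StoredVersion(DateTimeOffset LastUpdated, string RawJson);

public enum VersionedImportOutcome { AlreadyLoaded, Conflict, LoadAsNew }

public static class VersionedImportSketch
{
    public static VersionedImportOutcome Classify(
        IncomingVersioned incoming,
        IReadOnlyDictionary<(string Type, string Id, string Version), StoredVersion> currentInDb)
    {
        if (!currentInDb.TryGetValue((incoming.ResourceType, incoming.Id, incoming.VersionId), out var inDb))
        {
            return VersionedImportOutcome.LoadAsNew; // this version is not in the db yet - import it
        }

        // Exact match on both lastUpdated and raw content: an idempotent re-import, no error line.
        if (inDb.LastUpdated == incoming.LastUpdated && inDb.RawJson == incoming.RawJson)
        {
            return VersionedImportOutcome.AlreadyLoaded;
        }

        // Same version id but different timestamp or content: surfaced as a conflict error line.
        return VersionedImportOutcome.Conflict;
    }
}
```

This is what makes re-running an identical incremental import a no-op (no new errors, no new history entries), which the new E2E tests below assert.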
-                async Task HandleIncrementalUnversionedImport(List<ImportResource> inputDedupped, bool useReplicasForReads)
+                async Task HandleIncrementalUnversionedImport(List<ImportResource> inputs, bool useReplicasForReads)
                 {
                     // Dedup by resource id - only keep the first occurrence of an unversioned resource. This method is run in many parallel workers - we cannot guarantee processing order across parallel file streams. Taking the first resource avoids conflicts.
-                    var inputDeduppedNoVersion = inputDedupped.Where(_ => !_.KeepVersion).GroupBy(_ => _.ResourceWrapper.ToResourceKey(ignoreVersion: true)).Select(_ => _.First()).ToList();
+                    var inputsNoVersion = inputs.Where(_ => !_.KeepVersion).GroupBy(_ => _.ResourceWrapper.ToResourceKey(ignoreVersion: true)).Select(_ => _.First()).ToList();

                     // Ensure that the imported resources can "fit" between existing versions in the db. We want to keep versionId sequential along with lastUpdated.
                     // First part is setup.
-                    var currentDates = (await GetAsync(inputDeduppedNoVersion.Select(_ => _.ResourceWrapper.ToResourceKey(ignoreVersion: true)).ToList(), cancellationToken)).ToDictionary(_ => _.ToResourceKey(ignoreVersion: true), _ => _.ToResourceDateKey(_model.GetResourceTypeId));
-                    var inputDeduppedNoVersionForCheck = new List<ImportResource>();
-                    foreach (var resource in inputDeduppedNoVersion)
+                    var currentDates = (await GetAsync(inputsNoVersion.Select(_ => _.ResourceWrapper.ToResourceKey(ignoreVersion: true)).ToList(), cancellationToken)).ToDictionary(_ => _.ToResourceKey(ignoreVersion: true), _ => _.ToResourceDateKey(_model.GetResourceTypeId));
+                    var inputsNoVersionForCheck = new List<ImportResource>();
+                    foreach (var resource in inputsNoVersion)
                     {
                         if (currentDates.TryGetValue(resource.ResourceWrapper.ToResourceKey(ignoreVersion: true), out var dateKey) && ResourceSurrogateIdHelper.LastUpdatedToResourceSurrogateId(resource.ResourceWrapper.LastModified.DateTime) < dateKey.ResourceSurrogateId)
                         {
-                            inputDeduppedNoVersionForCheck.Add(resource);
+                            inputsNoVersionForCheck.Add(resource);
                         }
                     }

                     // Second part is testing if the imported resources can "fit" between existing versions in the db.
-                    var versionSlots = (await StoreClient.GetResourceVersionsAsync(inputDeduppedNoVersionForCheck.Select(_ => _.ResourceWrapper.ToResourceDateKey(_model.GetResourceTypeId)).ToList(), cancellationToken)).ToDictionary(_ => new ResourceKey(_model.GetResourceTypeName(_.ResourceTypeId), _.Id, null), _ => _);
-                    foreach (var resource in inputDeduppedNoVersionForCheck)
+                    var versionSlots = (await StoreClient.GetResourceVersionsAsync(inputsNoVersionForCheck.Select(_ => _.ResourceWrapper.ToResourceDateKey(_model.GetResourceTypeId)).ToList(), cancellationToken)).ToDictionary(_ => new ResourceKey(_model.GetResourceTypeName(_.ResourceTypeId), _.Id, null), _ => _);
+                    foreach (var resource in inputsNoVersionForCheck)
                     {
                         var resourceKey = resource.ResourceWrapper.ToResourceKey(ignoreVersion: true);
                         versionSlots.TryGetValue(resourceKey, out var versionSlotKey);
@@ -481,10 +524,10 @@ async Task HandleIncrementalUnversionedImport(List<ImportResource> inputDedupped
                     }

                     // Finally merge the resources to the db.
-                    var inputDeduppedNoVersionNoConflict = inputDeduppedNoVersion.Except(conflicts).ToList(); // some resources might get version assigned
-                    await MergeResourcesWithLastUpdatedAsync(inputDeduppedNoVersionNoConflict.Where(_ => _.KeepVersion), useReplicasForReads);
-                    await MergeResourcesWithLastUpdatedAsync(inputDeduppedNoVersionNoConflict.Where(_ => !_.KeepVersion), useReplicasForReads);
-                    loaded.AddRange(inputDeduppedNoVersionNoConflict);
+                    var inputNoVersionNoConflict = inputsNoVersion.Except(conflicts).ToList(); // some resources might get version assigned
+                    await MergeResourcesWithLastUpdatedAsync(inputNoVersionNoConflict.Where(_ => _.KeepVersion), useReplicasForReads);
+                    await MergeResourcesWithLastUpdatedAsync(inputNoVersionNoConflict.Where(_ => !_.KeepVersion), useReplicasForReads);
+                    loaded.AddRange(inputNoVersionNoConflict);
                 }
             }
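For unversioned resources the hunks above only show the two ends of the check: resources whose explicit lastUpdated is older than the current version in the database are routed through a version-slot lookup, while the slot-assignment logic itself sits in the unchanged lines between the hunks and is not part of this diff. A hedged sketch of just that visible gate, with a plain DateTimeOffset comparison standing in for the surrogate-id comparison done by ResourceSurrogateIdHelper:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for an unversioned import resource.
public sealed record IncomingUnversioned(string ResourceType, string Id, DateTimeOffset LastUpdated);

public static class UnversionedImportSketch
{
    public static (List<IncomingUnversioned> NeedsSlotCheck, List<IncomingUnversioned> Others) SplitByCurrentVersion(
        IEnumerable<IncomingUnversioned> inputs,
        IReadOnlyDictionary<(string Type, string Id), DateTimeOffset> currentLastUpdatedInDb)
    {
        var needsSlotCheck = new List<IncomingUnversioned>();
        var others = new List<IncomingUnversioned>();

        foreach (var resource in inputs)
        {
            // Older than the version already in the db: it can only be imported if a free
            // version/lastUpdated slot exists, which is what the elided check decides.
            if (currentLastUpdatedInDb.TryGetValue((resource.ResourceType, resource.Id), out var currentLastUpdated)
                && resource.LastUpdated < currentLastUpdated)
            {
                needsSlotCheck.Add(resource);
            }
            else
            {
                others.Add(resource);
            }
        }

        return (needsSlotCheck, others);
    }
}
```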
diff --git a/test/Microsoft.Health.Fhir.Shared.Tests.E2E/Rest/Import/ImportTests.cs b/test/Microsoft.Health.Fhir.Shared.Tests.E2E/Rest/Import/ImportTests.cs
index 16296e8678..cadd4be422 100644
--- a/test/Microsoft.Health.Fhir.Shared.Tests.E2E/Rest/Import/ImportTests.cs
+++ b/test/Microsoft.Health.Fhir.Shared.Tests.E2E/Rest/Import/ImportTests.cs
@@ -12,6 +12,7 @@
 using System.Threading;
 using System.Threading.Tasks;
 using Hl7.Fhir.Model;
+using Hl7.Fhir.Serialization;
 using IdentityServer4.Models;
 using MediatR;
 using Microsoft.Health.Fhir.Api.Features.Operations.Import;
@@ -40,6 +41,7 @@ public class ImportTests : IClassFixture
         private readonly ImportTestFixture<StartupForImportTestProvider> _fixture;
+        private static readonly FhirJsonSerializer _fhirJsonSerializer = new FhirJsonSerializer();

         public ImportTests(ImportTestFixture<StartupForImportTestProvider> fixture)
         {
@@ -165,7 +167,7 @@ public async Task GivenIncrementalLoad_MultipleInputVersions_ResourceExisting_Ve
             var ndJson3 = PrepareResource(id, "3", "2003");
             (Uri location2, string _) = await ImportTestHelper.UploadFileAsync(ndJson + ndJson2 + ndJson3, _fixture.StorageAccount);
             var request2 = CreateImportRequest(location2, ImportMode.IncrementalLoad);
-            await ImportCheckAsync(request2, null, 1);
+            await ImportCheckAsync(request2, null, 0);

             // check current
             var result = await _client.ReadAsync<Patient>(ResourceType.Patient, id);
@@ -332,6 +334,58 @@ public async Task GivenIncrementalLoad_WhenOutOfOrder_ThenCurrentDatabaseVersion
             Assert.Equal(GetLastUpdated("2001"), result.Resource.Meta.LastUpdated); // version 1 imported
         }

+        [Fact]
+        public async Task GivenIncrementalLoad_SameLastUpdated_SameVersion_DifferentContent_ShouldProduceConflict()
+        {
+            var id = Guid.NewGuid().ToString("N");
+            var ndJson = CreateTestPatient(id, DateTimeOffset.Parse("2021-01-01Z00:00"), "2");
+
+            var location = (await ImportTestHelper.UploadFileAsync(ndJson, _fixture.StorageAccount)).location;
+            var request = CreateImportRequest(location, ImportMode.IncrementalLoad);
+            await ImportCheckAsync(request, null, 0);
+
+            ndJson = CreateTestPatient(id, DateTimeOffset.Parse("2021-01-01Z00:00"), "2", "2000");
+            location = (await ImportTestHelper.UploadFileAsync(ndJson, _fixture.StorageAccount)).location;
+            request = CreateImportRequest(location, ImportMode.IncrementalLoad);
+            await ImportCheckAsync(request, null, 1);
+
+            Assert.Single((await _client.SearchAsync($"Patient/{id}/_history")).Resource.Entry);
+        }
+
+        [Fact]
+        public async Task GivenIncrementalLoad_SameLastUpdated_SameVersion_Run2Times_ShouldProduceSameResult()
+        {
+            var id = Guid.NewGuid().ToString("N");
+            var ndJson = CreateTestPatient(id, DateTimeOffset.Parse("2021-01-01Z00:00"), "2");
+
+            var location = (await ImportTestHelper.UploadFileAsync(ndJson, _fixture.StorageAccount)).location;
+            var request = CreateImportRequest(location, ImportMode.IncrementalLoad);
+            await ImportCheckAsync(request, null, 0);
+
+            location = (await ImportTestHelper.UploadFileAsync(ndJson, _fixture.StorageAccount)).location;
+            request = CreateImportRequest(location, ImportMode.IncrementalLoad);
+            await ImportCheckAsync(request, null, 0);
+
+            Assert.Single((await _client.SearchAsync($"Patient/{id}/_history")).Resource.Entry);
+        }
+
+        [Fact]
+        public async Task GivenIncrementalLoad_SameLastUpdated_Run2Times_ShouldProduceSameResult()
+        {
+            var id = Guid.NewGuid().ToString("N");
+            var ndJson = CreateTestPatient(id, DateTimeOffset.Parse("2021-01-01Z00:00"));
+
+            var location = (await ImportTestHelper.UploadFileAsync(ndJson, _fixture.StorageAccount)).location;
+            var request = CreateImportRequest(location, ImportMode.IncrementalLoad);
+            await ImportCheckAsync(request, null, 0);
+
+            location = (await ImportTestHelper.UploadFileAsync(ndJson, _fixture.StorageAccount)).location;
+            request = CreateImportRequest(location, ImportMode.IncrementalLoad);
+            await ImportCheckAsync(request, null, 0);
+
+            Assert.Single((await _client.SearchAsync($"Patient/{id}/_history")).Resource.Entry);
+        }
+
         [Fact]
         public async Task GivenIncrementalLoad_ThenInputLastUpdatedAndVersionShouldBeKept()
         {
@@ -982,7 +1036,7 @@ private async Task ImportCheckAsync(ImportRequest request, TestFhirClient c
             Assert.Equal(System.Net.HttpStatusCode.OK, response.StatusCode);
             ImportJobResult result = JsonConvert.DeserializeObject<ImportJobResult>(await response.Content.ReadAsStringAsync());
             Assert.NotEmpty(result.Output);
-            if (errorCount != null)
+            if (errorCount != null && errorCount != 0)
             {
                 Assert.Equal(errorCount.Value, result.Error.First().Count);
             }
@@ -1007,5 +1061,36 @@ private async Task ImportWaitAsync(Uri checkLocation, TestF
             return response;
         }
+
+        private string CreateTestPatient(string id = null, DateTimeOffset? lastUpdated = null, string versionId = null, string birthDate = null, bool deleted = false)
+        {
+            var rtn = new Patient()
+            {
+                Id = id ?? Guid.NewGuid().ToString("N"),
+                Meta = new(),
+            };
+
+            if (lastUpdated is not null)
+            {
+                rtn.Meta = new Meta { LastUpdated = lastUpdated };
+            }
+
+            if (versionId is not null)
+            {
+                rtn.Meta.VersionId = versionId;
+            }
+
+            if (birthDate != null)
+            {
+                rtn.BirthDate = birthDate;
+            }
+
+            if (deleted)
+            {
+                rtn.Meta.Extension = new List<Extension> { { new Extension(Core.Models.KnownFhirPaths.AzureSoftDeletedExtensionUrl, new FhirString("soft-deleted")) } };
+            }
+
+            return _fhirJsonSerializer.SerializeToString(rtn) + Environment.NewLine;
+        }
     }
 }
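The new CreateTestPatient helper emits one NDJSON line per call, and the tests concatenate those lines into import payloads. A small usage sketch of how the conflict scenario's two payloads could be built; NdjsonSketch, PatientLine, and Demo are hypothetical names, the timestamp uses a plain DateTimeOffset constructor rather than the literal used in the tests, and the $import upload/check plumbing is omitted:

```csharp
using System;
using Hl7.Fhir.Model;
using Hl7.Fhir.Serialization;

public static class NdjsonSketch
{
    private static readonly FhirJsonSerializer Serializer = new FhirJsonSerializer();

    // Mirrors CreateTestPatient above: one serialized Patient per NDJSON line.
    public static string PatientLine(string id, DateTimeOffset lastUpdated, string versionId, string birthDate = null)
    {
        var patient = new Patient
        {
            Id = id,
            Meta = new Meta { LastUpdated = lastUpdated, VersionId = versionId },
        };

        if (birthDate != null)
        {
            patient.BirthDate = birthDate;
        }

        return Serializer.SerializeToString(patient) + Environment.NewLine;
    }

    public static void Demo()
    {
        var id = Guid.NewGuid().ToString("N");
        var lastUpdated = new DateTimeOffset(2021, 1, 1, 0, 0, 0, TimeSpan.Zero);

        // Same id, version and lastUpdated but different content: per the tests above, the second
        // import run should report one conflict error and leave a single history entry.
        var firstRun = PatientLine(id, lastUpdated, "2");
        var secondRun = PatientLine(id, lastUpdated, "2", birthDate: "2000-01-01");
        Console.WriteLine(firstRun + secondRun);
    }
}
```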
diff --git a/test/Microsoft.Health.Fhir.Shared.Tests.Integration/Persistence/SqlServerFhirStorageTestsFixture.cs b/test/Microsoft.Health.Fhir.Shared.Tests.Integration/Persistence/SqlServerFhirStorageTestsFixture.cs
index 42125d8e81..b9f0ba535c 100644
--- a/test/Microsoft.Health.Fhir.Shared.Tests.Integration/Persistence/SqlServerFhirStorageTestsFixture.cs
+++ b/test/Microsoft.Health.Fhir.Shared.Tests.Integration/Persistence/SqlServerFhirStorageTestsFixture.cs
@@ -175,6 +175,8 @@ internal SqlServerFhirStorageTestsFixture(int maximumSupportedSchemaVersion, str
             SqlRetryService = new SqlRetryService(SqlConnectionBuilder, SqlServerDataStoreConfiguration, Options.Create(new SqlRetryServiceOptions()), new SqlRetryServiceDelegateOptions());

+            var importErrorSerializer = new Shared.Core.Features.Operations.Import.ImportErrorSerializer(new Hl7.Fhir.Serialization.FhirJsonSerializer());
+
             _fhirDataStore = new SqlServerFhirDataStore(
                 sqlServerFhirModel,
                 searchParameterToSearchValueTypeMap,
@@ -186,7 +188,8 @@ internal SqlServerFhirStorageTestsFixture(int maximumSupportedSchemaVersion, str
                 NullLogger.Instance,
                 SchemaInformation,
                 ModelInfoProvider.Instance,
-                _fhirRequestContextAccessor);
+                _fhirRequestContextAccessor,
+                importErrorSerializer);

             // the test queue client may not be enough for these tests. will need to look back into this.
             var queueClient = new TestQueueClient();