Skip to content

Commit

Permalink
Prevent reporting dups on worker restart (#3771)
Browse files Browse the repository at this point in the history
* Prevent reporting dups on worker restart

* internal

* skip
  • Loading branch information
SergeyGaluzo authored Mar 25, 2024
1 parent f614d05 commit 49a0379
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,24 +91,10 @@ public async Task<ImportProcessingProgress> Import(Channel<ImportResource> input
errors.AddRange(resources.Where(r => !string.IsNullOrEmpty(r.ImportError)).Select(r => r.ImportError));
//// exclude resources with parsing error (ImportError != null)
var validResources = resources.Where(r => string.IsNullOrEmpty(r.ImportError)).ToList();
var results = await _store.ImportResourcesAsync(validResources, importMode, cancellationToken);
var dups = validResources.Except(results.Loaded).Except(results.Conflicts);
AppendErrorsToBuffer(dups, results.Conflicts, errors);
var newErrors = await _store.ImportResourcesAsync(validResources, importMode, cancellationToken);
errors.AddRange(newErrors);
resources.Clear();
return (results.Loaded.Count, resources.Sum(_ => (long)_.Length));
}

private void AppendErrorsToBuffer(IEnumerable<ImportResource> dups, IEnumerable<ImportResource> conflicts, List<string> importErrorBuffer)
{
foreach (var resource in dups)
{
importErrorBuffer.Add(_importErrorSerializer.Serialize(resource.Index, string.Format(Resources.FailedToImportDuplicate, resource.ResourceWrapper.ResourceId, resource.Index), resource.Offset));
}

foreach (var resource in conflicts)
{
importErrorBuffer.Add(_importErrorSerializer.Serialize(resource.Index, string.Format(Resources.FailedToImportConflictingVersion, resource.ResourceWrapper.ResourceId, resource.Index), resource.Offset));
}
return (validResources.Count - newErrors.Count, resources.Sum(_ => (long)_.Length));
}

private async Task<ImportProcessingProgress> UploadImportErrorsAsync(IImportErrorStore importErrorStore, long succeededCount, long failedCount, string[] importErrors, long lastIndex, long processedBytes, CancellationToken cancellationToken)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ internal class SqlServerFhirDataStore : IFhirDataStore, IProvideCapability
private readonly ILogger<SqlServerFhirDataStore> _logger;
private readonly SchemaInformation _schemaInformation;
private readonly IModelInfoProvider _modelInfoProvider;
private readonly IImportErrorSerializer _importErrorSerializer;
private static IgnoreInputLastUpdated _ignoreInputLastUpdated;
private static RawResourceDeduping _rawResourceDeduping;
private static object _flagLocker = new object();
Expand All @@ -73,7 +74,8 @@ public SqlServerFhirDataStore(
ILogger<SqlServerFhirDataStore> logger,
SchemaInformation schemaInformation,
IModelInfoProvider modelInfoProvider,
RequestContextAccessor<IFhirRequestContext> requestContextAccessor)
RequestContextAccessor<IFhirRequestContext> requestContextAccessor,
IImportErrorSerializer importErrorSerializer)
{
_model = EnsureArg.IsNotNull(model, nameof(model));
_searchParameterTypeMap = EnsureArg.IsNotNull(searchParameterTypeMap, nameof(searchParameterTypeMap));
Expand All @@ -87,6 +89,7 @@ public SqlServerFhirDataStore(
_schemaInformation = EnsureArg.IsNotNull(schemaInformation, nameof(schemaInformation));
_modelInfoProvider = EnsureArg.IsNotNull(modelInfoProvider, nameof(modelInfoProvider));
_requestContextAccessor = EnsureArg.IsNotNull(requestContextAccessor, nameof(requestContextAccessor));
_importErrorSerializer = EnsureArg.IsNotNull(importErrorSerializer, nameof(importErrorSerializer));

_memoryStreamManager = new RecyclableMemoryStreamManager();

Expand Down Expand Up @@ -141,7 +144,7 @@ public async Task<IDictionary<DataStoreOperationIdentifier, DataStoreOperationOu
}

_logger.LogError(e, $"Error from SQL database on {nameof(MergeAsync)} retries={{Retries}}", retries);
await _sqlRetryService.TryLogEvent(nameof(MergeAsync), "Error", $"retries={retries}, error={sqlEx}", null, cancellationToken);
await _sqlRetryService.TryLogEvent(nameof(MergeAsync), "Error", $"retries={retries}, error={trueEx}", null, cancellationToken);

throw trueEx;
}
Expand Down Expand Up @@ -375,7 +378,7 @@ private async Task<IDictionary<DataStoreOperationIdentifier, DataStoreOperationO
return results;
}

internal async Task<(IReadOnlyList<ImportResource> Loaded, IReadOnlyList<ImportResource> Conflicts)> ImportResourcesAsync(IReadOnlyList<ImportResource> resources, ImportMode importMode, CancellationToken cancellationToken)
internal async Task<IReadOnlyList<string>> ImportResourcesAsync(IReadOnlyList<ImportResource> resources, ImportMode importMode, CancellationToken cancellationToken)
{
(List<ImportResource> Loaded, List<ImportResource> Conflicts) results;
var retries = 0;
Expand Down Expand Up @@ -403,68 +406,108 @@ private async Task<IDictionary<DataStoreOperationIdentifier, DataStoreOperationO
}
}

return (results.Loaded, results.Conflicts);
var dups = resources.Except(results.Loaded).Except(results.Conflicts);

return GetErrors(dups, results.Conflicts);

List<string> GetErrors(IEnumerable<ImportResource> dups, IEnumerable<ImportResource> conflicts)
{
var errors = new List<string>();
foreach (var resource in dups)
{
errors.Add(_importErrorSerializer.Serialize(resource.Index, string.Format(Resources.FailedToImportDuplicate, resource.ResourceWrapper.ResourceId, resource.Index), resource.Offset));
}

foreach (var resource in conflicts)
{
errors.Add(_importErrorSerializer.Serialize(resource.Index, string.Format(Resources.FailedToImportConflictingVersion, resource.ResourceWrapper.ResourceId, resource.Index), resource.Offset));
}

return errors;
}

async Task<(List<ImportResource> Loaded, List<ImportResource> Conflicts)> ImportResourcesInternalAsync(bool useReplicasForReads)
{
var loaded = new List<ImportResource>();
var conflicts = new List<ImportResource>();
if (importMode == ImportMode.InitialLoad)
{
var inputDedupped = resources.GroupBy(_ => _.ResourceWrapper.ToResourceKey(true)).Select(_ => _.OrderBy(_ => _.ResourceWrapper.LastModified).First()).ToList();
var current = new HashSet<ResourceKey>((await GetAsync(inputDedupped.Select(_ => _.ResourceWrapper.ToResourceKey(true)).ToList(), cancellationToken)).Select(_ => _.ToResourceKey(true)));
loaded.AddRange(inputDedupped.Where(i => !current.TryGetValue(i.ResourceWrapper.ToResourceKey(true), out _)));
var inputsDedupped = resources.GroupBy(_ => _.ResourceWrapper.ToResourceKey(true)).Select(_ => _.OrderBy(_ => _.ResourceWrapper.LastModified).First()).ToList();
var current = new HashSet<ResourceKey>((await GetAsync(inputsDedupped.Select(_ => _.ResourceWrapper.ToResourceKey(true)).ToList(), cancellationToken)).Select(_ => _.ToResourceKey(true)));
loaded.AddRange(inputsDedupped.Where(i => !current.TryGetValue(i.ResourceWrapper.ToResourceKey(true), out _)));
await MergeResourcesWithLastUpdatedAsync(loaded, useReplicasForReads);
}
else if (importMode == ImportMode.IncrementalLoad)
{
// dedup by last updated - keep all versions when version and lastUpdated exist on record.
var inputDedupped = resources
var inputsDedupped = resources
.GroupBy(_ => _.ResourceWrapper.ToResourceDateKey(_model.GetResourceTypeId, ignoreVersion: !_.KeepVersion || !_.KeepLastUpdated))
.Select(_ => _.First())
.ToList();

await HandleIncrementalVersionedImport(inputDedupped, useReplicasForReads);
await HandleIncrementalVersionedImport(inputsDedupped, useReplicasForReads);

await HandleIncrementalUnversionedImport(inputDedupped, useReplicasForReads);
await HandleIncrementalUnversionedImport(inputsDedupped, useReplicasForReads);
}

return (loaded, conflicts);

async Task HandleIncrementalVersionedImport(List<ImportResource> inputDedupped, bool useReplicasForReads)
async Task HandleIncrementalVersionedImport(List<ImportResource> inputs, bool useReplicasForReads)
{
// Dedup by version via ToResourceKey - only keep the first occurrence of a version in this batch.
var inputDeduppedWithVersions = inputDedupped.Where(_ => _.KeepVersion).GroupBy(_ => _.ResourceWrapper.ToResourceKey()).Select(_ => _.First()).ToList();
var inputsWithVersion = inputs.Where(_ => _.KeepVersion).GroupBy(_ => _.ResourceWrapper.ToResourceKey()).Select(_ => _.First()).ToList();

// Search the db for versions that match the import resources with version so we can filter duplicates from the import.
var currentResourceKeysInDb = new HashSet<ResourceKey>((await GetAsync(inputDeduppedWithVersions.Select(_ => _.ResourceWrapper.ToResourceKey()).ToList(), cancellationToken)).Select(_ => _.ToResourceKey()));
var currentInDb = (await GetAsync(inputsWithVersion.Select(_ => _.ResourceWrapper.ToResourceKey()).ToList(), cancellationToken)).ToDictionary(_ => _.ToResourceKey(), _ => _);

// If resources are identical, consider them already loaded. We should compare both last updated and raw resource;
// if the dates or raw resources do not match, consider it a conflict.
var toBeLoaded = new List<ImportResource>();
foreach (var resource in inputsWithVersion)
{
if (currentInDb.TryGetValue(resource.ResourceWrapper.ToResourceKey(), out var inDb))
{
if (inDb.LastModified == resource.ResourceWrapper.LastModified && inDb.RawResource.Data == resource.ResourceWrapper.RawResource.Data)
{
loaded.Add(resource); // exact match
}
else
{
conflicts.Add(resource); // version match but diff dates or raw resources
}
}
else
{
toBeLoaded.Add(resource);
}
}

// Import resource versions that don't exist in the db. Sorting is used in merge to set isHistory - don't change it without updating that method!
loaded.AddRange(inputDeduppedWithVersions.Where(i => !currentResourceKeysInDb.TryGetValue(i.ResourceWrapper.ToResourceKey(), out _)).OrderBy(_ => _.ResourceWrapper.ResourceId).ThenByDescending(_ => _.ResourceWrapper.LastModified));
await MergeResourcesWithLastUpdatedAsync(loaded, useReplicasForReads);
await MergeResourcesWithLastUpdatedAsync(toBeLoaded.OrderBy(_ => _.ResourceWrapper.ResourceId).ThenByDescending(_ => _.ResourceWrapper.LastModified), useReplicasForReads);
loaded.AddRange(toBeLoaded);
}

async Task HandleIncrementalUnversionedImport(List<ImportResource> inputDedupped, bool useReplicasForReads)
async Task HandleIncrementalUnversionedImport(List<ImportResource> inputs, bool useReplicasForReads)
{
// Dedup by resource id - only keep the first occurrence of an unversioned resource. This method is run in many parallel workers - we cannot guarantee processing order across parallel file streams. Taking the first resource avoids conflicts.
var inputDeduppedNoVersion = inputDedupped.Where(_ => !_.KeepVersion).GroupBy(_ => _.ResourceWrapper.ToResourceKey(ignoreVersion: true)).Select(_ => _.First()).ToList();
var inputsNoVersion = inputs.Where(_ => !_.KeepVersion).GroupBy(_ => _.ResourceWrapper.ToResourceKey(ignoreVersion: true)).Select(_ => _.First()).ToList();

// Ensure that the imported resources can "fit" between existing versions in the db. We want to keep versionId sequential along with lastUpdated.
// First part is setup.
var currentDates = (await GetAsync(inputDeduppedNoVersion.Select(_ => _.ResourceWrapper.ToResourceKey(ignoreVersion: true)).ToList(), cancellationToken)).ToDictionary(_ => _.ToResourceKey(ignoreVersion: true), _ => _.ToResourceDateKey(_model.GetResourceTypeId));
var inputDeduppedNoVersionForCheck = new List<ImportResource>();
foreach (var resource in inputDeduppedNoVersion)
var currentDates = (await GetAsync(inputsNoVersion.Select(_ => _.ResourceWrapper.ToResourceKey(ignoreVersion: true)).ToList(), cancellationToken)).ToDictionary(_ => _.ToResourceKey(ignoreVersion: true), _ => _.ToResourceDateKey(_model.GetResourceTypeId));
var inputsNoVersionForCheck = new List<ImportResource>();
foreach (var resource in inputsNoVersion)
{
if (currentDates.TryGetValue(resource.ResourceWrapper.ToResourceKey(ignoreVersion: true), out var dateKey)
&& ResourceSurrogateIdHelper.LastUpdatedToResourceSurrogateId(resource.ResourceWrapper.LastModified.DateTime) < dateKey.ResourceSurrogateId)
{
inputDeduppedNoVersionForCheck.Add(resource);
inputsNoVersionForCheck.Add(resource);
}
}

// Second part is testing if the imported resources can "fit" between existing versions in the db.
var versionSlots = (await StoreClient.GetResourceVersionsAsync(inputDeduppedNoVersionForCheck.Select(_ => _.ResourceWrapper.ToResourceDateKey(_model.GetResourceTypeId)).ToList(), cancellationToken)).ToDictionary(_ => new ResourceKey(_model.GetResourceTypeName(_.ResourceTypeId), _.Id, null), _ => _);
foreach (var resource in inputDeduppedNoVersionForCheck)
var versionSlots = (await StoreClient.GetResourceVersionsAsync(inputsNoVersionForCheck.Select(_ => _.ResourceWrapper.ToResourceDateKey(_model.GetResourceTypeId)).ToList(), cancellationToken)).ToDictionary(_ => new ResourceKey(_model.GetResourceTypeName(_.ResourceTypeId), _.Id, null), _ => _);
foreach (var resource in inputsNoVersionForCheck)
{
var resourceKey = resource.ResourceWrapper.ToResourceKey(ignoreVersion: true);
versionSlots.TryGetValue(resourceKey, out var versionSlotKey);
Expand All @@ -481,10 +524,10 @@ async Task HandleIncrementalUnversionedImport(List<ImportResource> inputDedupped
}

// Finally merge the resources to the db.
var inputDeduppedNoVersionNoConflict = inputDeduppedNoVersion.Except(conflicts).ToList(); // some resources might get version assigned
await MergeResourcesWithLastUpdatedAsync(inputDeduppedNoVersionNoConflict.Where(_ => _.KeepVersion), useReplicasForReads);
await MergeResourcesWithLastUpdatedAsync(inputDeduppedNoVersionNoConflict.Where(_ => !_.KeepVersion), useReplicasForReads);
loaded.AddRange(inputDeduppedNoVersionNoConflict);
var inputNoVersionNoConflict = inputsNoVersion.Except(conflicts).ToList(); // some resources might get version assigned
await MergeResourcesWithLastUpdatedAsync(inputNoVersionNoConflict.Where(_ => _.KeepVersion), useReplicasForReads);
await MergeResourcesWithLastUpdatedAsync(inputNoVersionNoConflict.Where(_ => !_.KeepVersion), useReplicasForReads);
loaded.AddRange(inputNoVersionNoConflict);
}
}

Expand Down
Loading

0 comments on commit 49a0379

Please sign in to comment.