Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent reporting dups on worker restart #3771

Merged
merged 4 commits into from
Mar 25, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -91,24 +91,10 @@ public async Task<ImportProcessingProgress> Import(Channel<ImportResource> input
errors.AddRange(resources.Where(r => !string.IsNullOrEmpty(r.ImportError)).Select(r => r.ImportError));
//// exclude resources with a parsing error (ImportError is not null or empty)
var validResources = resources.Where(r => string.IsNullOrEmpty(r.ImportError)).ToList();
var results = await _store.ImportResourcesAsync(validResources, importMode, cancellationToken);
var dups = validResources.Except(results.Loaded).Except(results.Conflicts);
AppendErrorsToBuffer(dups, results.Conflicts, errors);
var newErrors = await _store.ImportResourcesAsync(validResources, importMode, cancellationToken);
errors.AddRange(newErrors);
resources.Clear();
return (results.Loaded.Count, resources.Sum(_ => (long)_.Length));
}

private void AppendErrorsToBuffer(IEnumerable<ImportResource> dups, IEnumerable<ImportResource> conflicts, List<string> importErrorBuffer)
{
foreach (var resource in dups)
{
importErrorBuffer.Add(_importErrorSerializer.Serialize(resource.Index, string.Format(Resources.FailedToImportDuplicate, resource.ResourceWrapper.ResourceId, resource.Index), resource.Offset));
}

foreach (var resource in conflicts)
{
importErrorBuffer.Add(_importErrorSerializer.Serialize(resource.Index, string.Format(Resources.FailedToImportConflictingVersion, resource.ResourceWrapper.ResourceId, resource.Index), resource.Offset));
}
return (validResources.Count - newErrors.Count, resources.Sum(_ => (long)_.Length));
}

private async Task<ImportProcessingProgress> UploadImportErrorsAsync(IImportErrorStore importErrorStore, long succeededCount, long failedCount, string[] importErrors, long lastIndex, long processedBytes, CancellationToken cancellationToken)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
private readonly ILogger<SqlServerFhirDataStore> _logger;
private readonly SchemaInformation _schemaInformation;
private readonly IModelInfoProvider _modelInfoProvider;
private readonly IImportErrorSerializer _importErrorSerializer;
private static IgnoreInputLastUpdated _ignoreInputLastUpdated;
private static RawResourceDeduping _rawResourceDeduping;
private static object _flagLocker = new object();
Expand All @@ -73,7 +74,8 @@
ILogger<SqlServerFhirDataStore> logger,
SchemaInformation schemaInformation,
IModelInfoProvider modelInfoProvider,
RequestContextAccessor<IFhirRequestContext> requestContextAccessor)
RequestContextAccessor<IFhirRequestContext> requestContextAccessor,
IImportErrorSerializer importErrorSerializer)
{
_model = EnsureArg.IsNotNull(model, nameof(model));
_searchParameterTypeMap = EnsureArg.IsNotNull(searchParameterTypeMap, nameof(searchParameterTypeMap));
Expand All @@ -87,6 +89,7 @@
_schemaInformation = EnsureArg.IsNotNull(schemaInformation, nameof(schemaInformation));
_modelInfoProvider = EnsureArg.IsNotNull(modelInfoProvider, nameof(modelInfoProvider));
_requestContextAccessor = EnsureArg.IsNotNull(requestContextAccessor, nameof(requestContextAccessor));
_importErrorSerializer = EnsureArg.IsNotNull(importErrorSerializer, nameof(importErrorSerializer));

_memoryStreamManager = new RecyclableMemoryStreamManager();

Expand Down Expand Up @@ -141,7 +144,7 @@
}

_logger.LogError(e, $"Error from SQL database on {nameof(MergeAsync)} retries={{Retries}}", retries);
await _sqlRetryService.TryLogEvent(nameof(MergeAsync), "Error", $"retries={retries}, error={sqlEx}", null, cancellationToken);
await _sqlRetryService.TryLogEvent(nameof(MergeAsync), "Error", $"retries={retries}, error={trueEx}", null, cancellationToken);

throw trueEx;
}
Expand Down Expand Up @@ -375,7 +378,7 @@
return results;
}

internal async Task<(IReadOnlyList<ImportResource> Loaded, IReadOnlyList<ImportResource> Conflicts)> ImportResourcesAsync(IReadOnlyList<ImportResource> resources, ImportMode importMode, CancellationToken cancellationToken)
internal async Task<IReadOnlyList<string>> ImportResourcesAsync(IReadOnlyList<ImportResource> resources, ImportMode importMode, CancellationToken cancellationToken)
{
(List<ImportResource> Loaded, List<ImportResource> Conflicts) results;
var retries = 0;
Expand Down Expand Up @@ -403,7 +406,25 @@
}
}

return (results.Loaded, results.Conflicts);
var dups = resources.Except(results.Loaded).Except(results.Conflicts);

return GetErrors(dups, results.Conflicts);

// Builds the list of serialized import error entries for this batch:
// one entry per resource in dups (using the FailedToImportDuplicate message),
// followed by one entry per resource in conflicts (using the FailedToImportConflictingVersion message).
List<string> GetErrors(IEnumerable<ImportResource> dups, IEnumerable<ImportResource> conflicts)
{
    // Formats the message with the resource id and index, then hands it to the error serializer
    // together with the resource's index and offset.
    string ToSerializedError(ImportResource failed, string messageFormat)
    {
        var message = string.Format(messageFormat, failed.ResourceWrapper.ResourceId, failed.Index);
        return _importErrorSerializer.Serialize(failed.Index, message, failed.Offset);
    }

    var duplicateErrors = dups.Select(d => ToSerializedError(d, Resources.FailedToImportDuplicate));
    var conflictErrors = conflicts.Select(c => ToSerializedError(c, Resources.FailedToImportConflictingVersion));

    // Order is preserved: all duplicate entries first, then all conflict entries.
    return duplicateErrors.Concat(conflictErrors).ToList();
}

async Task<(List<ImportResource> Loaded, List<ImportResource> Conflicts)> ImportResourcesInternalAsync(bool useReplicasForReads)
{
Expand Down Expand Up @@ -431,40 +452,62 @@

return (loaded, conflicts);

async Task HandleIncrementalVersionedImport(List<ImportResource> inputDedupped, bool useReplicasForReads)
async Task HandleIncrementalVersionedImport(List<ImportResource> input, bool useReplicasForReads)
{
// Dedup by version via ToResourceKey - only keep first occurrence of a version in this batch.
var inputDeduppedWithVersions = inputDedupped.Where(_ => _.KeepVersion).GroupBy(_ => _.ResourceWrapper.ToResourceKey()).Select(_ => _.First()).ToList();
var inputWithVersion = input.Where(_ => _.KeepVersion).GroupBy(_ => _.ResourceWrapper.ToResourceKey()).Select(_ => _.First()).ToList();

// Search the db for versions that match the import resources with version so we can filter duplicates from the import.
var currentResourceKeysInDb = new HashSet<ResourceKey>((await GetAsync(inputDeduppedWithVersions.Select(_ => _.ResourceWrapper.ToResourceKey()).ToList(), cancellationToken)).Select(_ => _.ToResourceKey()));
var currentInDb = (await GetAsync(inputWithVersion.Select(_ => _.ResourceWrapper.ToResourceKey()).ToList(), cancellationToken)).ToDictionary(_ => _.ToResourceKey(), _ => _);

// If a resource version already exists in the db, compare both last updated and raw resource.
// If both match, consider the resource already loaded; if either differs, consider it a conflict.
var toBeLoaded = new List<ImportResource>();
foreach (var inputRes in inputWithVersion)
{
if (currentInDb.TryGetValue(inputRes.ResourceWrapper.ToResourceKey(), out var inDb))
{
if (inDb.LastModified == inputRes.ResourceWrapper.LastModified && inDb.RawResource.Data == inputRes.ResourceWrapper.RawResource.Data)
{
loaded.Add(inputRes); // exact match
}
else
{
conflicts.Add(inputRes); // version match but diff dates or raw resources
}
}
else
{
toBeLoaded.Add(inputRes);
}
}

// Import resource versions that don't exist in the db. Sorting is used in merge to set isHistory - don't change it without updating that method!
loaded.AddRange(inputDeduppedWithVersions.Where(i => !currentResourceKeysInDb.TryGetValue(i.ResourceWrapper.ToResourceKey(), out _)).OrderBy(_ => _.ResourceWrapper.ResourceId).ThenByDescending(_ => _.ResourceWrapper.LastModified));
await MergeResourcesWithLastUpdatedAsync(loaded, useReplicasForReads);
await MergeResourcesWithLastUpdatedAsync(toBeLoaded.OrderBy(_ => _.ResourceWrapper.ResourceId).ThenByDescending(_ => _.ResourceWrapper.LastModified), useReplicasForReads);
loaded.AddRange(toBeLoaded);
}

async Task HandleIncrementalUnversionedImport(List<ImportResource> inputDedupped, bool useReplicasForReads)
async Task HandleIncrementalUnversionedImport(List<ImportResource> input, bool useReplicasForReads)
{
// Dedup by resource id - only keep first occurrence of an unversioned resource. This method is run in many parallel workers - we cannot guarantee processing order across parallel file streams. Taking the first resource avoids conflicts.
var inputDeduppedNoVersion = inputDedupped.Where(_ => !_.KeepVersion).GroupBy(_ => _.ResourceWrapper.ToResourceKey(ignoreVersion: true)).Select(_ => _.First()).ToList();
var inputNoVersion = input.Where(_ => !_.KeepVersion).GroupBy(_ => _.ResourceWrapper.ToResourceKey(ignoreVersion: true)).Select(_ => _.First()).ToList();

// Ensure that the imported resources can "fit" between existing versions in the db. We want to keep versionId sequential along with lastUpdated.
// First part is setup.
var currentDates = (await GetAsync(inputDeduppedNoVersion.Select(_ => _.ResourceWrapper.ToResourceKey(ignoreVersion: true)).ToList(), cancellationToken)).ToDictionary(_ => _.ToResourceKey(ignoreVersion: true), _ => _.ToResourceDateKey(_model.GetResourceTypeId));
var inputDeduppedNoVersionForCheck = new List<ImportResource>();
foreach (var resource in inputDeduppedNoVersion)
var currentDates = (await GetAsync(inputNoVersion.Select(_ => _.ResourceWrapper.ToResourceKey(ignoreVersion: true)).ToList(), cancellationToken)).ToDictionary(_ => _.ToResourceKey(ignoreVersion: true), _ => _.ToResourceDateKey(_model.GetResourceTypeId));
var inputNoVersionForCheck = new List<ImportResource>();
foreach (var resource in inputNoVersion)
{
if (currentDates.TryGetValue(resource.ResourceWrapper.ToResourceKey(ignoreVersion: true), out var dateKey)
&& ResourceSurrogateIdHelper.LastUpdatedToResourceSurrogateId(resource.ResourceWrapper.LastModified.DateTime) < dateKey.ResourceSurrogateId)
{
inputDeduppedNoVersionForCheck.Add(resource);
inputNoVersionForCheck.Add(resource);
}
}

Check notice

Code scanning / CodeQL

Missed opportunity to use Where Note

This foreach loop
implicitly filters its target sequence
- consider filtering the sequence explicitly using '.Where(...)'.

// Second part is testing if the imported resources can "fit" between existing versions in the db.
var versionSlots = (await StoreClient.GetResourceVersionsAsync(inputDeduppedNoVersionForCheck.Select(_ => _.ResourceWrapper.ToResourceDateKey(_model.GetResourceTypeId)).ToList(), cancellationToken)).ToDictionary(_ => new ResourceKey(_model.GetResourceTypeName(_.ResourceTypeId), _.Id, null), _ => _);
foreach (var resource in inputDeduppedNoVersionForCheck)
var versionSlots = (await StoreClient.GetResourceVersionsAsync(inputNoVersionForCheck.Select(_ => _.ResourceWrapper.ToResourceDateKey(_model.GetResourceTypeId)).ToList(), cancellationToken)).ToDictionary(_ => new ResourceKey(_model.GetResourceTypeName(_.ResourceTypeId), _.Id, null), _ => _);
foreach (var resource in inputNoVersionForCheck)
{
var resourceKey = resource.ResourceWrapper.ToResourceKey(ignoreVersion: true);
versionSlots.TryGetValue(resourceKey, out var versionSlotKey);
Expand All @@ -481,10 +524,10 @@
}

// Finally merge the resources to the db.
var inputDeduppedNoVersionNoConflict = inputDeduppedNoVersion.Except(conflicts).ToList(); // some resources might get version assigned
await MergeResourcesWithLastUpdatedAsync(inputDeduppedNoVersionNoConflict.Where(_ => _.KeepVersion), useReplicasForReads);
await MergeResourcesWithLastUpdatedAsync(inputDeduppedNoVersionNoConflict.Where(_ => !_.KeepVersion), useReplicasForReads);
loaded.AddRange(inputDeduppedNoVersionNoConflict);
var inputNoVersionNoConflict = inputNoVersion.Except(conflicts).ToList(); // some resources might get version assigned
await MergeResourcesWithLastUpdatedAsync(inputNoVersionNoConflict.Where(_ => _.KeepVersion), useReplicasForReads);
await MergeResourcesWithLastUpdatedAsync(inputNoVersionNoConflict.Where(_ => !_.KeepVersion), useReplicasForReads);
loaded.AddRange(inputNoVersionNoConflict);
}
}

Expand Down
Loading
Loading