Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Storage][DataMovement] Fixes to pause/resume around enumeration #39486

Merged
merged 1 commit into from
Oct 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public override async IAsyncEnumerable<JobPartInternal> ProcessJobToJobPartAsync
job: this,
partNumber: partNumber).ConfigureAwait(false);
AppendJobPart(part);
await OnAllResourcesEnumerated().ConfigureAwait(false);
}
catch (Exception ex)
{
Expand Down Expand Up @@ -122,6 +123,7 @@ public override async IAsyncEnumerable<JobPartInternal> ProcessJobToJobPartAsync
}
}

// Call regardless of the outcome of enumeration so job can pause/finish
await OnEnumerationComplete().ConfigureAwait(false);
}

Expand All @@ -130,7 +132,7 @@ private async IAsyncEnumerable<JobPartInternal> GetStorageResourcesAsync()
// Start the partNumber based on the last part number. If this is a new job,
// the count will automatically be at 0 (the beginning).
int partNumber = _jobParts.Count;
List<string> existingSources = GetJobPartSourceResourcePaths();
HashSet<Uri> existingSources = GetJobPartSourceResourcePaths();
// Call listing operation on the source container
IAsyncEnumerator<StorageResource> enumerator;

Expand All @@ -154,8 +156,10 @@ private async IAsyncEnumerable<JobPartInternal> GetStorageResourcesAsync()
{
try
{
_cancellationToken.ThrowIfCancellationRequested();
if (!await enumerator.MoveNextAsync().ConfigureAwait(false))
{
await OnAllResourcesEnumerated().ConfigureAwait(false);
enumerationCompleted = true;
continue;
}
Expand All @@ -167,17 +171,13 @@ private async IAsyncEnumerable<JobPartInternal> GetStorageResourcesAsync()
}

StorageResource current = enumerator.Current;

string containerUriPath = _sourceResourceContainer.Uri.GetPath();
string sourceName = string.IsNullOrEmpty(containerUriPath)
? current.Uri.GetPath()
: current.Uri.GetPath().Substring(containerUriPath.Length + 1);

if (!existingSources.Contains(sourceName))
if (!existingSources.Contains(current.Uri))
{
// Because AsyncEnumerable doesn't let us know which storage resource is the last resource
// we only yield return when we know this is not the last storage resource to be listed
// from the container.
string containerUriPath = _sourceResourceContainer.Uri.GetPath();
string sourceName = string.IsNullOrEmpty(containerUriPath)
? current.Uri.GetPath()
: current.Uri.GetPath().Substring(containerUriPath.Length + 1);

ServiceToServiceJobPart part;
try
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public override async IAsyncEnumerable<JobPartInternal> ProcessJobToJobPartAsync
job: this,
partNumber: partNumber).ConfigureAwait(false);
AppendJobPart(part);
await OnAllResourcesEnumerated().ConfigureAwait(false);
}
catch (Exception ex)
{
Expand Down Expand Up @@ -120,6 +121,7 @@ public override async IAsyncEnumerable<JobPartInternal> ProcessJobToJobPartAsync
}
}

// Call regardless of the outcome of enumeration so job can pause/finish
await OnEnumerationComplete().ConfigureAwait(false);
}

Expand All @@ -128,7 +130,7 @@ private async IAsyncEnumerable<JobPartInternal> GetStorageResourcesAsync()
// Start the partNumber based on the last part number. If this is a new job,
// the count will automatically be at 0 (the beginning).
int partNumber = _jobParts.Count;
List<string> existingSources = GetJobPartSourceResourcePaths();
HashSet<Uri> existingSources = GetJobPartSourceResourcePaths();
// Call listing operation on the source container
IAsyncEnumerator<StorageResource> enumerator;

Expand All @@ -152,8 +154,10 @@ private async IAsyncEnumerable<JobPartInternal> GetStorageResourcesAsync()
{
try
{
_cancellationToken.ThrowIfCancellationRequested();
if (!await enumerator.MoveNextAsync().ConfigureAwait(false))
{
await OnAllResourcesEnumerated().ConfigureAwait(false);
enumerationCompleted = true;
continue;
}
Expand All @@ -165,17 +169,13 @@ private async IAsyncEnumerable<JobPartInternal> GetStorageResourcesAsync()
}

StorageResource current = enumerator.Current;

string containerUriPath = _sourceResourceContainer.Uri.GetPath();
string sourceName = string.IsNullOrEmpty(containerUriPath)
? current.Uri.GetPath()
: current.Uri.GetPath().Substring(containerUriPath.Length + 1);

if (!existingSources.Contains(sourceName))
if (!existingSources.Contains(current.Uri))
{
// Because AsyncEnumerable doesn't let us know which storage resource is the last resource
// we only yield return when we know this is not the last storage resource to be listed
// from the container.
string containerUriPath = _sourceResourceContainer.Uri.GetPath();
string sourceName = string.IsNullOrEmpty(containerUriPath)
? current.Uri.GetPath()
: current.Uri.GetPath().Substring(containerUriPath.Length + 1);

StreamToUriJobPart part;
try
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,10 +403,13 @@ await _checkpointer.SetJobTransferStatusAsync(
status: _dataTransfer.TransferStatus).ConfigureAwait(false);
}

internal async Task OnEnumerationComplete()
/// <summary>
/// Called when enumeration is complete whether it finished successfully, failed, or was paused.
/// All resources may or may not have been enumerated.
/// </summary>
protected async Task OnEnumerationComplete()
{
_enumerationComplete = true;
await _checkpointer.OnEnumerationCompleteAsync(_dataTransfer.Id).ConfigureAwait(false);

// If there were no job parts enumerated and we haven't already aborted/completed the job.
if (_jobParts.Count == 0 &&
Expand All @@ -426,6 +429,14 @@ internal async Task OnEnumerationComplete()
await CheckAndUpdateStatusAsync().ConfigureAwait(false);
}

/// <summary>
/// Signals the checkpointer that enumeration has finished for this transfer.
/// Intended to be invoked only once every resource has been listed successfully.
/// </summary>
protected async Task OnAllResourcesEnumerated()
{
    // Identify the transfer whose enumeration is being recorded as complete.
    var transferId = _dataTransfer.Id;
    await _checkpointer.OnEnumerationCompleteAsync(transferId).ConfigureAwait(false);
}

internal async Task CheckAndUpdateStatusAsync()
{
// If we had a failure or pause during listing, we need to set the status correctly.
Expand Down Expand Up @@ -469,9 +480,9 @@ public void AppendJobPart(JobPartInternal jobPart)
}
}

// Gathers the source resource locations of the job parts currently held in _jobParts.
// NOTE(review): this span is a diff rendering that shows both the previous
// List<string> form (URI paths) and the replacement HashSet<Uri> form (full URIs)
// side by side — confirm which pair of lines the file actually keeps.
internal List<string> GetJobPartSourceResourcePaths()
internal HashSet<Uri> GetJobPartSourceResourcePaths()
{
return _jobParts.Select( x => x._sourceResource.Uri.GetPath() ).ToList();
return new HashSet<Uri>(_jobParts.Select(x => x._sourceResource.Uri));
}

internal void QueueJobPart()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public override async IAsyncEnumerable<JobPartInternal> ProcessJobToJobPartAsync
job: this,
partNumber: partNumber).ConfigureAwait(false);
AppendJobPart(part);
await OnAllResourcesEnumerated().ConfigureAwait(false);
}
catch (Exception ex)
{
Expand Down Expand Up @@ -120,6 +121,7 @@ public override async IAsyncEnumerable<JobPartInternal> ProcessJobToJobPartAsync
}
}

// Call regardless of the outcome of enumeration so job can pause/finish
await OnEnumerationComplete().ConfigureAwait(false);
}

Expand All @@ -128,7 +130,7 @@ private async IAsyncEnumerable<JobPartInternal> GetStorageResourcesAsync()
// Start the partNumber based on the last part number. If this is a new job,
// the count will automatically be at 0 (the beginning).
int partNumber = _jobParts.Count;
List<string> existingSources = GetJobPartSourceResourcePaths();
HashSet<Uri> existingSources = GetJobPartSourceResourcePaths();
// Call listing operation on the source container
IAsyncEnumerator<StorageResource> enumerator;

Expand All @@ -152,8 +154,10 @@ private async IAsyncEnumerable<JobPartInternal> GetStorageResourcesAsync()
{
try
{
_cancellationToken.ThrowIfCancellationRequested();
if (!await enumerator.MoveNextAsync().ConfigureAwait(false))
{
await OnAllResourcesEnumerated().ConfigureAwait(false);
enumerationCompleted = true;
continue;
}
Expand All @@ -165,17 +169,13 @@ private async IAsyncEnumerable<JobPartInternal> GetStorageResourcesAsync()
}

StorageResource current = enumerator.Current;

string containerUriPath = _sourceResourceContainer.Uri.GetPath();
string sourceName = string.IsNullOrEmpty(containerUriPath)
? current.Uri.GetPath()
: current.Uri.GetPath().Substring(containerUriPath.Length + 1);

if (!existingSources.Contains(sourceName))
if (!existingSources.Contains(current.Uri))
{
// Because AsyncEnumerable doesn't let us know which storage resource is the last resource
// we only yield return when we know this is not the last storage resource to be listed
// from the container.
string containerUriPath = _sourceResourceContainer.Uri.GetPath();
string sourceName = string.IsNullOrEmpty(containerUriPath)
? current.Uri.GetPath()
: current.Uri.GetPath().Substring(containerUriPath.Length + 1);

UriToStreamJobPart part;
try
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1097,6 +1097,79 @@ await AssertDirectorySourceAndDestinationAsync(
destinationContainer: destinationContainer.Container);
}

/// <summary>
/// Starts a directory transfer, pauses it after a configurable delay, resumes it,
/// and verifies that the resumed transfer completes and the destination matches the source.
/// </summary>
/// <param name="transferType">Direction of the transfer under test (upload, download, or copy).</param>
/// <param name="blobCount">Number of items to create and transfer.</param>
/// <param name="delayInMs">Time to wait after starting the transfer before pausing it.</param>
[Ignore("Likely to fail in pipelines and takes a while to run.")]
[Test, Pairwise]
[LiveOnly]
public async Task ResumeTransferAsync_Directory_Large(
    [Values(TransferDirection.Upload, TransferDirection.Download, TransferDirection.Copy)] TransferDirection transferType,
    [Values(100)] int blobCount,
    [Values(0, 500, 2000)] int delayInMs)
{
    // This test is not really meant to run in a pipeline and may fail locally
    // depending on timing. It's more meant as a starting place to attempt testing
    // pause/resume in different states of the transfer. You may also find adding
    // delays in certain parts of the code while testing can help get more
    // consistent results.

    // Arrange
    using DisposingLocalDirectory checkpointerDirectory = DisposingLocalDirectory.GetTestDirectory();
    using DisposingLocalDirectory sourceDirectory = DisposingLocalDirectory.GetTestDirectory();
    using DisposingLocalDirectory destinationDirectory = DisposingLocalDirectory.GetTestDirectory();
    await using DisposingContainer sourceContainer = await GetTestContainerAsync(publicAccessType: PublicAccessType.BlobContainer);
    await using DisposingContainer destinationContainer = await GetTestContainerAsync();

    BlobsStorageResourceProvider blobProvider = new(GetSharedKeyCredential());
    LocalFilesStorageResourceProvider localProvider = new();
    TransferManagerOptions options = new TransferManagerOptions()
    {
        CheckpointerOptions = new TransferCheckpointStoreOptions(checkpointerDirectory.DirectoryPath),
        ErrorHandling = DataTransferErrorMode.ContinueOnFailure,
        ResumeProviders = new() { blobProvider, localProvider },
    };
    TransferManager transferManager = new TransferManager(options);
    long size = Constants.MB;

    (StorageResource sResource, StorageResource dResource) = await CreateStorageResourceContainersAsync(
        transferType: transferType,
        size: size,
        transferCount: blobCount,
        sourceDirectoryPath: sourceDirectory.DirectoryPath,
        destinationDirectoryPath: destinationDirectory.DirectoryPath,
        sourceContainer: sourceContainer.Container,
        destinationContainer: destinationContainer.Container,
        blobProvider: blobProvider,
        localProvider: localProvider);

    // Start transfer
    DataTransfer transfer = await transferManager.StartTransferAsync(sResource, dResource);

    // Sleep before pausing
    await Task.Delay(delayInMs);

    // Pause Transfer
    // The timeout-based token source owns a timer, so dispose it when the test exits.
    using CancellationTokenSource pauseCancellation = new CancellationTokenSource(TimeSpan.FromSeconds(30));
    await transferManager.PauseTransferIfRunningAsync(transfer.Id, pauseCancellation.Token);
    Assert.AreEqual(DataTransferState.Paused, transfer.TransferStatus.State);

    // Resume Transfer
    DataTransfer resumeTransfer = await transferManager.ResumeTransferAsync(transfer.Id);

    // Bound the wait so a stuck transfer fails the test instead of hanging; disposed on exit as above.
    using CancellationTokenSource waitTransferCompletion = new CancellationTokenSource(TimeSpan.FromSeconds(600));
    await resumeTransfer.WaitForCompletionAsync(waitTransferCompletion.Token);

    // Assert
    Assert.AreEqual(DataTransferState.Completed, resumeTransfer.TransferStatus.State);
    Assert.IsTrue(resumeTransfer.HasCompleted);

    // Verify transfer
    await AssertDirectorySourceAndDestinationAsync(
        transferType: transferType,
        sourceResource: sResource as StorageResourceContainer,
        destinationResource: dResource as StorageResourceContainer,
        sourceContainer: sourceContainer.Container,
        destinationContainer: destinationContainer.Container);
}

[Test]
public async Task PauseAllTriggersCorrectPauses()
{
Expand Down