From 7cb32da479914e7e411cb4e9d0bb879731289a52 Mon Sep 17 00:00:00 2001
From: Jacob Lauzon <96087589+jalauzon-msft@users.noreply.github.com>
Date: Fri, 6 Oct 2023 13:33:41 -0700
Subject: [PATCH] [Storage][DataMovement] Update checkpointer to read/write to
job file - Part 2 (#39101)
---
.../src/Shared/PooledMemoryStream.cs | 15 +++
.../tests/PooledMemoryStreamTests.cs | 50 +++++++++
.../src/CheckpointerExtensions.cs | 31 ++++++
.../src/JobPartInternal.cs | 15 +--
.../src/ServiceToServiceJobPart.cs | 23 ++--
.../src/ServiceToServiceTransferJob.cs | 100 ++++++------------
.../src/Shared/DataMovementExtensions.cs | 24 ++---
.../src/StreamToUriJobPart.cs | 32 ++----
.../src/StreamToUriTransferJob.cs | 100 ++++++------------
.../src/TransferJobInternal.cs | 11 ++
.../src/UriToStreamJobPart.cs | 26 ++---
.../src/UriToStreamTransferJob.cs | 100 ++++++------------
12 files changed, 229 insertions(+), 298 deletions(-)
diff --git a/sdk/storage/Azure.Storage.Common/src/Shared/PooledMemoryStream.cs b/sdk/storage/Azure.Storage.Common/src/Shared/PooledMemoryStream.cs
index 22911a8d287eb..3e218d18a90af 100644
--- a/sdk/storage/Azure.Storage.Common/src/Shared/PooledMemoryStream.cs
+++ b/sdk/storage/Azure.Storage.Common/src/Shared/PooledMemoryStream.cs
@@ -259,6 +259,21 @@ public override int Read(byte[] buffer, int offset, int count)
return read;
}
+ public override int ReadByte()
+ {
+ if (Position >= Length)
+ {
+ return -1;
+ }
+
+ (byte[] currentBuffer, int _, long offsetOfBuffer) = GetBufferFromPosition();
+
+ byte result = currentBuffer[Position - offsetOfBuffer];
+ Position += 1;
+
+ return result;
+ }
+
///
/// According the the current of the stream, gets the correct buffer containing the byte
/// at that position, as well as the stream position represented by the start of the array.
diff --git a/sdk/storage/Azure.Storage.Common/tests/PooledMemoryStreamTests.cs b/sdk/storage/Azure.Storage.Common/tests/PooledMemoryStreamTests.cs
index a52096090641d..696ece67018d3 100644
--- a/sdk/storage/Azure.Storage.Common/tests/PooledMemoryStreamTests.cs
+++ b/sdk/storage/Azure.Storage.Common/tests/PooledMemoryStreamTests.cs
@@ -105,6 +105,56 @@ public async Task WriteStream(int dataSize, int bufferPartitionSize)
Assert.AreEqual(0, pooledMemoryStream.Position);
}
+ [TestCase(1, 0, 1)]
+ [TestCase(Constants.KB, 512, 2 * Constants.KB)]
+ [TestCase(Constants.KB, 512, 512)]
+ [TestCase(107, 99, 52)]
+ public async Task ReadByte(int dataSize, int initialReadSize, int bufferPartitionSize)
+ {
+ // Arrange
+ byte[] originalData = GetRandomBuffer(dataSize);
+ PooledMemoryStream pooledMemoryStream = new PooledMemoryStream(ArrayPool.Shared, bufferPartitionSize);
+ await pooledMemoryStream.WriteAsync(originalData, 0, dataSize);
+ pooledMemoryStream.Position = 0;
+
+ // Read some data initially to test boundary conditions with buffers
+ if (initialReadSize > 0)
+ {
+ byte[] readData = new byte[initialReadSize];
+ await pooledMemoryStream.ReadAsync(readData, 0, initialReadSize);
+ }
+
+ // Act
+ byte result = Convert.ToByte(pooledMemoryStream.ReadByte());
+
+ // Assert
+ Assert.AreEqual(initialReadSize + 1, pooledMemoryStream.Position);
+ Assert.AreEqual(originalData[initialReadSize], result);
+ }
+
+ [TestCase(Constants.KB, 2 * Constants.KB)]
+ [TestCase(Constants.KB, 512)]
+ [TestCase(107, 52)]
+ public async Task ReadByte_Full(int dataSize, int bufferPartitionSize)
+ {
+ // Arrange
+ byte[] originalData = GetRandomBuffer(dataSize);
+ PooledMemoryStream pooledMemoryStream = new PooledMemoryStream(ArrayPool.Shared, bufferPartitionSize);
+ await pooledMemoryStream.WriteAsync(originalData, 0, dataSize);
+ pooledMemoryStream.Position = 0;
+
+ // Act
+ byte[] result = new byte[originalData.Length];
+ for (int i = 0; i < originalData.Length; i++)
+ {
+ result[i] = Convert.ToByte(pooledMemoryStream.ReadByte());
+ }
+
+ // Assert
+ Assert.AreEqual(originalData.Length, pooledMemoryStream.Position);
+ AssertSequenceEqual(originalData, result);
+ }
+
private static byte[] GetRandomBuffer(long size)
{
Random random = new Random(Environment.TickCount);
diff --git a/sdk/storage/Azure.Storage.DataMovement/src/CheckpointerExtensions.cs b/sdk/storage/Azure.Storage.DataMovement/src/CheckpointerExtensions.cs
index 28200dbf14669..21aa786e8cd89 100644
--- a/sdk/storage/Azure.Storage.DataMovement/src/CheckpointerExtensions.cs
+++ b/sdk/storage/Azure.Storage.DataMovement/src/CheckpointerExtensions.cs
@@ -1,6 +1,7 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
+using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
@@ -64,5 +65,35 @@ internal static async Task GetDataTransferPropertiesAsyn
IsContainer = isContainer,
};
}
+
+ internal static async Task IsEnumerationCompleteAsync(
+ this TransferCheckpointer checkpointer,
+ string transferId,
+ CancellationToken cancellationToken)
+ {
+ using (Stream stream = await checkpointer.ReadJobPlanFileAsync(
+ transferId,
+ DataMovementConstants.JobPlanFile.EnumerationCompleteIndex,
+ DataMovementConstants.OneByte,
+ cancellationToken).ConfigureAwait(false))
+ {
+ return Convert.ToBoolean(stream.ReadByte());
+ }
+ }
+
+ internal static async Task OnEnumerationCompleteAsync(
+ this TransferCheckpointer checkpointer,
+ string transferId,
+ CancellationToken cancellationToken)
+ {
+ byte[] enumerationComplete = { Convert.ToByte(true) };
+ await checkpointer.WriteToJobPlanFileAsync(
+ transferId,
+ DataMovementConstants.JobPlanFile.EnumerationCompleteIndex,
+ enumerationComplete,
+ bufferOffset: 0,
+ length: 1,
+ cancellationToken).ConfigureAwait(false);
+ }
}
}
diff --git a/sdk/storage/Azure.Storage.DataMovement/src/JobPartInternal.cs b/sdk/storage/Azure.Storage.DataMovement/src/JobPartInternal.cs
index d51e7d7a438d5..ce1bea76bdd0c 100644
--- a/sdk/storage/Azure.Storage.DataMovement/src/JobPartInternal.cs
+++ b/sdk/storage/Azure.Storage.DataMovement/src/JobPartInternal.cs
@@ -103,12 +103,6 @@ internal abstract class JobPartInternal
///
internal long? Length;
- ///
- /// Defines whether or not this was the final part in the list call. This would determine
- /// whether or not we needed to keep listing in the job.
- ///
- public bool IsFinalPart { get; internal set; }
-
internal ClientDiagnostics ClientDiagnostics { get; }
///
@@ -160,7 +154,6 @@ internal JobPartInternal(
TransferCheckpointer checkpointer,
TransferProgressTracker progressTracker,
ArrayPool arrayPool,
- bool isFinalPart,
SyncAsyncEventHandler jobPartEventHandler,
SyncAsyncEventHandler statusEventHandler,
SyncAsyncEventHandler failedEventHandler,
@@ -186,7 +179,6 @@ internal JobPartInternal(
_progressTracker = progressTracker;
_cancellationToken = cancellationToken;
_arrayPool = arrayPool;
- IsFinalPart = isFinalPart;
PartTransferStatusEventHandler = jobPartEventHandler;
TransferStatusEventHandler = statusEventHandler;
TransferFailedEventHandler = failedEventHandler;
@@ -460,13 +452,10 @@ public async virtual Task CleanupAbortedJobPartAsync()
/// Serializes the respective job part and adds it to the checkpointer.
///
/// Number of chunks in the job part.
- /// Defines if this part is the last job part of the job.
///
- public async virtual Task AddJobPartToCheckpointerAsync(int chunksTotal, bool isFinalPart)
+ public async virtual Task AddJobPartToCheckpointerAsync(int chunksTotal)
{
- JobPartPlanHeader header = this.ToJobPartPlanHeader(
- jobStatus: JobPartStatus,
- isFinalPart: isFinalPart);
+ JobPartPlanHeader header = this.ToJobPartPlanHeader(jobStatus: JobPartStatus);
using (Stream stream = new MemoryStream())
{
header.Serialize(stream);
diff --git a/sdk/storage/Azure.Storage.DataMovement/src/ServiceToServiceJobPart.cs b/sdk/storage/Azure.Storage.DataMovement/src/ServiceToServiceJobPart.cs
index f0fce0e2c19fa..258380048d50f 100644
--- a/sdk/storage/Azure.Storage.DataMovement/src/ServiceToServiceJobPart.cs
+++ b/sdk/storage/Azure.Storage.DataMovement/src/ServiceToServiceJobPart.cs
@@ -23,7 +23,7 @@ internal class ServiceToServiceJobPart : JobPartInternal, IAsyncDisposable
///
/// Creating job part based on a single transfer job
///
- private ServiceToServiceJobPart(ServiceToServiceTransferJob job, int partNumber, bool isFinalPart)
+ private ServiceToServiceJobPart(ServiceToServiceTransferJob job, int partNumber)
: base(dataTransfer: job._dataTransfer,
partNumber: partNumber,
sourceResource: job._sourceResource,
@@ -35,7 +35,6 @@ private ServiceToServiceJobPart(ServiceToServiceTransferJob job, int partNumber,
checkpointer: job._checkpointer,
progressTracker: job._progressTracker,
arrayPool: job.UploadArrayPool,
- isFinalPart: isFinalPart,
jobPartEventHandler: job.GetJobPartStatus(),
statusEventHandler: job.TransferStatusEventHandler,
failedEventHandler: job.TransferFailedEventHandler,
@@ -54,7 +53,6 @@ private ServiceToServiceJobPart(
int partNumber,
StorageResourceItem sourceResource,
StorageResourceItem destinationResource,
- bool isFinalPart,
DataTransferStatus jobPartStatus = default,
long? length = default)
: base(dataTransfer: job._dataTransfer,
@@ -68,7 +66,6 @@ private ServiceToServiceJobPart(
checkpointer: job._checkpointer,
progressTracker: job._progressTracker,
arrayPool: job.UploadArrayPool,
- isFinalPart: isFinalPart,
jobPartEventHandler: job.GetJobPartStatus(),
statusEventHandler: job.TransferStatusEventHandler,
failedEventHandler: job.TransferFailedEventHandler,
@@ -88,15 +85,11 @@ public async ValueTask DisposeAsync()
public static async Task CreateJobPartAsync(
ServiceToServiceTransferJob job,
- int partNumber,
- bool isFinalPart)
+ int partNumber)
{
- // Create Job Part file as we're intializing the job part
- ServiceToServiceJobPart part = new ServiceToServiceJobPart(
- job: job,
- partNumber: partNumber,
- isFinalPart: isFinalPart);
- await part.AddJobPartToCheckpointerAsync(1, isFinalPart).ConfigureAwait(false); // For now we only store 1 chunk
+ // Create Job Part file as we're initializing the job part
+ ServiceToServiceJobPart part = new ServiceToServiceJobPart(job, partNumber);
+ await part.AddJobPartToCheckpointerAsync(1).ConfigureAwait(false); // For now we only store 1 chunk
return part;
}
@@ -105,23 +98,21 @@ public static async Task CreateJobPartAsync(
int partNumber,
StorageResourceItem sourceResource,
StorageResourceItem destinationResource,
- bool isFinalPart,
DataTransferStatus jobPartStatus = default,
long? length = default,
bool partPlanFileExists = false)
{
- // Create Job Part file as we're intializing the job part
+ // Create Job Part file as we're initializing the job part
ServiceToServiceJobPart part = new ServiceToServiceJobPart(
job: job,
partNumber: partNumber,
jobPartStatus: jobPartStatus,
sourceResource: sourceResource,
destinationResource: destinationResource,
- isFinalPart: isFinalPart,
length: length);
if (!partPlanFileExists)
{
- await part.AddJobPartToCheckpointerAsync(1, isFinalPart).ConfigureAwait(false); // For now we only store 1 chunk
+ await part.AddJobPartToCheckpointerAsync(1).ConfigureAwait(false); // For now we only store 1 chunk
}
return part;
}
diff --git a/sdk/storage/Azure.Storage.DataMovement/src/ServiceToServiceTransferJob.cs b/sdk/storage/Azure.Storage.DataMovement/src/ServiceToServiceTransferJob.cs
index ee6217f4b7e7f..d11c4cec99fb6 100644
--- a/sdk/storage/Azure.Storage.DataMovement/src/ServiceToServiceTransferJob.cs
+++ b/sdk/storage/Azure.Storage.DataMovement/src/ServiceToServiceTransferJob.cs
@@ -83,8 +83,7 @@ public override async IAsyncEnumerable ProcessJobToJobPartAsync
// Single resource transfer, we can skip to chunking the job.
part = await ServiceToServiceJobPart.CreateJobPartAsync(
job: this,
- partNumber: partNumber,
- isFinalPart: true).ConfigureAwait(false);
+ partNumber: partNumber).ConfigureAwait(false);
AppendJobPart(part);
}
catch (Exception ex)
@@ -105,22 +104,16 @@ public override async IAsyncEnumerable ProcessJobToJobPartAsync
else
{
// Resuming old job with existing job parts
- bool isFinalPartFound = false;
foreach (JobPartInternal part in _jobParts)
{
if (!part.JobPartStatus.HasCompletedSuccessfully)
{
part.JobPartStatus.TrySetTransferStateChange(DataTransferState.Queued);
yield return part;
-
- if (part.IsFinalPart)
- {
- // If we found the final part then we don't have to relist the container.
- isFinalPartFound = true;
- }
}
}
- if (!isFinalPartFound)
+
+ if (await _checkpointer.IsEnumerationCompleteAsync(_dataTransfer.Id, _cancellationToken).ConfigureAwait(false))
{
await foreach (JobPartInternal jobPartInternal in GetStorageResourcesAsync().ConfigureAwait(false))
{
@@ -128,7 +121,7 @@ public override async IAsyncEnumerable ProcessJobToJobPartAsync
}
}
}
- _enumerationComplete = true;
+
await OnEnumerationComplete().ConfigureAwait(false);
}
@@ -154,10 +147,9 @@ private async IAsyncEnumerable GetStorageResourcesAsync()
yield break;
}
- // List the container keep track of the last job part in order to store it properly
- // so we know we finished enumerating/listed.
+ // List the container in this specific way because MoveNext needs to be separately wrapped
+ // in a try/catch as we can't yield return inside a try/catch.
bool enumerationCompleted = false;
- StorageResource lastResource = default;
while (!enumerationCompleted)
{
try
@@ -175,68 +167,36 @@ private async IAsyncEnumerable GetStorageResourcesAsync()
}
StorageResource current = enumerator.Current;
- if (lastResource != default)
- {
- string containerUriPath = _sourceResourceContainer.Uri.GetPath();
- string sourceName = string.IsNullOrEmpty(containerUriPath)
- ? lastResource.Uri.GetPath()
- : lastResource.Uri.GetPath().Substring(containerUriPath.Length + 1);
- if (!existingSources.Contains(sourceName))
- {
- // Because AsyncEnumerable doesn't let us know which storage resource is the last resource
- // we only yield return when we know this is not the last storage resource to be listed
- // from the container.
- ServiceToServiceJobPart part;
- try
- {
- part = await ServiceToServiceJobPart.CreateJobPartAsync(
- job: this,
- partNumber: partNumber,
- sourceResource: (StorageResourceItem)lastResource,
- destinationResource: _destinationResourceContainer.GetStorageResourceReference(sourceName),
- isFinalPart: false).ConfigureAwait(false);
- AppendJobPart(part);
- }
- catch (Exception ex)
- {
- await InvokeFailedArgAsync(ex).ConfigureAwait(false);
- yield break;
- }
- yield return part;
- partNumber++;
- }
- }
- lastResource = current;
- }
+ string containerUriPath = _sourceResourceContainer.Uri.GetPath();
+ string sourceName = string.IsNullOrEmpty(containerUriPath)
+ ? current.Uri.GetPath()
+ : current.Uri.GetPath().Substring(containerUriPath.Length + 1);
- // It's possible to have no job parts in a job
- if (lastResource != default)
- {
- ServiceToServiceJobPart lastPart;
- try
+ if (!existingSources.Contains(sourceName))
{
- // Return last part but enable the part to be the last job part of the entire job
- // so we know that we've finished listing in the container
- string containerUriPath = _sourceResourceContainer.Uri.GetPath();
- string lastSourceName = string.IsNullOrEmpty(containerUriPath)
- ? lastResource.Uri.GetPath()
- : lastResource.Uri.GetPath().Substring(containerUriPath.Length + 1);
-
- lastPart = await ServiceToServiceJobPart.CreateJobPartAsync(
+ // Because AsyncEnumerable doesn't let us know which storage resource is the last resource
+ // we only yield return when we know this is not the last storage resource to be listed
+ // from the container.
+ ServiceToServiceJobPart part;
+ try
+ {
+ part = await ServiceToServiceJobPart.CreateJobPartAsync(
job: this,
partNumber: partNumber,
- sourceResource: (StorageResourceItem)lastResource,
- destinationResource: _destinationResourceContainer.GetStorageResourceReference(lastSourceName),
- isFinalPart: true).ConfigureAwait(false);
- AppendJobPart(lastPart);
- }
- catch (Exception ex)
- {
- await InvokeFailedArgAsync(ex).ConfigureAwait(false);
- yield break;
+ sourceResource: (StorageResourceItem)current,
+ destinationResource: _destinationResourceContainer.GetStorageResourceReference(sourceName))
+ .ConfigureAwait(false);
+ AppendJobPart(part);
+ }
+ catch (Exception ex)
+ {
+ await InvokeFailedArgAsync(ex).ConfigureAwait(false);
+ yield break;
+ }
+ yield return part;
+ partNumber++;
}
- yield return lastPart;
}
}
}
diff --git a/sdk/storage/Azure.Storage.DataMovement/src/Shared/DataMovementExtensions.cs b/sdk/storage/Azure.Storage.DataMovement/src/Shared/DataMovementExtensions.cs
index 49d49f86c7adc..3d5d27b7f2726 100644
--- a/sdk/storage/Azure.Storage.DataMovement/src/Shared/DataMovementExtensions.cs
+++ b/sdk/storage/Azure.Storage.DataMovement/src/Shared/DataMovementExtensions.cs
@@ -36,8 +36,7 @@ public static async Task ToJobPartAsync(
jobPartStatus: jobPartStatus,
sourceResource: sourceResource,
destinationResource: destinationResource,
- partPlanFileExists: true,
- isFinalPart: header.IsFinalPart).ConfigureAwait(false);
+ partPlanFileExists: true).ConfigureAwait(false);
jobPart.VerifyJobPartPlanHeader(header);
@@ -62,8 +61,7 @@ public static async Task ToJobPartAsync(
jobPartStatus: jobPartStatus,
sourceResource: sourceResource,
destinationResource: destinationResource,
- partPlanFileExists: true,
- isFinalPart: header.IsFinalPart).ConfigureAwait(false);
+ partPlanFileExists: true).ConfigureAwait(false);
jobPart.VerifyJobPartPlanHeader(header);
@@ -88,8 +86,7 @@ public static async Task ToJobPartAsync(
jobPartStatus: jobPartStatus,
sourceResource: sourceResource,
destinationResource: destinationResource,
- partPlanFileExists: true,
- isFinalPart: header.IsFinalPart).ConfigureAwait(false);
+ partPlanFileExists: true).ConfigureAwait(false);
jobPart.VerifyJobPartPlanHeader(header);
@@ -118,8 +115,7 @@ public static async Task ToJobPartAsync(
jobPartStatus: jobPartStatus,
sourceResource: sourceResource.GetStorageResourceReference(childSourceName),
destinationResource: destinationResource.GetStorageResourceReference(childDestinationName),
- partPlanFileExists: true,
- isFinalPart: header.IsFinalPart).ConfigureAwait(false);
+ partPlanFileExists: true).ConfigureAwait(false);
jobPart.VerifyJobPartPlanHeader(header);
@@ -146,8 +142,7 @@ public static async Task ToJobPartAsync(
jobPartStatus: jobPartStatus,
sourceResource: sourceResource.GetStorageResourceReference(childSourcePath.Substring(sourceResource.Uri.AbsoluteUri.Length + 1)),
destinationResource: destinationResource.GetStorageResourceReference(childDestinationPath.Substring(destinationResource.Uri.AbsoluteUri.Length + 1)),
- partPlanFileExists: true,
- isFinalPart: header.IsFinalPart).ConfigureAwait(false);
+ partPlanFileExists: true).ConfigureAwait(false);
jobPart.VerifyJobPartPlanHeader(header);
@@ -176,8 +171,7 @@ public static async Task ToJobPartAsync(
jobPartStatus: jobPartStatus,
sourceResource: sourceResource.GetStorageResourceReference(childSourceName),
destinationResource: destinationResource.GetStorageResourceReference(childDestinationName),
- partPlanFileExists: true,
- isFinalPart: header.IsFinalPart).ConfigureAwait(false);
+ partPlanFileExists: true).ConfigureAwait(false);
jobPart.VerifyJobPartPlanHeader(header);
@@ -188,9 +182,7 @@ public static async Task ToJobPartAsync(
///
/// Translate the initial job part header to a job plan format file
///
- internal static JobPartPlanHeader ToJobPartPlanHeader(this JobPartInternal jobPart,
- DataTransferStatus jobStatus,
- bool isFinalPart)
+ internal static JobPartPlanHeader ToJobPartPlanHeader(this JobPartInternal jobPart, DataTransferStatus jobStatus)
{
JobPartPlanDestinationBlob dstBlobData = new JobPartPlanDestinationBlob(
blobType: JobPlanBlobType.Detect, // TODO: update when supported
@@ -235,7 +227,7 @@ internal static JobPartPlanHeader ToJobPartPlanHeader(this JobPartInternal jobPa
destinationResourceId: jobPart._destinationResource.ResourceId,
destinationPath: destinationPath,
destinationExtraQuery: "", // TODO: convert options to string
- isFinalPart: isFinalPart,
+ isFinalPart: false,
forceWrite: jobPart._createMode == StorageResourceCreationPreference.OverwriteIfExists, // TODO: change to enum value
forceIfReadOnly: false, // TODO: revisit for Azure Files
autoDecompress: false, // TODO: revisit if we want to support this feature
diff --git a/sdk/storage/Azure.Storage.DataMovement/src/StreamToUriJobPart.cs b/sdk/storage/Azure.Storage.DataMovement/src/StreamToUriJobPart.cs
index 9fd5b083d9d17..d1cc783dd054b 100644
--- a/sdk/storage/Azure.Storage.DataMovement/src/StreamToUriJobPart.cs
+++ b/sdk/storage/Azure.Storage.DataMovement/src/StreamToUriJobPart.cs
@@ -23,10 +23,7 @@ internal class StreamToUriJobPart : JobPartInternal, IAsyncDisposable
///
/// Creating job part based on a single transfer job
///
- private StreamToUriJobPart(
- StreamToUriTransferJob job,
- int partNumber,
- bool isFinalPart)
+ private StreamToUriJobPart(StreamToUriTransferJob job, int partNumber)
: base(dataTransfer: job._dataTransfer,
partNumber: partNumber,
sourceResource: job._sourceResource,
@@ -38,7 +35,6 @@ private StreamToUriJobPart(
checkpointer: job._checkpointer,
progressTracker: job._progressTracker,
arrayPool: job.UploadArrayPool,
- isFinalPart: isFinalPart,
jobPartEventHandler: job.GetJobPartStatus(),
statusEventHandler: job.TransferStatusEventHandler,
failedEventHandler: job.TransferFailedEventHandler,
@@ -57,7 +53,6 @@ private StreamToUriJobPart(
int partNumber,
StorageResourceItem sourceResource,
StorageResourceItem destinationResource,
- bool isFinalPart,
DataTransferStatus jobPartStatus = default,
long? length = default)
: base(dataTransfer: job._dataTransfer,
@@ -71,7 +66,6 @@ private StreamToUriJobPart(
checkpointer: job._checkpointer,
progressTracker: job._progressTracker,
arrayPool: job.UploadArrayPool,
- isFinalPart: isFinalPart,
jobPartEventHandler: job.GetJobPartStatus(),
statusEventHandler: job.TransferStatusEventHandler,
failedEventHandler: job.TransferFailedEventHandler,
@@ -91,17 +85,11 @@ public async ValueTask DisposeAsync()
public static async Task CreateJobPartAsync(
StreamToUriTransferJob job,
- int partNumber,
- bool isFinalPart)
+ int partNumber)
{
- // Create Job Part file as we're intializing the job part
- StreamToUriJobPart part = new StreamToUriJobPart(
- job: job,
- partNumber: partNumber,
- isFinalPart: isFinalPart);
- await part.AddJobPartToCheckpointerAsync(
- chunksTotal: 1, // For now we only store 1 chunk
- isFinalPart: isFinalPart).ConfigureAwait(false);
+ // Create Job Part file as we're initializing the job part
+ StreamToUriJobPart part = new StreamToUriJobPart(job, partNumber);
+ await part.AddJobPartToCheckpointerAsync(1).ConfigureAwait(false); // For now we only store 1 chunk
return part;
}
@@ -110,25 +98,21 @@ public static async Task CreateJobPartAsync(
int partNumber,
StorageResourceItem sourceResource,
StorageResourceItem destinationResource,
- bool isFinalPart,
DataTransferStatus jobPartStatus = default,
long? length = default,
bool partPlanFileExists = false)
{
- // Create Job Part file as we're intializing the job part
+ // Create Job Part file as we're initializing the job part
StreamToUriJobPart part = new StreamToUriJobPart(
job: job,
partNumber: partNumber,
jobPartStatus: jobPartStatus,
sourceResource: sourceResource,
destinationResource: destinationResource,
- length: length,
- isFinalPart: isFinalPart);
+ length: length);
if (!partPlanFileExists)
{
- await part.AddJobPartToCheckpointerAsync(
- chunksTotal: 1,
- isFinalPart: isFinalPart).ConfigureAwait(false); // For now we only store 1 chunk
+ await part.AddJobPartToCheckpointerAsync(1).ConfigureAwait(false); // For now we only store 1 chunk
}
return part;
}
diff --git a/sdk/storage/Azure.Storage.DataMovement/src/StreamToUriTransferJob.cs b/sdk/storage/Azure.Storage.DataMovement/src/StreamToUriTransferJob.cs
index df37661cdb910..c6faf08967461 100644
--- a/sdk/storage/Azure.Storage.DataMovement/src/StreamToUriTransferJob.cs
+++ b/sdk/storage/Azure.Storage.DataMovement/src/StreamToUriTransferJob.cs
@@ -81,8 +81,7 @@ public override async IAsyncEnumerable ProcessJobToJobPartAsync
// Single resource transfer, we can skip to chunking the job.
part = await StreamToUriJobPart.CreateJobPartAsync(
job: this,
- partNumber: partNumber,
- isFinalPart: true).ConfigureAwait(false);
+ partNumber: partNumber).ConfigureAwait(false);
AppendJobPart(part);
}
catch (Exception ex)
@@ -103,22 +102,16 @@ public override async IAsyncEnumerable ProcessJobToJobPartAsync
else
{
// Resuming old job with existing job parts
- bool isFinalPartFound = false;
foreach (JobPartInternal part in _jobParts)
{
if (!part.JobPartStatus.HasCompletedSuccessfully)
{
part.JobPartStatus.TrySetTransferStateChange(DataTransferState.Queued);
yield return part;
-
- if (part.IsFinalPart)
- {
- // If we found the final part then we don't have to relist the container.
- isFinalPartFound = true;
- }
}
}
- if (!isFinalPartFound)
+
+ if (await _checkpointer.IsEnumerationCompleteAsync(_dataTransfer.Id, _cancellationToken).ConfigureAwait(false))
{
await foreach (JobPartInternal jobPartInternal in GetStorageResourcesAsync().ConfigureAwait(false))
{
@@ -126,7 +119,7 @@ public override async IAsyncEnumerable ProcessJobToJobPartAsync
}
}
}
- _enumerationComplete = true;
+
await OnEnumerationComplete().ConfigureAwait(false);
}
@@ -152,10 +145,9 @@ private async IAsyncEnumerable GetStorageResourcesAsync()
yield break;
}
- // List the container keep track of the last job part in order to store it properly
- // so we know we finished enumerating/listed.
+ // List the container in this specific way because MoveNext needs to be separately wrapped
+ // in a try/catch as we can't yield return inside a try/catch.
bool enumerationCompleted = false;
- StorageResource lastResource = default;
while (!enumerationCompleted)
{
try
@@ -173,68 +165,36 @@ private async IAsyncEnumerable GetStorageResourcesAsync()
}
StorageResource current = enumerator.Current;
- if (lastResource != default)
- {
- string containerUriPath = _sourceResourceContainer.Uri.GetPath();
- string sourceName = string.IsNullOrEmpty(containerUriPath)
- ? lastResource.Uri.GetPath()
- : lastResource.Uri.GetPath().Substring(containerUriPath.Length + 1);
- if (!existingSources.Contains(sourceName))
- {
- // Because AsyncEnumerable doesn't let us know which storage resource is the last resource
- // we only yield return when we know this is not the last storage resource to be listed
- // from the container.
- StreamToUriJobPart part;
- try
- {
- part = await StreamToUriJobPart.CreateJobPartAsync(
- job: this,
- partNumber: partNumber,
- sourceResource: (StorageResourceItem)lastResource,
- destinationResource: _destinationResourceContainer.GetStorageResourceReference(sourceName),
- isFinalPart: false).ConfigureAwait(false);
- AppendJobPart(part);
- }
- catch (Exception ex)
- {
- await InvokeFailedArgAsync(ex).ConfigureAwait(false);
- yield break;
- }
- yield return part;
- partNumber++;
- }
- }
- lastResource = current;
- }
+ string containerUriPath = _sourceResourceContainer.Uri.GetPath();
+ string sourceName = string.IsNullOrEmpty(containerUriPath)
+ ? current.Uri.GetPath()
+ : current.Uri.GetPath().Substring(containerUriPath.Length + 1);
- // It's possible to have no job parts in a job
- if (lastResource != default)
- {
- StreamToUriJobPart lastPart;
- try
+ if (!existingSources.Contains(sourceName))
{
- // Return last part but enable the part to be the last job part of the entire job
- // so we know that we've finished listing in the container
- string containerUriPath = _sourceResourceContainer.Uri.GetPath();
- string lastSourceName = string.IsNullOrEmpty(containerUriPath)
- ? lastResource.Uri.GetPath()
- : lastResource.Uri.GetPath().Substring(containerUriPath.Length + 1);
-
- lastPart = await StreamToUriJobPart.CreateJobPartAsync(
+ // Because AsyncEnumerable doesn't let us know which storage resource is the last resource
+ // we only yield return when we know this is not the last storage resource to be listed
+ // from the container.
+ StreamToUriJobPart part;
+ try
+ {
+ part = await StreamToUriJobPart.CreateJobPartAsync(
job: this,
partNumber: partNumber,
- sourceResource: (StorageResourceItem)lastResource,
- destinationResource: _destinationResourceContainer.GetStorageResourceReference(lastSourceName),
- isFinalPart: true).ConfigureAwait(false);
- AppendJobPart(lastPart);
- }
- catch (Exception ex)
- {
- await InvokeFailedArgAsync(ex).ConfigureAwait(false);
- yield break;
+ sourceResource: (StorageResourceItem)current,
+ destinationResource: _destinationResourceContainer.GetStorageResourceReference(sourceName))
+ .ConfigureAwait(false);
+ AppendJobPart(part);
+ }
+ catch (Exception ex)
+ {
+ await InvokeFailedArgAsync(ex).ConfigureAwait(false);
+ yield break;
+ }
+ yield return part;
+ partNumber++;
}
- yield return lastPart;
}
}
}
diff --git a/sdk/storage/Azure.Storage.DataMovement/src/TransferJobInternal.cs b/sdk/storage/Azure.Storage.DataMovement/src/TransferJobInternal.cs
index c030881e72dea..400f56677b15d 100644
--- a/sdk/storage/Azure.Storage.DataMovement/src/TransferJobInternal.cs
+++ b/sdk/storage/Azure.Storage.DataMovement/src/TransferJobInternal.cs
@@ -405,6 +405,17 @@ await _checkpointer.SetJobTransferStatusAsync(
internal async Task OnEnumerationComplete()
{
+ try
+ {
+ await _checkpointer.OnEnumerationCompleteAsync(_dataTransfer.Id, _cancellationToken).ConfigureAwait(false);
+ }
+ catch (Exception ex)
+ {
+ await InvokeFailedArgAsync(ex).ConfigureAwait(false);
+ return;
+ }
+ _enumerationComplete = true;
+
// If there were no job parts enumerated and we haven't already aborted/completed the job.
if (_jobParts.Count == 0 &&
_dataTransfer.TransferStatus.State != DataTransferState.Paused &&
diff --git a/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamJobPart.cs b/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamJobPart.cs
index 7e80aed3afa0c..bc86ff3a50ffd 100644
--- a/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamJobPart.cs
+++ b/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamJobPart.cs
@@ -26,8 +26,7 @@ internal class UriToStreamJobPart : JobPartInternal, IAsyncDisposable
///
private UriToStreamJobPart(
UriToStreamTransferJob job,
- int partNumber,
- bool isFinalPart)
+ int partNumber)
: base(dataTransfer: job._dataTransfer,
partNumber: partNumber,
sourceResource: job._sourceResource,
@@ -39,7 +38,6 @@ private UriToStreamJobPart(
checkpointer: job._checkpointer,
progressTracker: job._progressTracker,
arrayPool: job.UploadArrayPool,
- isFinalPart: isFinalPart,
jobPartEventHandler: job.GetJobPartStatus(),
statusEventHandler: job.TransferStatusEventHandler,
failedEventHandler: job.TransferFailedEventHandler,
@@ -57,7 +55,6 @@ private UriToStreamJobPart(
int partNumber,
StorageResourceItem sourceResource,
StorageResourceItem destinationResource,
- bool isFinalPart,
DataTransferStatus jobPartStatus = default,
long? length = default)
: base(dataTransfer: job._dataTransfer,
@@ -71,7 +68,6 @@ private UriToStreamJobPart(
checkpointer: job._checkpointer,
progressTracker: job._progressTracker,
arrayPool: job.UploadArrayPool,
- isFinalPart: isFinalPart,
jobPartEventHandler: job.GetJobPartStatus(),
statusEventHandler: job.TransferStatusEventHandler,
failedEventHandler: job.TransferFailedEventHandler,
@@ -85,17 +81,11 @@ private UriToStreamJobPart(
public static async Task CreateJobPartAsync(
UriToStreamTransferJob job,
- int partNumber,
- bool isFinalPart)
+ int partNumber)
{
- // Create Job Part file as we're intializing the job part
- UriToStreamJobPart part = new UriToStreamJobPart(
- job: job,
- partNumber: partNumber,
- isFinalPart: isFinalPart);
- await part.AddJobPartToCheckpointerAsync(
- chunksTotal: 1,
- isFinalPart: isFinalPart).ConfigureAwait(false); // For now we only store 1 chunk
+ // Create Job Part file as we're initializing the job part
+ UriToStreamJobPart part = new UriToStreamJobPart(job, partNumber);
+ await part.AddJobPartToCheckpointerAsync(1).ConfigureAwait(false); // For now we only store 1 chunk
return part;
}
@@ -104,23 +94,21 @@ public static async Task CreateJobPartAsync(
int partNumber,
StorageResourceItem sourceResource,
StorageResourceItem destinationResource,
- bool isFinalPart,
DataTransferStatus jobPartStatus = default,
long? length = default,
bool partPlanFileExists = false)
{
- // Create Job Part file as we're intializing the job part
+ // Create Job Part file as we're initializing the job part
UriToStreamJobPart part = new UriToStreamJobPart(
job: job,
partNumber: partNumber,
jobPartStatus: jobPartStatus,
sourceResource: sourceResource,
destinationResource: destinationResource,
- isFinalPart: isFinalPart,
length: length);
if (!partPlanFileExists)
{
- await part.AddJobPartToCheckpointerAsync(1, isFinalPart).ConfigureAwait(false); // For now we only store 1 chunk
+ await part.AddJobPartToCheckpointerAsync(1).ConfigureAwait(false); // For now we only store 1 chunk
}
return part;
}
diff --git a/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamTransferJob.cs b/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamTransferJob.cs
index 67bcdf915c99c..adf52253ffaaf 100644
--- a/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamTransferJob.cs
+++ b/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamTransferJob.cs
@@ -81,8 +81,7 @@ public override async IAsyncEnumerable ProcessJobToJobPartAsync
// Single resource transfer, we can skip to chunking the job.
part = await UriToStreamJobPart.CreateJobPartAsync(
job: this,
- partNumber: partNumber,
- isFinalPart: true).ConfigureAwait(false);
+ partNumber: partNumber).ConfigureAwait(false);
AppendJobPart(part);
}
catch (Exception ex)
@@ -103,22 +102,16 @@ public override async IAsyncEnumerable ProcessJobToJobPartAsync
else
{
// Resuming old job with existing job parts
- bool isFinalPartFound = false;
foreach (JobPartInternal part in _jobParts)
{
if (!part.JobPartStatus.HasCompletedSuccessfully)
{
part.JobPartStatus.TrySetTransferStateChange(DataTransferState.Queued);
yield return part;
-
- if (part.IsFinalPart)
- {
- // If we found the final part then we don't have to relist the container.
- isFinalPartFound = true;
- }
}
}
- if (!isFinalPartFound)
+
+ if (await _checkpointer.IsEnumerationCompleteAsync(_dataTransfer.Id, _cancellationToken).ConfigureAwait(false))
{
await foreach (JobPartInternal jobPartInternal in GetStorageResourcesAsync().ConfigureAwait(false))
{
@@ -126,7 +119,7 @@ public override async IAsyncEnumerable ProcessJobToJobPartAsync
}
}
}
- _enumerationComplete = true;
+
await OnEnumerationComplete().ConfigureAwait(false);
}
@@ -152,10 +145,9 @@ private async IAsyncEnumerable GetStorageResourcesAsync()
yield break;
}
- // List the container keep track of the last job part in order to store it properly
- // so we know we finished enumerating/listed.
+ // List the container in this specific way because MoveNext needs to be separately wrapped
+ // in a try/catch as we can't yield return inside a try/catch.
bool enumerationCompleted = false;
- StorageResource lastResource = default;
while (!enumerationCompleted)
{
try
@@ -173,68 +165,36 @@ private async IAsyncEnumerable GetStorageResourcesAsync()
}
StorageResource current = enumerator.Current;
- if (lastResource != default)
- {
- string containerUriPath = _sourceResourceContainer.Uri.GetPath();
- string sourceName = string.IsNullOrEmpty(containerUriPath)
- ? lastResource.Uri.GetPath()
- : lastResource.Uri.GetPath().Substring(containerUriPath.Length + 1);
- if (!existingSources.Contains(sourceName))
- {
- // Because AsyncEnumerable doesn't let us know which storage resource is the last resource
- // we only yield return when we know this is not the last storage resource to be listed
- // from the container.
- UriToStreamJobPart part;
- try
- {
- part = await UriToStreamJobPart.CreateJobPartAsync(
- job: this,
- partNumber: partNumber,
- sourceResource: (StorageResourceItem)lastResource,
- destinationResource: _destinationResourceContainer.GetStorageResourceReference(sourceName),
- isFinalPart: false).ConfigureAwait(false);
- AppendJobPart(part);
- }
- catch (Exception ex)
- {
- await InvokeFailedArgAsync(ex).ConfigureAwait(false);
- yield break;
- }
- yield return part;
- partNumber++;
- }
- }
- lastResource = current;
- }
+ string containerUriPath = _sourceResourceContainer.Uri.GetPath();
+ string sourceName = string.IsNullOrEmpty(containerUriPath)
+ ? current.Uri.GetPath()
+ : current.Uri.GetPath().Substring(containerUriPath.Length + 1);
- // It's possible to have no job parts in a job
- if (lastResource != default)
- {
- UriToStreamJobPart lastPart;
- try
+ if (!existingSources.Contains(sourceName))
{
- // Return last part but enable the part to be the last job part of the entire job
- // so we know that we've finished listing in the container
- string containerUriPath = _sourceResourceContainer.Uri.GetPath();
- string lastSourceName = string.IsNullOrEmpty(containerUriPath)
- ? lastResource.Uri.GetPath()
- : lastResource.Uri.GetPath().Substring(containerUriPath.Length + 1);
-
- lastPart = await UriToStreamJobPart.CreateJobPartAsync(
+ // Because AsyncEnumerable doesn't let us know which storage resource is the last resource
+ // we only yield return when we know this is not the last storage resource to be listed
+ // from the container.
+ UriToStreamJobPart part;
+ try
+ {
+ part = await UriToStreamJobPart.CreateJobPartAsync(
job: this,
partNumber: partNumber,
- sourceResource: (StorageResourceItem) lastResource,
- destinationResource: _destinationResourceContainer.GetStorageResourceReference(lastSourceName),
- isFinalPart: true).ConfigureAwait(false);
- AppendJobPart(lastPart);
- }
- catch (Exception ex)
- {
- await InvokeFailedArgAsync(ex).ConfigureAwait(false);
- yield break;
+ sourceResource: (StorageResourceItem)current,
+ destinationResource: _destinationResourceContainer.GetStorageResourceReference(sourceName))
+ .ConfigureAwait(false);
+ AppendJobPart(part);
+ }
+ catch (Exception ex)
+ {
+ await InvokeFailedArgAsync(ex).ConfigureAwait(false);
+ yield break;
+ }
+ yield return part;
+ partNumber++;
}
- yield return lastPart;
}
}
}