Skip to content

Commit

Permalink
[Storage][DataMovement] Update checkpointer to read/write to job file…
Browse files Browse the repository at this point in the history
… - Part 2 (Azure#39101)
  • Loading branch information
jalauzon-msft authored Oct 6, 2023
1 parent 138c985 commit 7cb32da
Show file tree
Hide file tree
Showing 12 changed files with 229 additions and 298 deletions.
15 changes: 15 additions & 0 deletions sdk/storage/Azure.Storage.Common/src/Shared/PooledMemoryStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,21 @@ public override int Read(byte[] buffer, int offset, int count)
return read;
}

public override int ReadByte()
{
if (Position >= Length)
{
return -1;
}

(byte[] currentBuffer, int _, long offsetOfBuffer) = GetBufferFromPosition();

byte result = currentBuffer[Position - offsetOfBuffer];
Position += 1;

return result;
}

/// <summary>
/// According the the current <see cref="Position"/> of the stream, gets the correct buffer containing the byte
/// at that position, as well as the stream position represented by the start of the array.
Expand Down
50 changes: 50 additions & 0 deletions sdk/storage/Azure.Storage.Common/tests/PooledMemoryStreamTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,56 @@ public async Task WriteStream(int dataSize, int bufferPartitionSize)
Assert.AreEqual(0, pooledMemoryStream.Position);
}

[TestCase(1, 0, 1)]
[TestCase(Constants.KB, 512, 2 * Constants.KB)]
[TestCase(Constants.KB, 512, 512)]
[TestCase(107, 99, 52)]
public async Task ReadByte(int dataSize, int initialReadSize, int bufferPartitionSize)
{
// Arrange
byte[] originalData = GetRandomBuffer(dataSize);
PooledMemoryStream pooledMemoryStream = new PooledMemoryStream(ArrayPool<byte>.Shared, bufferPartitionSize);
await pooledMemoryStream.WriteAsync(originalData, 0, dataSize);
pooledMemoryStream.Position = 0;

// Read some data initially to test boundary conditions with buffers
if (initialReadSize > 0)
{
byte[] readData = new byte[initialReadSize];
await pooledMemoryStream.ReadAsync(readData, 0, initialReadSize);
}

// Act
byte result = Convert.ToByte(pooledMemoryStream.ReadByte());

// Assert
Assert.AreEqual(initialReadSize + 1, pooledMemoryStream.Position);
Assert.AreEqual(originalData[initialReadSize], result);
}

[TestCase(Constants.KB, 2 * Constants.KB)]
[TestCase(Constants.KB, 512)]
[TestCase(107, 52)]
public async Task ReadByte_Full(int dataSize, int bufferPartitionSize)
{
// Arrange
byte[] originalData = GetRandomBuffer(dataSize);
PooledMemoryStream pooledMemoryStream = new PooledMemoryStream(ArrayPool<byte>.Shared, bufferPartitionSize);
await pooledMemoryStream.WriteAsync(originalData, 0, dataSize);
pooledMemoryStream.Position = 0;

// Act
byte[] result = new byte[originalData.Length];
for (int i = 0; i < originalData.Length; i++)
{
result[i] = Convert.ToByte(pooledMemoryStream.ReadByte());
}

// Assert
Assert.AreEqual(originalData.Length, pooledMemoryStream.Position);
AssertSequenceEqual(originalData, result);
}

private static byte[] GetRandomBuffer(long size)
{
Random random = new Random(Environment.TickCount);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -64,5 +65,35 @@ internal static async Task<DataTransferProperties> GetDataTransferPropertiesAsyn
IsContainer = isContainer,
};
}

internal static async Task<bool> IsEnumerationCompleteAsync(
this TransferCheckpointer checkpointer,
string transferId,
CancellationToken cancellationToken)
{
using (Stream stream = await checkpointer.ReadJobPlanFileAsync(
transferId,
DataMovementConstants.JobPlanFile.EnumerationCompleteIndex,
DataMovementConstants.OneByte,
cancellationToken).ConfigureAwait(false))
{
return Convert.ToBoolean(stream.ReadByte());
}
}

internal static async Task OnEnumerationCompleteAsync(
this TransferCheckpointer checkpointer,
string transferId,
CancellationToken cancellationToken)
{
byte[] enumerationComplete = { Convert.ToByte(true) };
await checkpointer.WriteToJobPlanFileAsync(
transferId,
DataMovementConstants.JobPlanFile.EnumerationCompleteIndex,
enumerationComplete,
bufferOffset: 0,
length: 1,
cancellationToken).ConfigureAwait(false);
}
}
}
15 changes: 2 additions & 13 deletions sdk/storage/Azure.Storage.DataMovement/src/JobPartInternal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,6 @@ internal abstract class JobPartInternal
/// </summary>
internal long? Length;

/// <summary>
/// Defines whether or not this was the final part in the list call. This would determine
/// whether or not we needed to keep listing in the job.
/// </summary>
public bool IsFinalPart { get; internal set; }

internal ClientDiagnostics ClientDiagnostics { get; }

/// <summary>
Expand Down Expand Up @@ -160,7 +154,6 @@ internal JobPartInternal(
TransferCheckpointer checkpointer,
TransferProgressTracker progressTracker,
ArrayPool<byte> arrayPool,
bool isFinalPart,
SyncAsyncEventHandler<TransferStatusEventArgs> jobPartEventHandler,
SyncAsyncEventHandler<TransferStatusEventArgs> statusEventHandler,
SyncAsyncEventHandler<TransferItemFailedEventArgs> failedEventHandler,
Expand All @@ -186,7 +179,6 @@ internal JobPartInternal(
_progressTracker = progressTracker;
_cancellationToken = cancellationToken;
_arrayPool = arrayPool;
IsFinalPart = isFinalPart;
PartTransferStatusEventHandler = jobPartEventHandler;
TransferStatusEventHandler = statusEventHandler;
TransferFailedEventHandler = failedEventHandler;
Expand Down Expand Up @@ -460,13 +452,10 @@ public async virtual Task CleanupAbortedJobPartAsync()
/// Serializes the respective job part and adds it to the checkpointer.
/// </summary>
/// <param name="chunksTotal">Number of chunks in the job part.</param>
/// <param name="isFinalPart">Defines if this part is the last job part of the job.</param>
/// <returns></returns>
public async virtual Task AddJobPartToCheckpointerAsync(int chunksTotal, bool isFinalPart)
public async virtual Task AddJobPartToCheckpointerAsync(int chunksTotal)
{
JobPartPlanHeader header = this.ToJobPartPlanHeader(
jobStatus: JobPartStatus,
isFinalPart: isFinalPart);
JobPartPlanHeader header = this.ToJobPartPlanHeader(jobStatus: JobPartStatus);
using (Stream stream = new MemoryStream())
{
header.Serialize(stream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ internal class ServiceToServiceJobPart : JobPartInternal, IAsyncDisposable
/// <summary>
/// Creating job part based on a single transfer job
/// </summary>
private ServiceToServiceJobPart(ServiceToServiceTransferJob job, int partNumber, bool isFinalPart)
private ServiceToServiceJobPart(ServiceToServiceTransferJob job, int partNumber)
: base(dataTransfer: job._dataTransfer,
partNumber: partNumber,
sourceResource: job._sourceResource,
Expand All @@ -35,7 +35,6 @@ private ServiceToServiceJobPart(ServiceToServiceTransferJob job, int partNumber,
checkpointer: job._checkpointer,
progressTracker: job._progressTracker,
arrayPool: job.UploadArrayPool,
isFinalPart: isFinalPart,
jobPartEventHandler: job.GetJobPartStatus(),
statusEventHandler: job.TransferStatusEventHandler,
failedEventHandler: job.TransferFailedEventHandler,
Expand All @@ -54,7 +53,6 @@ private ServiceToServiceJobPart(
int partNumber,
StorageResourceItem sourceResource,
StorageResourceItem destinationResource,
bool isFinalPart,
DataTransferStatus jobPartStatus = default,
long? length = default)
: base(dataTransfer: job._dataTransfer,
Expand All @@ -68,7 +66,6 @@ private ServiceToServiceJobPart(
checkpointer: job._checkpointer,
progressTracker: job._progressTracker,
arrayPool: job.UploadArrayPool,
isFinalPart: isFinalPart,
jobPartEventHandler: job.GetJobPartStatus(),
statusEventHandler: job.TransferStatusEventHandler,
failedEventHandler: job.TransferFailedEventHandler,
Expand All @@ -88,15 +85,11 @@ public async ValueTask DisposeAsync()

public static async Task<ServiceToServiceJobPart> CreateJobPartAsync(
ServiceToServiceTransferJob job,
int partNumber,
bool isFinalPart)
int partNumber)
{
// Create Job Part file as we're intializing the job part
ServiceToServiceJobPart part = new ServiceToServiceJobPart(
job: job,
partNumber: partNumber,
isFinalPart: isFinalPart);
await part.AddJobPartToCheckpointerAsync(1, isFinalPart).ConfigureAwait(false); // For now we only store 1 chunk
// Create Job Part file as we're initializing the job part
ServiceToServiceJobPart part = new ServiceToServiceJobPart(job, partNumber);
await part.AddJobPartToCheckpointerAsync(1).ConfigureAwait(false); // For now we only store 1 chunk
return part;
}

Expand All @@ -105,23 +98,21 @@ public static async Task<ServiceToServiceJobPart> CreateJobPartAsync(
int partNumber,
StorageResourceItem sourceResource,
StorageResourceItem destinationResource,
bool isFinalPart,
DataTransferStatus jobPartStatus = default,
long? length = default,
bool partPlanFileExists = false)
{
// Create Job Part file as we're intializing the job part
// Create Job Part file as we're initializing the job part
ServiceToServiceJobPart part = new ServiceToServiceJobPart(
job: job,
partNumber: partNumber,
jobPartStatus: jobPartStatus,
sourceResource: sourceResource,
destinationResource: destinationResource,
isFinalPart: isFinalPart,
length: length);
if (!partPlanFileExists)
{
await part.AddJobPartToCheckpointerAsync(1, isFinalPart).ConfigureAwait(false); // For now we only store 1 chunk
await part.AddJobPartToCheckpointerAsync(1).ConfigureAwait(false); // For now we only store 1 chunk
}
return part;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,7 @@ public override async IAsyncEnumerable<JobPartInternal> ProcessJobToJobPartAsync
// Single resource transfer, we can skip to chunking the job.
part = await ServiceToServiceJobPart.CreateJobPartAsync(
job: this,
partNumber: partNumber,
isFinalPart: true).ConfigureAwait(false);
partNumber: partNumber).ConfigureAwait(false);
AppendJobPart(part);
}
catch (Exception ex)
Expand All @@ -105,30 +104,24 @@ public override async IAsyncEnumerable<JobPartInternal> ProcessJobToJobPartAsync
else
{
// Resuming old job with existing job parts
bool isFinalPartFound = false;
foreach (JobPartInternal part in _jobParts)
{
if (!part.JobPartStatus.HasCompletedSuccessfully)
{
part.JobPartStatus.TrySetTransferStateChange(DataTransferState.Queued);
yield return part;

if (part.IsFinalPart)
{
// If we found the final part then we don't have to relist the container.
isFinalPartFound = true;
}
}
}
if (!isFinalPartFound)

if (await _checkpointer.IsEnumerationCompleteAsync(_dataTransfer.Id, _cancellationToken).ConfigureAwait(false))
{
await foreach (JobPartInternal jobPartInternal in GetStorageResourcesAsync().ConfigureAwait(false))
{
yield return jobPartInternal;
}
}
}
_enumerationComplete = true;

await OnEnumerationComplete().ConfigureAwait(false);
}

Expand All @@ -154,10 +147,9 @@ private async IAsyncEnumerable<JobPartInternal> GetStorageResourcesAsync()
yield break;
}

// List the container keep track of the last job part in order to store it properly
// so we know we finished enumerating/listed.
// List the container in this specific way because MoveNext needs to be separately wrapped
// in a try/catch as we can't yield return inside a try/catch.
bool enumerationCompleted = false;
StorageResource lastResource = default;
while (!enumerationCompleted)
{
try
Expand All @@ -175,68 +167,36 @@ private async IAsyncEnumerable<JobPartInternal> GetStorageResourcesAsync()
}

StorageResource current = enumerator.Current;
if (lastResource != default)
{
string containerUriPath = _sourceResourceContainer.Uri.GetPath();
string sourceName = string.IsNullOrEmpty(containerUriPath)
? lastResource.Uri.GetPath()
: lastResource.Uri.GetPath().Substring(containerUriPath.Length + 1);

if (!existingSources.Contains(sourceName))
{
// Because AsyncEnumerable doesn't let us know which storage resource is the last resource
// we only yield return when we know this is not the last storage resource to be listed
// from the container.
ServiceToServiceJobPart part;
try
{
part = await ServiceToServiceJobPart.CreateJobPartAsync(
job: this,
partNumber: partNumber,
sourceResource: (StorageResourceItem)lastResource,
destinationResource: _destinationResourceContainer.GetStorageResourceReference(sourceName),
isFinalPart: false).ConfigureAwait(false);
AppendJobPart(part);
}
catch (Exception ex)
{
await InvokeFailedArgAsync(ex).ConfigureAwait(false);
yield break;
}
yield return part;
partNumber++;
}
}
lastResource = current;
}
string containerUriPath = _sourceResourceContainer.Uri.GetPath();
string sourceName = string.IsNullOrEmpty(containerUriPath)
? current.Uri.GetPath()
: current.Uri.GetPath().Substring(containerUriPath.Length + 1);

// It's possible to have no job parts in a job
if (lastResource != default)
{
ServiceToServiceJobPart lastPart;
try
if (!existingSources.Contains(sourceName))
{
// Return last part but enable the part to be the last job part of the entire job
// so we know that we've finished listing in the container
string containerUriPath = _sourceResourceContainer.Uri.GetPath();
string lastSourceName = string.IsNullOrEmpty(containerUriPath)
? lastResource.Uri.GetPath()
: lastResource.Uri.GetPath().Substring(containerUriPath.Length + 1);

lastPart = await ServiceToServiceJobPart.CreateJobPartAsync(
// Because AsyncEnumerable doesn't let us know which storage resource is the last resource
// we only yield return when we know this is not the last storage resource to be listed
// from the container.
ServiceToServiceJobPart part;
try
{
part = await ServiceToServiceJobPart.CreateJobPartAsync(
job: this,
partNumber: partNumber,
sourceResource: (StorageResourceItem)lastResource,
destinationResource: _destinationResourceContainer.GetStorageResourceReference(lastSourceName),
isFinalPart: true).ConfigureAwait(false);
AppendJobPart(lastPart);
}
catch (Exception ex)
{
await InvokeFailedArgAsync(ex).ConfigureAwait(false);
yield break;
sourceResource: (StorageResourceItem)current,
destinationResource: _destinationResourceContainer.GetStorageResourceReference(sourceName))
.ConfigureAwait(false);
AppendJobPart(part);
}
catch (Exception ex)
{
await InvokeFailedArgAsync(ex).ConfigureAwait(false);
yield break;
}
yield return part;
partNumber++;
}
yield return lastPart;
}
}
}
Expand Down
Loading

0 comments on commit 7cb32da

Please sign in to comment.