Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Storage][DataMovement] Cleanup after recent changes to checkpointer #39456

Merged
merged 18 commits into from
Oct 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -283,12 +283,12 @@ protected override async Task<bool> DeleteIfExistsAsync(CancellationToken cancel
return await BlobClient.DeleteIfExistsAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
}

public override StorageResourceCheckpointData GetSourceCheckpointData()
protected override StorageResourceCheckpointData GetSourceCheckpointData()
{
return new BlobSourceCheckpointData(BlobType.Append);
}

public override StorageResourceCheckpointData GetDestinationCheckpointData()
protected override StorageResourceCheckpointData GetDestinationCheckpointData()
{
return new BlobDestinationCheckpointData(
BlobType.Append,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,9 @@
<Compile Include="$(AzureStorageDataMovementSharedSources)Errors.DataMovement.cs" LinkBase="Shared\DataMovement" />
<Compile Include="$(AzureStorageDataMovementSharedSources)DataMovementConstants.cs" LinkBase="Shared\DataMovement" />
<Compile Include="$(AzureStorageDataMovementSharedSources)DataTransferStatusInternal.cs" LinkBase="Shared\DataMovement" />
<Compile Include="$(AzureStorageDataMovementSharedSources)JobPlanExtensions.cs" LinkBase="Shared\DataMovement" />
<Compile Include="$(AzureStorageDataMovementSharedSources)ResponseExtensions.cs" LinkBase="Shared\DataMovement" />
<Compile Include="$(AzureStorageDataMovementSharedSources)LocalTransferCheckpointer.cs" LinkBase="Shared\DataMovement" />
<Compile Include="$(AzureStorageDataMovementSharedSources)StorageResourceItemInternal.cs" LinkBase="Shared\DataMovement" />
<Compile Include="$(AzureStorageDataMovementSharedSources)StorageResourceContainerInternal.cs" LinkBase="Shared\DataMovement" />
<Compile Include="$(AzureStorageDataMovementSharedSources)TransferCheckpointer.cs" LinkBase="Shared\DataMovement" />
</ItemGroup>
<ItemGroup>
<Compile Include="$(AzureStorageDataMovementSharedSources)\JobPlan\*" LinkBase="Shared\DataMovement\JobPlan" />
</ItemGroup>
<ItemGroup>
<Compile Include="$(AzureStorageBlobsSharedSources)BlobErrors.cs" LinkBase="Shared\Blobs" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
using System.Text;
using Azure.Core;
using Azure.Storage.Blobs.Models;
using static Azure.Storage.DataMovement.JobPlanExtensions;
using Metadata = System.Collections.Generic.IDictionary<string, string>;
using Tags = System.Collections.Generic.IDictionary<string, string>;

Expand Down Expand Up @@ -73,7 +72,7 @@ public BlobDestinationCheckpointData(
_cpkScopeBytes = CpkScope != default ? Encoding.UTF8.GetBytes(CpkScope) : Array.Empty<byte>();
}

public override void Serialize(Stream stream)
protected override void Serialize(Stream stream)
{
Argument.AssertNotNull(stream, nameof(stream));

Expand All @@ -87,31 +86,31 @@ public override void Serialize(Stream stream)
writer.Write((byte)BlobType);

// ContentType offset/length
WriteVariableLengthFieldInfo(writer, _contentTypeBytes.Length, ref currentVariableLengthIndex);
writer.WriteVariableLengthFieldInfo(_contentTypeBytes.Length, ref currentVariableLengthIndex);

// ContentEncoding offset/length
WriteVariableLengthFieldInfo(writer, _contentEncodingBytes.Length, ref currentVariableLengthIndex);
writer.WriteVariableLengthFieldInfo(_contentEncodingBytes.Length, ref currentVariableLengthIndex);

// ContentLanguage offset/length
WriteVariableLengthFieldInfo(writer, _contentLanguageBytes.Length, ref currentVariableLengthIndex);
writer.WriteVariableLengthFieldInfo(_contentLanguageBytes.Length, ref currentVariableLengthIndex);

// ContentDisposition offset/length
WriteVariableLengthFieldInfo(writer, _contentDispositionBytes.Length, ref currentVariableLengthIndex);
writer.WriteVariableLengthFieldInfo(_contentDispositionBytes.Length, ref currentVariableLengthIndex);

// CacheControl offset/length
WriteVariableLengthFieldInfo(writer, _cacheControlBytes.Length, ref currentVariableLengthIndex);
writer.WriteVariableLengthFieldInfo(_cacheControlBytes.Length, ref currentVariableLengthIndex);

// AccessTier
writer.Write((byte)AccessTier.ToJobPlanAccessTier());

// Metadata offset/length
WriteVariableLengthFieldInfo(writer, _metadataBytes.Length, ref currentVariableLengthIndex);
writer.WriteVariableLengthFieldInfo(_metadataBytes.Length, ref currentVariableLengthIndex);

// Tags offset/length
WriteVariableLengthFieldInfo(writer, _tagsBytes.Length, ref currentVariableLengthIndex);
writer.WriteVariableLengthFieldInfo(_tagsBytes.Length, ref currentVariableLengthIndex);

// CpkScope offset/length
WriteVariableLengthFieldInfo(writer, _cpkScopeBytes.Length, ref currentVariableLengthIndex);
writer.WriteVariableLengthFieldInfo(_cpkScopeBytes.Length, ref currentVariableLengthIndex);

writer.Write(_contentTypeBytes);
writer.Write(_contentEncodingBytes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public BlobSourceCheckpointData(BlobType blobType)

public override int Length => DataMovementBlobConstants.SourceCheckpointData.DataSize;

public override void Serialize(Stream stream)
protected override void Serialize(Stream stream)
{
Argument.AssertNotNull(stream, nameof(stream));
BinaryWriter writer = new BinaryWriter(stream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,13 +129,13 @@ protected override async IAsyncEnumerable<StorageResource> GetStorageResourcesAs
}
}

public override StorageResourceCheckpointData GetSourceCheckpointData()
protected override StorageResourceCheckpointData GetSourceCheckpointData()
{
// Source blob type does not matter for container
return new BlobSourceCheckpointData(BlobType.Block);
}

public override StorageResourceCheckpointData GetDestinationCheckpointData()
protected override StorageResourceCheckpointData GetDestinationCheckpointData()
{
return new BlobDestinationCheckpointData(
_options?.BlobType ?? BlobType.Block,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,12 +325,12 @@ protected override async Task<bool> DeleteIfExistsAsync(CancellationToken cancel
return await BlobClient.DeleteIfExistsAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
}

public override StorageResourceCheckpointData GetSourceCheckpointData()
protected override StorageResourceCheckpointData GetSourceCheckpointData()
{
return new BlobSourceCheckpointData(BlobType.Block);
}

public override StorageResourceCheckpointData GetDestinationCheckpointData()
protected override StorageResourceCheckpointData GetDestinationCheckpointData()
{
return new BlobDestinationCheckpointData(
BlobType.Block,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,12 +287,12 @@ protected override async Task<bool> DeleteIfExistsAsync(CancellationToken cancel
return await BlobClient.DeleteIfExistsAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
}

public override StorageResourceCheckpointData GetSourceCheckpointData()
protected override StorageResourceCheckpointData GetSourceCheckpointData()
{
return new BlobSourceCheckpointData(BlobType.Page);
}

public override StorageResourceCheckpointData GetDestinationCheckpointData()
protected override StorageResourceCheckpointData GetDestinationCheckpointData()
{
return new BlobDestinationCheckpointData(
BlobType.Page,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
<Compile Include="$(AzureStorageSharedTestSources)\RandomExtensions.cs" LinkBase="Shared\Storage" />
</ItemGroup>
<ItemGroup>
<Compile Include="$(AzureStorageDataMovementTestSharedSources)CheckpointerTesting.cs" LinkBase="Shared\DataMovement" />
<Compile Include="$(AzureStorageDataMovementTestSharedSources)DisposingLocalDirectory.cs" LinkBase="Shared\DataMovement" />
<Compile Include="$(AzureStorageDataMovementTestSharedSources)TransferUtility.cs" LinkBase="Shared\DataMovement" />
</ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,6 @@ private enum StorageResourceType
Local
}

private static string ToResourceId(StorageResourceType type)
{
return type switch
{
StorageResourceType.BlockBlob => "BlockBlob",
StorageResourceType.PageBlob => "PageBlob",
StorageResourceType.AppendBlob => "AppendBlob",
_ => throw new NotImplementedException(),
};
}

private static string ToProviderId(StorageResourceType type)
{
return type switch
Expand Down Expand Up @@ -91,12 +80,9 @@ private static byte[] GetBytes(BlobCheckpointData checkpointData)
}

private static Mock<DataTransferProperties> GetProperties(
string checkpointerPath,
string transferId,
string sourcePath,
string destinationPath,
string sourceResourceId,
string destinationResourceId,
string sourceProviderId,
string destinationProviderId,
bool isContainer,
Expand All @@ -105,11 +91,8 @@ private static Mock<DataTransferProperties> GetProperties(
{
var mock = new Mock<DataTransferProperties>(MockBehavior.Strict);
mock.Setup(p => p.TransferId).Returns(transferId);
mock.Setup(p => p.Checkpointer).Returns(new TransferCheckpointStoreOptions(checkpointerPath));
mock.Setup(p => p.SourceUri).Returns(new Uri(sourcePath));
mock.Setup(p => p.DestinationUri).Returns(new Uri(destinationPath));
mock.Setup(p => p.SourceTypeId).Returns(sourceResourceId);
mock.Setup(p => p.DestinationTypeId).Returns(destinationResourceId);
mock.Setup(p => p.SourceProviderId).Returns(sourceProviderId);
mock.Setup(p => p.DestinationProviderId).Returns(destinationProviderId);
mock.Setup(p => p.SourceCheckpointData).Returns(GetBytes(sourceCheckpointData));
Expand All @@ -122,7 +105,6 @@ private static Mock<DataTransferProperties> GetProperties(
public async Task RehydrateBlockBlob(
[Values(true, false)] bool isSource)
{
using DisposingLocalDirectory test = DisposingLocalDirectory.GetTestDirectory();
string transferId = GetNewTransferId();
string sourcePath = "https://storageaccount.blob.core.windows.net/container/blobsource";
string destinationPath = "https://storageaccount.blob.core.windows.net/container/blobdest";
Expand All @@ -132,12 +114,9 @@ public async Task RehydrateBlockBlob(
StorageResourceType destinationType = StorageResourceType.BlockBlob;

DataTransferProperties transferProperties = GetProperties(
test.DirectoryPath,
transferId,
sourcePath,
destinationPath,
ToResourceId(sourceType),
ToResourceId(destinationType),
ToProviderId(sourceType),
ToProviderId(destinationType),
isContainer: false,
Expand All @@ -155,7 +134,6 @@ public async Task RehydrateBlockBlob(
[Test]
public async Task RehydrateBlockBlob_Options()
{
using DisposingLocalDirectory test = DisposingLocalDirectory.GetTestDirectory();
string transferId = GetNewTransferId();
string sourcePath = "https://storageaccount.blob.core.windows.net/container/blobsource";
string destinationPath = "https://storageaccount.blob.core.windows.net/container/blobdest";
Expand All @@ -165,12 +143,9 @@ public async Task RehydrateBlockBlob_Options()

BlobDestinationCheckpointData checkpointData = GetPopulatedDestinationCheckpointData(BlobType.Block, AccessTier.Cool);
DataTransferProperties transferProperties = GetProperties(
test.DirectoryPath,
transferId,
sourcePath,
destinationPath,
ToResourceId(sourceType),
ToResourceId(destinationType),
ToProviderId(sourceType),
ToProviderId(destinationType),
isContainer: false,
Expand All @@ -195,7 +170,6 @@ public async Task RehydrateBlockBlob_Options()
public async Task RehydratePageBlob(
[Values(true, false)] bool isSource)
{
using DisposingLocalDirectory test = DisposingLocalDirectory.GetTestDirectory();
string transferId = GetNewTransferId();
string sourcePath = "https://storageaccount.blob.core.windows.net/container/blobsource";
string destinationPath = "https://storageaccount.blob.core.windows.net/container/blobdest";
Expand All @@ -205,12 +179,9 @@ public async Task RehydratePageBlob(
StorageResourceType destinationType = StorageResourceType.PageBlob;

DataTransferProperties transferProperties = GetProperties(
test.DirectoryPath,
transferId,
sourcePath,
destinationPath,
ToResourceId(sourceType),
ToResourceId(destinationType),
ToProviderId(sourceType),
ToProviderId(destinationType),
isContainer: false,
Expand All @@ -228,7 +199,6 @@ public async Task RehydratePageBlob(
[Test]
public async Task RehydratePageBlob_Options()
{
using DisposingLocalDirectory test = DisposingLocalDirectory.GetTestDirectory();
string transferId = GetNewTransferId();
string sourcePath = "https://storageaccount.blob.core.windows.net/container/blobsource";
string destinationPath = "https://storageaccount.blob.core.windows.net/container/blobdest";
Expand All @@ -238,12 +208,9 @@ public async Task RehydratePageBlob_Options()

BlobDestinationCheckpointData checkpointData = GetPopulatedDestinationCheckpointData(BlobType.Page, AccessTier.P30);
DataTransferProperties transferProperties = GetProperties(
test.DirectoryPath,
transferId,
sourcePath,
destinationPath,
ToResourceId(sourceType),
ToResourceId(destinationType),
ToProviderId(sourceType),
ToProviderId(destinationType),
isContainer: false,
Expand All @@ -268,7 +235,6 @@ public async Task RehydratePageBlob_Options()
public async Task RehydrateAppendBlob(
[Values(true, false)] bool isSource)
{
using DisposingLocalDirectory test = DisposingLocalDirectory.GetTestDirectory();
string transferId = GetNewTransferId();
string sourcePath = "https://storageaccount.blob.core.windows.net/container/blobsource";
string destinationPath = "https://storageaccount.blob.core.windows.net/container/blobdest";
Expand All @@ -278,12 +244,9 @@ public async Task RehydrateAppendBlob(
StorageResourceType destinationType = StorageResourceType.AppendBlob;

DataTransferProperties transferProperties = GetProperties(
test.DirectoryPath,
transferId,
sourcePath,
destinationPath,
ToResourceId(sourceType),
ToResourceId(destinationType),
ToProviderId(sourceType),
ToProviderId(destinationType),
isContainer: false,
Expand All @@ -301,7 +264,6 @@ public async Task RehydrateAppendBlob(
[Test]
public async Task RehydrateAppendBlob_Options()
{
using DisposingLocalDirectory test = DisposingLocalDirectory.GetTestDirectory();
string transferId = GetNewTransferId();
string sourcePath = "https://storageaccount.blob.core.windows.net/container/blobsource";
string destinationPath = "https://storageaccount.blob.core.windows.net/container/blobdest";
Expand All @@ -311,12 +273,9 @@ public async Task RehydrateAppendBlob_Options()

BlobDestinationCheckpointData checkpointData = GetPopulatedDestinationCheckpointData(BlobType.Append, accessTier: default);
DataTransferProperties transferProperties = GetProperties(
test.DirectoryPath,
transferId,
sourcePath,
destinationPath,
ToResourceId(sourceType),
ToResourceId(destinationType),
ToProviderId(sourceType),
ToProviderId(destinationType),
isContainer: false,
Expand All @@ -341,7 +300,6 @@ public async Task RehydrateAppendBlob_Options()
public async Task RehydrateBlobContainer(
[Values(true, false)] bool isSource)
{
using DisposingLocalDirectory test = DisposingLocalDirectory.GetTestDirectory();
string transferId = GetNewTransferId();
List<string> sourcePaths = new List<string>();
string sourceParentPath = "https://storageaccount.blob.core.windows.net/sourcecontainer";
Expand All @@ -361,12 +319,9 @@ public async Task RehydrateBlobContainer(
string originalPath = isSource ? sourceParentPath : destinationParentPath;

DataTransferProperties transferProperties = GetProperties(
test.DirectoryPath,
transferId,
sourceParentPath,
destinationParentPath,
ToResourceId(sourceType),
ToResourceId(destinationType),
ToProviderId(sourceType),
ToProviderId(destinationType),
isContainer: true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,12 @@ protected override async IAsyncEnumerable<StorageResource> GetStorageResourcesAs
}
}

public override StorageResourceCheckpointData GetSourceCheckpointData()
protected override StorageResourceCheckpointData GetSourceCheckpointData()
{
return new ShareFileSourceCheckpointData();
}

public override StorageResourceCheckpointData GetDestinationCheckpointData()
protected override StorageResourceCheckpointData GetDestinationCheckpointData()
{
return new ShareFileDestinationCheckpointData();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ internal class ShareFileDestinationCheckpointData : StorageResourceCheckpointDat
{
public override int Length => 0;

public override void Serialize(Stream stream)
protected override void Serialize(Stream stream)
{
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ internal class ShareFileSourceCheckpointData : StorageResourceCheckpointData
{
public override int Length => 0;

public override void Serialize(Stream stream)
protected override void Serialize(Stream stream)
{
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,12 +200,12 @@ protected override async Task<StorageResourceReadStreamResult> ReadStreamAsync(
return response.Value.ToStorageResourceReadStreamResult();
}

public override StorageResourceCheckpointData GetSourceCheckpointData()
protected override StorageResourceCheckpointData GetSourceCheckpointData()
{
return new ShareFileSourceCheckpointData();
}

public override StorageResourceCheckpointData GetDestinationCheckpointData()
protected override StorageResourceCheckpointData GetDestinationCheckpointData()
{
return new ShareFileDestinationCheckpointData();
}
Expand Down
Loading
Loading