Migration to @azure/storage-blob library with a fix #258

Merged
229 changes: 115 additions & 114 deletions common-npm-packages/az-blobstorage-provider/azureBlobStorageProvider.ts
@@ -1,9 +1,14 @@
// Workaround to define globalThis for Node 10
var globalThis = require('globalthis')();
(global as any).globalThis = globalThis;

import path = require('path');
import azureStorage = require('azure-storage');
import fs = require('fs');
import { Readable } from 'stream';
import models = require('artifact-engine/Models');
import store = require('artifact-engine/Store');
import tl = require('azure-pipelines-task-lib/task');
import { BlobItem, BlobServiceClient, ContainerClient, StorageSharedKeyCredential } from '@azure/storage-blob';
import abortController = require("@azure/abort-controller");

const resourcePath: string = path.join(__dirname, 'module.json');
tl.setResourcePath(resourcePath);
@@ -12,158 +17,154 @@ export class AzureBlobProvider implements models.IArtifactProvider {

public artifactItemStore: store.ArtifactItemStore;

constructor(storageAccount: string, container: string, accessKey: string, prefixFolderPath?: string, host?: string, addPrefixToDownloadedItems?: boolean) {
private _storageAccount: string;
private _accessKey: string;
private _containerName: string;
private _prefixFolderPath: string;
private _containerClient: ContainerClient;
private _blobServiceClient: BlobServiceClient;
private _addPrefixToDownloadedItems: boolean = false;

constructor(storageAccount: string, containerName: string, accessKey: string, prefixFolderPath?: string, host?: string, addPrefixToDownloadedItems?: boolean) {
this._storageAccount = storageAccount;
this._accessKey = accessKey;
this._container = container;
this._containerName = containerName;

if (!!prefixFolderPath) {
this._prefixFolderPath = prefixFolderPath.endsWith("/") ? prefixFolderPath : prefixFolderPath + "/";
} else {
this._prefixFolderPath = "";
}
this._blobSvc = azureStorage.createBlobService(this._storageAccount, this._accessKey, host);

const sharedKeyCredential = new StorageSharedKeyCredential(this._storageAccount, this._accessKey);

this._blobServiceClient = new BlobServiceClient(this.getStorageUrl(this._storageAccount), sharedKeyCredential);

this._containerClient = this._blobServiceClient.getContainerClient(this._containerName);

this._addPrefixToDownloadedItems = !!addPrefixToDownloadedItems;
}

public putArtifactItem(item: models.ArtifactItem, readStream: NodeJS.ReadableStream): Promise<models.ArtifactItem> {
return new Promise(async (resolve, reject) => {
await this._ensureContainerExistence();

var self = this;
console.log(tl.loc("UploadingItem", item.path));
var blobPath = this._prefixFolderPath ? this._prefixFolderPath + "/" + item.path : item.path;

var writeStream = this._blobSvc.createWriteStreamToBlockBlob(this._container, blobPath, null, function (error, result, response) {
if (error) {
console.log(tl.loc("FailedToUploadBlob", blobPath, error.message));
reject(error);
} else {
var blobUrl = self._blobSvc.getUrl(self._container, blobPath);
console.log(tl.loc("CreatedBlobForItem", item.path, blobUrl));
item.metadata["destinationUrl"] = blobUrl;
resolve(item);
}
});
public async putArtifactItem(item: models.ArtifactItem, readStream: Readable): Promise<models.ArtifactItem> {
await this._containerClient.createIfNotExists();

readStream.pipe(writeStream);
writeStream.on("error",
(error) => {
console.log("ErrorInWriteStream", error);
reject(error);
});
readStream.on("error",
(error) => {
console.log(tl.loc("ErrorInReadStream", error));
reject(error);
});
});
const blobPath = this._prefixFolderPath ? this._prefixFolderPath + item.path : item.path;
console.log(tl.loc("UploadingItem", blobPath));

const blockBlobClient = this._containerClient.getBlockBlobClient(blobPath);

try {
const bufferSize = 8 * 1024 * 1024;
const maxConcurrency = 20;
const timeoutInMs = 30 * 60 * 1000;

await blockBlobClient.uploadStream(readStream, bufferSize, maxConcurrency, {
abortSignal: abortController.AbortController.timeout(timeoutInMs),
});

const blobUrl = blockBlobClient.url;
console.log(tl.loc("CreatedBlobForItem", item.path, blobUrl));
item.metadata["destinationUrl"] = blobUrl;

return item;
} catch(error) {
console.log(tl.loc("ErrorInWriteStream", error instanceof Error
? error.message
: "Error in write stream"));

throw error;
}
}

public getRootItems(): Promise<models.ArtifactItem[]> {
return this._getItems(this._container, this._prefixFolderPath);
return this._getItems(this._prefixFolderPath);
}

public getArtifactItems(artifactItem: models.ArtifactItem): Promise<models.ArtifactItem[]> {
throw new Error(tl.loc("GetArtifactItemsNotSupported"));
}

public getArtifactItem(artifactItem: models.ArtifactItem): Promise<NodeJS.ReadableStream> {
return new Promise((resolve, reject) => {
var readStream: NodeJS.ReadableStream;
if (!this._addPrefixToDownloadedItems && !!this._prefixFolderPath) {
// Adding prefix path to get the absolute path
readStream = this._blobSvc.createReadStream(this._container, this._prefixFolderPath + artifactItem.path, null);
} else {
readStream = this._blobSvc.createReadStream(this._container, artifactItem.path, null);
}
public async getArtifactItem(artifactItem: models.ArtifactItem): Promise<NodeJS.ReadableStream> {
let blobPath = artifactItem.path;

if (!this._addPrefixToDownloadedItems && !!this._prefixFolderPath) {
blobPath = this._prefixFolderPath + artifactItem.path;
}

const blockBlobClient = this._containerClient.getBlockBlobClient(blobPath);

const timeoutInMs = 30 * 60 * 1000;
const offset = 0;
const count = undefined; // download to the end

try {
let downloadResponse = await blockBlobClient.download(offset, count, {
abortSignal: abortController.AbortController.timeout(timeoutInMs),
maxRetryRequests: 10
});

// Replace full path by filename in order to save the file directly to destination folder
artifactItem.path = path.basename(artifactItem.path);
resolve(readStream);
});

return downloadResponse.readableStreamBody;

} catch (error) {
console.log(tl.loc("ErrorInReadStream", error instanceof Error
? error.message
: "Error in read stream"));

throw error;
}
}

public dispose() {
}

private _ensureContainerExistence(): Promise<void> {
return new Promise<void>((resolve, reject) => {
if (!this._isContainerExists) {
var self = this;
this._blobSvc.createContainerIfNotExists(this._container, function (error, result, response) {
if (!!error) {
console.log(tl.loc("FailedToCreateContainer", self._container, error.message));
reject(error);
} else {
self._isContainerExists = true;
console.log(tl.loc("CreatedContainer", self._container));
resolve();
}
});
} else {
resolve();
}
});
private async _getItems(parentRelativePath?: string): Promise<models.ArtifactItem[]> {
const result = await this._getListOfItemsInsideContainer(parentRelativePath);
const items = this._convertBlobResultToArtifactItem(result);
console.log(tl.loc("SuccessFullyFetchedItemList"));

return items;
}

private _getItems(container: string, parentRelativePath?: string): Promise<models.ArtifactItem[]> {
var promise = new Promise<models.ArtifactItem[]>(async (resolve, reject) => {
var items: models.ArtifactItem[] = [];
var continuationToken = null;
var result;
do {
result = await this._getListOfItemsInsideContainer(container, parentRelativePath, continuationToken);
items = items.concat(this._convertBlobResultToArtifactItem(result.entries));
continuationToken = result.continuationToken;
if (!!continuationToken) {
console.log(tl.loc("ContinuationTokenExistsFetchingRemainingFiles"));
}
} while (continuationToken);

console.log(tl.loc("SuccessFullyFetchedItemList"));
resolve(items);
});
private async _getListOfItemsInsideContainer(parentRelativePath: string): Promise<BlobItem[]> {
const listBlobsOptions = { prefix: parentRelativePath };
const pagingOptions = { maxPageSize: 100 };
const blobItems: BlobItem[] = [];

return promise;
for await (const page of this._containerClient.listBlobsFlat(listBlobsOptions).byPage(pagingOptions)) {
page.segment.blobItems.forEach((blobItem: BlobItem) => {
blobItems.push(blobItem);
});
}

return blobItems;
}

private async _getListOfItemsInsideContainer(container, parentRelativePath, continuationToken): Promise<azureStorage.BlobService.ListBlobsResult> {
var promise = new Promise<azureStorage.BlobService.ListBlobsResult>((resolve, reject) => {
this._blobSvc.listBlobsSegmentedWithPrefix(container, parentRelativePath, continuationToken, async (error, result) => {
if (!!error) {
console.log(tl.loc("FailedToListItemInsideContainer", container, error.message));
reject(error);
} else {
resolve(result);
}
});
});
private _convertBlobResultToArtifactItem(blobItems: BlobItem[]): models.ArtifactItem[] {
const artifactItems: models.ArtifactItem[] = [];

return promise;
}
blobItems.forEach(item => {
const artifactItem: models.ArtifactItem = new models.ArtifactItem();

artifactItem.itemType = models.ItemType.File;
artifactItem.fileLength = item.properties.contentLength;
artifactItem.lastModified = new Date(item.properties.lastModified + 'Z');

private _convertBlobResultToArtifactItem(blobResult: azureStorage.BlobService.BlobResult[]): models.ArtifactItem[] {
var artifactItems: models.ArtifactItem[] = new Array<models.ArtifactItem>();
blobResult.forEach(element => {
var artifactitem: models.ArtifactItem = new models.ArtifactItem();
artifactitem.itemType = models.ItemType.File;
artifactitem.fileLength = parseInt(element.contentLength);
artifactitem.lastModified = new Date(element.lastModified + 'Z');
if (!this._addPrefixToDownloadedItems && !!this._prefixFolderPath) {
// Supplying relative path without prefix; removing the first occurrence
artifactitem.path = element.name.replace(this._prefixFolderPath, "").trim();
artifactItem.path = item.name.replace(this._prefixFolderPath, "").trim();
} else {
artifactitem.path = element.name;
artifactItem.path = item.name;
}
artifactItems.push(artifactitem);
artifactItems.push(artifactItem);
});

return artifactItems;
}
private _storageAccount: string;
private _accessKey: string;
private _container: string;
private _prefixFolderPath: string;
private _isContainerExists: boolean = false;
private _blobSvc: azureStorage.BlobService;
private _addPrefixToDownloadedItems: boolean = false;

private getStorageUrl(storageAccount: string): string {
return `https://${storageAccount}.blob.core.windows.net`;
}
}
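
For orientation, below is a minimal, self-contained sketch of the @azure/storage-blob calls this migration relies on (uploadStream, listBlobsFlat().byPage(), and download). It is illustrative only; the account name, access key, container, and blob paths are placeholders, not values from this PR.

// Minimal sketch of the @azure/storage-blob usage pattern adopted above.
// All names below ('myaccount', '<access-key>', 'mycontainer', file paths) are placeholders.
import * as fs from 'fs';
import { BlobItem, BlobServiceClient, StorageSharedKeyCredential } from '@azure/storage-blob';
import { AbortController } from '@azure/abort-controller';

async function demo(): Promise<void> {
    const credential = new StorageSharedKeyCredential('myaccount', '<access-key>');
    const serviceClient = new BlobServiceClient('https://myaccount.blob.core.windows.net', credential);
    const containerClient = serviceClient.getContainerClient('mycontainer');
    await containerClient.createIfNotExists();

    // Upload: stream a local file as a block blob (8 MB buffers, up to 20 parallel requests),
    // aborting if the transfer takes longer than 30 minutes.
    const blockBlobClient = containerClient.getBlockBlobClient('drop/artifact.zip');
    await blockBlobClient.uploadStream(fs.createReadStream('./artifact.zip'), 8 * 1024 * 1024, 20, {
        abortSignal: AbortController.timeout(30 * 60 * 1000)
    });

    // List: walk the flat blob listing page by page under a prefix.
    const blobs: BlobItem[] = [];
    for await (const page of containerClient.listBlobsFlat({ prefix: 'drop/' }).byPage({ maxPageSize: 100 })) {
        blobs.push(...page.segment.blobItems);
    }
    console.log(`Found ${blobs.length} blobs`);

    // Download: offset 0 with an undefined count reads to the end of the blob;
    // the response exposes a Node.js readable stream.
    const downloadResponse = await blockBlobClient.download(0, undefined, { maxRetryRequests: 10 });
    downloadResponse.readableStreamBody?.pipe(fs.createWriteStream('./artifact-copy.zip'));
}

demo().catch(console.error);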
2 changes: 0 additions & 2 deletions common-npm-packages/az-blobstorage-provider/blobservice.ts
@@ -2,8 +2,6 @@ import artifactProviders = require('artifact-engine/Providers');
import azureBlobProvider = require('./azureBlobStorageProvider');
import artifactProcessor = require('artifact-engine/Engine');
import models = require('artifact-engine/Models');
import path = require('path');
import util = require('util');

export class BlobService {
private _storageAccountName: string;
4 changes: 1 addition & 3 deletions common-npm-packages/az-blobstorage-provider/manualTest.ts
@@ -1,5 +1,3 @@
import path = require("path");
import util = require('util');
import * as BlobService from './blobservice';

var config = require("./config.json");
@@ -8,7 +6,7 @@ export class Demo {

async main() {
var blobService = new BlobService.BlobService(config.azureblobstorage.storageAccountName, config.azureblobstorage.storageAccessKey);
blobService.downloadBlobs(config.dropLocation, config.azureblobstorage.sourceContainerName);
await blobService.downloadBlobs(config.dropLocation, config.azureblobstorage.sourceContainerName, "v17");
var uploadedUrls = await blobService.uploadBlobs(config.dropLocation, config.azureblobstorage.destinationContainerName, "ManualTest/uploadHere")
console.log("####### Uploaded urls start ########")
uploadedUrls.forEach((url: string) => {