Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BT-684 Initial Blob Storage Impl #6810

Merged
merged 8 commits into from
Jul 26, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@ lazy val cloudSupport = project
.dependsOn(common)
.dependsOn(common % "test->test")

lazy val azureBlobFileSystem = (project in file("filesystems/blob"))
.withLibrarySettings("cromwell-azure-blobFileSystem", blobFileSystemDependencies)
.dependsOn(core)
.dependsOn(core % "test->test")
.dependsOn(common % "test->test")

lazy val awsS3FileSystem = (project in file("filesystems/s3"))
.withLibrarySettings("cromwell-aws-s3filesystem", s3FileSystemDependencies)
.dependsOn(core)
Expand Down Expand Up @@ -249,10 +255,12 @@ lazy val engine = project
.dependsOn(drsFileSystem)
.dependsOn(sraFileSystem)
.dependsOn(awsS3FileSystem)
.dependsOn(azureBlobFileSystem)
.dependsOn(awsS3FileSystem % "test->test")
.dependsOn(drsFileSystem % "test->test")
.dependsOn(httpFileSystem % "test->test")
.dependsOn(ftpFileSystem % "test->test")
.dependsOn(azureBlobFileSystem % "test->test")
.dependsOn(`cloud-nio-spi`)
.dependsOn(languageFactoryCore)
.dependsOn(cwlV1_0LanguageFactory % "test->test")
Expand Down Expand Up @@ -391,6 +399,7 @@ lazy val root = (project in file("."))
.aggregate(`cromwell-drs-localizer`)
.aggregate(awsBackend)
.aggregate(awsS3FileSystem)
.aggregate(azureBlobFileSystem)
.aggregate(backend)
.aggregate(centaur)
.aggregate(centaurCwlRunner)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package cromwell.filesystems.blob

import com.azure.core.credential.AzureSasCredential
import com.azure.storage.blob.nio.AzureFileSystem
import com.google.common.net.UrlEscapers
import cromwell.core.path.NioPath
import cromwell.core.path.Path
import cromwell.core.path.PathBuilder
import cromwell.filesystems.blob.BlobPathBuilder._
kraefrei marked this conversation as resolved.
Show resolved Hide resolved

import java.net.MalformedURLException
import java.net.URI
import java.nio.file.FileSystems
import scala.jdk.CollectionConverters._
import scala.language.postfixOps
import scala.util.Failure
import scala.util.Try

object BlobPathBuilder {

sealed trait BlobPathValidation
case class ValidBlobPath(path: String) extends BlobPathValidation
case class UnparsableBlobPath(errorMessage: Throwable) extends BlobPathValidation

}

kraefrei marked this conversation as resolved.
Show resolved Hide resolved
class BlobPathBuilder(credential: AzureSasCredential, container: String, storageAccount: String, dnsZone: String = "") extends PathBuilder {

val fileSystemConfig: Map[String, Object] = Map((AzureFileSystem.AZURE_STORAGE_SAS_TOKEN_CREDENTIAL, credential),
(AzureFileSystem.AZURE_STORAGE_FILE_STORES, container))

/**
* Validates a that a path from a string is a valid BlobPath of the format:
* {endpoint}/{containerName}/{pathToFile}
*
* with an endpoint for a particular storage account given by:
* https://{storageAccountName}.blob.core.windows.net/
* OR
* https://{storageAccountName}.{dnsZone}.blob.core.windows.net/
*
* For example, a path string we might expect to receive might look like:
* https://appexternalstorage.blob.core.windows.net/inputs/test/testFile.wdl
*
* In this example
* storageAccountName -> appexternalstorage
* endpoint -> https://{storageAccountName}.{DnsZone}.blob.core.windows.net/
kraefrei marked this conversation as resolved.
Show resolved Hide resolved
* container -> inputs
* pathToFile -> test/testFile.wdl
*
* If the configured container and storage account do not match, the string is considered unparsable
*/
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a nice explanation.

def validateBlobPath(string: String): BlobPathValidation = {
Try {
val uri = URI.create(UrlEscapers.urlFragmentEscaper().escape(string))
if ((uri.getPath().split("/").filter(!_.isEmpty()).head.equals(container))
&& (uri.getHost().split("\\.").filter(!_.isEmpty()).head.equals(storageAccount))) {
kraefrei marked this conversation as resolved.
Show resolved Hide resolved
ValidBlobPath(uri.getPath.replaceFirst("/" + container, ""))
} else {
throw new MalformedURLException("Malformed Blob URL for this builder. Expecting a URL for a container"
kraefrei marked this conversation as resolved.
Show resolved Hide resolved
+ container + " and storage account: " + storageAccount)
}
} recover { case t => UnparsableBlobPath(t) } get
}

def endpointString = {
val dnsZoneDot = if (dnsZone.isEmpty()) "" else "."
"https://" + storageAccount + dnsZoneDot + dnsZone + ".blob.core.windows.net"
}

def build(string: String): Try[BlobPath] = {
validateBlobPath(string) match {
case ValidBlobPath(path) =>
kraefrei marked this conversation as resolved.
Show resolved Hide resolved
Try {
val fileSystem = FileSystems.newFileSystem(new URI("azb://?endpoint=" + endpointString), fileSystemConfig.asJava)
kraefrei marked this conversation as resolved.
Show resolved Hide resolved
val blobStoragePath = fileSystem.getPath(path)
BlobPath(blobStoragePath, endpointString, container)
}
case UnparsableBlobPath(errorMessage: Throwable) => Failure(errorMessage)
}
}

override def name: String = "Azure Blob Storage"
}

// Add args for container, storage account name
case class BlobPath private[blob](nioPath: NioPath, endpoint: String, container: String) extends Path {
override protected def newPath(nioPath: NioPath): Path = BlobPath(nioPath, endpoint, container)

override def pathAsString: String = List(endpoint, container, nioPath.toString()).mkString("/")

override def pathWithoutScheme: String = "/" + container + "/" + nioPath.toString()
kraefrei marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package cromwell.filesystems.blob

import akka.actor.ActorSystem
import com.azure.core.credential.AzureSasCredential
import com.typesafe.config.Config
import cromwell.core.WorkflowOptions
import cromwell.core.path.PathBuilderFactory
import cromwell.filesystems.blob.BlobPathBuilder

import scala.concurrent.ExecutionContext
import scala.concurrent.Future

final case class BlobPathBuilderFactory(globalConfig: Config, instanceConfig: Config) extends PathBuilderFactory {
override def withOptions(options: WorkflowOptions)(implicit as: ActorSystem, ec: ExecutionContext): Future[BlobPathBuilder] = {
val sasToken: String = instanceConfig.getString("sasToken")
val container: String = instanceConfig.getString("store")
val storageAccount: String = instanceConfig.getString("storageAccount")
kraefrei marked this conversation as resolved.
Show resolved Hide resolved
val dnsZone: String = instanceConfig.getString("dnsZone")
Future {
new BlobPathBuilder(new AzureSasCredential(sasToken), container, storageAccount, dnsZone)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package cromwell.filesystems.blob

import com.azure.core.credential.AzureSasCredential
import cromwell.filesystems.blob.BlobPathBuilder
import org.scalatest.Ignore
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

import java.nio.file.Files

@Ignore
class BlobPathBuilderSpec extends AnyFlatSpec with Matchers{

kraefrei marked this conversation as resolved.
Show resolved Hide resolved
it should "parse a URI into a path" in {
val testString = "https://storageAccount.blob.core.windows.net/container/path/to/file"
val evalPath = "/path/to/file"
val pathBuilder = new BlobPathBuilder(new AzureSasCredential("test"), "container", "storageAccount")
pathBuilder.validateBlobPath(testString) match {
case BlobPathBuilder.ValidBlobPath(path) => path should equal(evalPath)
case BlobPathBuilder.UnparsableBlobPath(errorMessage) => fail(errorMessage)
}
kraefrei marked this conversation as resolved.
Show resolved Hide resolved
}

it should "build a blob path from a test string and read a file" in {
val storageAccount = "coaexternalstorage"
val store = "inputs"
val evalPath = "/test/inputFile.txt"
val sas = "{SAS TOKEN HERE}"
val endpoint = "https://" + storageAccount + ".blob.core.windows.net"
val testString = endpoint + "/" + store + evalPath
val blobPath: BlobPath = new BlobPathBuilder(new AzureSasCredential(sas), store, storageAccount) build testString getOrElse fail()
blobPath.container should equal(store)
blobPath.endpoint should equal(endpoint)
blobPath.pathAsString should equal(testString)
blobPath.pathWithoutScheme should equal("/" + store + evalPath)
val is = Files.newInputStream(blobPath.nioPath)
val fileText = (is.readAllBytes.map(_.toChar)).mkString
fileText should include ("This is my test file!!!! Did it work?")
}
}
6 changes: 6 additions & 0 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ object Dependencies {
// We would like to use the BOM to manage Azure SDK versions, but SBT doesn't support it.
// https://github.com/Azure/azure-sdk-for-java/tree/main/sdk/boms/azure-sdk-bom
// https://github.com/sbt/sbt/issues/4531
private val azureStorageBlobNioV = "12.0.0-beta.18"
private val azureIdentitySdkV = "1.4.2"
private val azureKeyVaultSdkV = "4.3.7"
private val betterFilesV = "3.9.1"
Expand Down Expand Up @@ -183,6 +184,9 @@ object Dependencies {
)

val azureDependencies: List[ModuleID] = List(
"com.azure" % "azure-storage-blob-nio" % azureStorageBlobNioV
exclude("jakarta.xml.bind", "jakarta.xml.bind-api")
exclude("jakarta.activation", "jakarta.activation-api"),
"com.azure" % "azure-identity" % azureIdentitySdkV
exclude("jakarta.xml.bind", "jakarta.xml.bind-api")
exclude("jakarta.activation", "jakarta.activation-api"),
Expand Down Expand Up @@ -395,6 +399,8 @@ object Dependencies {
List("scalatest", "mysql", "mariadb", "postgresql")
.map(name => "com.dimafeng" %% s"testcontainers-scala-$name" % testContainersScalaV % Test)

val blobFileSystemDependencies: List[ModuleID] = azureDependencies

val s3FileSystemDependencies: List[ModuleID] = junitDependencies

val gcsFileSystemDependencies: List[ModuleID] = akkaHttpDependencies
Expand Down