diff --git a/build.sbt b/build.sbt index 9c059f3ea81..3b255d0e0e5 100644 --- a/build.sbt +++ b/build.sbt @@ -91,6 +91,12 @@ lazy val cloudSupport = project .dependsOn(common) .dependsOn(common % "test->test") +lazy val azureBlobFileSystem = (project in file("filesystems/blob")) + .withLibrarySettings("cromwell-azure-blobFileSystem", blobFileSystemDependencies) + .dependsOn(core) + .dependsOn(core % "test->test") + .dependsOn(common % "test->test") + lazy val awsS3FileSystem = (project in file("filesystems/s3")) .withLibrarySettings("cromwell-aws-s3filesystem", s3FileSystemDependencies) .dependsOn(core) @@ -249,10 +255,12 @@ lazy val engine = project .dependsOn(drsFileSystem) .dependsOn(sraFileSystem) .dependsOn(awsS3FileSystem) + .dependsOn(azureBlobFileSystem) .dependsOn(awsS3FileSystem % "test->test") .dependsOn(drsFileSystem % "test->test") .dependsOn(httpFileSystem % "test->test") .dependsOn(ftpFileSystem % "test->test") + .dependsOn(azureBlobFileSystem % "test->test") .dependsOn(`cloud-nio-spi`) .dependsOn(languageFactoryCore) .dependsOn(cwlV1_0LanguageFactory % "test->test") @@ -391,6 +399,7 @@ lazy val root = (project in file(".")) .aggregate(`cromwell-drs-localizer`) .aggregate(awsBackend) .aggregate(awsS3FileSystem) + .aggregate(azureBlobFileSystem) .aggregate(backend) .aggregate(centaur) .aggregate(centaurCwlRunner) diff --git a/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala b/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala new file mode 100644 index 00000000000..038936ec46e --- /dev/null +++ b/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala @@ -0,0 +1,89 @@ +package cromwell.filesystems.blob + +import com.azure.core.credential.AzureSasCredential +import com.azure.storage.blob.nio.AzureFileSystem +import com.google.common.net.UrlEscapers +import cromwell.core.path.NioPath +import cromwell.core.path.Path +import cromwell.core.path.PathBuilder +import cromwell.filesystems.blob.BlobPathBuilder._ + +import java.net.MalformedURLException +import java.net.URI +import java.nio.file.FileSystems +import scala.jdk.CollectionConverters._ +import scala.language.postfixOps +import scala.util.Failure +import scala.util.Try + +object BlobPathBuilder { + + sealed trait BlobPathValidation + case class ValidBlobPath(path: String) extends BlobPathValidation + case class UnparsableBlobPath(errorMessage: Throwable) extends BlobPathValidation + + def invalidBlobPathMessage(container: String, endpoint: String) = s"Malformed Blob URL for this builder. Expecting a URL for a container $container and endpoint $endpoint" + def parseURI(string: String) = URI.create(UrlEscapers.urlFragmentEscaper().escape(string)) + def parseStorageAccount(uri: URI) = uri.getHost().split("\\.").filter(!_.isEmpty()).headOption + + /** + * Validates a that a path from a string is a valid BlobPath of the format: + * {endpoint}/{containerName}/{pathToFile} + * + * with an endpoint for a particular storage account typically given by: + * https://{storageAccountName}.blob.core.windows.net/ + * + * For example, a path string we might expect to receive might look like: + * https://appexternalstorage.blob.core.windows.net/inputs/test/testFile.wdl + * + * In this example + * storageAccountName -> appexternalstorage + * endpoint -> https://{storageAccountName}.blob.core.windows.net/ + * container -> inputs + * pathToFile -> test/testFile.wdl + * + * If the configured container and storage account do not match, the string is considered unparsable + */ + def validateBlobPath(string: String, container: String, endpoint: String): BlobPathValidation = { + Try { + val uri = parseURI(string) + val storageAccount = parseStorageAccount(parseURI(endpoint)) + val hasContainer = uri.getPath().split("/").filter(!_.isEmpty()).headOption.contains(container) + def hasEndpoint = parseStorageAccount(uri).contains(storageAccount.get) + if (hasContainer && !storageAccount.isEmpty && hasEndpoint) { + ValidBlobPath(uri.getPath.replaceFirst("/" + container, "")) + } else { + UnparsableBlobPath(new MalformedURLException(invalidBlobPathMessage(container, endpoint))) + } + } recover { case t => UnparsableBlobPath(t) } get + } +} + +class BlobPathBuilder(credential: AzureSasCredential, container: String, endpoint: String) extends PathBuilder { + + val fileSystemConfig: Map[String, Object] = Map((AzureFileSystem.AZURE_STORAGE_SAS_TOKEN_CREDENTIAL, credential), + (AzureFileSystem.AZURE_STORAGE_FILE_STORES, container)) + + def build(string: String): Try[BlobPath] = { + validateBlobPath(string, container, endpoint) match { + case ValidBlobPath(path) => + Try { + val fileSystem = FileSystems.newFileSystem(new URI("azb://?endpoint=" + endpoint), fileSystemConfig.asJava) + val blobStoragePath = fileSystem.getPath(path) + BlobPath(blobStoragePath, endpoint, container) + } + case UnparsableBlobPath(errorMessage: Throwable) => Failure(errorMessage) + } + } + + override def name: String = "Azure Blob Storage" +} + +// Add args for container, storage account name +case class BlobPath private[blob](nioPath: NioPath, endpoint: String, container: String) extends Path { + override protected def newPath(nioPath: NioPath): Path = BlobPath(nioPath, endpoint, container) + + override def pathAsString: String = List(endpoint, container, nioPath.toString()).mkString("/") + + override def pathWithoutScheme: String = parseURI(endpoint).getHost + "/" + container + "/" + nioPath.toString() +} diff --git a/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilderFactory.scala b/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilderFactory.scala new file mode 100644 index 00000000000..ca5e24fe3d7 --- /dev/null +++ b/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilderFactory.scala @@ -0,0 +1,22 @@ +package cromwell.filesystems.blob + +import akka.actor.ActorSystem +import com.azure.core.credential.AzureSasCredential +import com.typesafe.config.Config +import cromwell.core.WorkflowOptions +import cromwell.core.path.PathBuilderFactory +import cromwell.filesystems.blob.BlobPathBuilder + +import scala.concurrent.ExecutionContext +import scala.concurrent.Future + +final case class BlobPathBuilderFactory(globalConfig: Config, instanceConfig: Config) extends PathBuilderFactory { + override def withOptions(options: WorkflowOptions)(implicit as: ActorSystem, ec: ExecutionContext): Future[BlobPathBuilder] = { + val sasToken: String = instanceConfig.getString("sasToken") + val container: String = instanceConfig.getString("store") + val endpoint: String = instanceConfig.getString("endpoint") + Future { + new BlobPathBuilder(new AzureSasCredential(sasToken), container, endpoint) + } + } +} diff --git a/filesystems/blob/src/test/scala/cromwell/filesystems/blob/BlobPathBuilderSpec.scala b/filesystems/blob/src/test/scala/cromwell/filesystems/blob/BlobPathBuilderSpec.scala new file mode 100644 index 00000000000..454b8f5adaf --- /dev/null +++ b/filesystems/blob/src/test/scala/cromwell/filesystems/blob/BlobPathBuilderSpec.scala @@ -0,0 +1,65 @@ +package cromwell.filesystems.blob + +import com.azure.core.credential.AzureSasCredential +import cromwell.filesystems.blob.BlobPathBuilder +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +import java.nio.file.Files + +object BlobPathBuilderSpec { + def buildEndpoint(storageAccount: String) = s"https://$storageAccount.blob.core.windows.net" +} + +class BlobPathBuilderSpec extends AnyFlatSpec with Matchers{ + + it should "parse a URI into a path" in { + val endpoint = BlobPathBuilderSpec.buildEndpoint("storageAccount") + val container = "container" + val evalPath = "/path/to/file" + val testString = endpoint + "/" + container + evalPath + BlobPathBuilder.validateBlobPath(testString, container, endpoint) match { + case BlobPathBuilder.ValidBlobPath(path) => path should equal(evalPath) + case BlobPathBuilder.UnparsableBlobPath(errorMessage) => fail(errorMessage) + } + } + + it should "bad storage account fails causes URI to fail parse into a path" in { + val endpoint = BlobPathBuilderSpec.buildEndpoint("storageAccount") + val container = "container" + val evalPath = "/path/to/file" + val testString = BlobPathBuilderSpec.buildEndpoint("badStorageAccount") + container + evalPath + BlobPathBuilder.validateBlobPath(testString, container, endpoint) match { + case BlobPathBuilder.ValidBlobPath(path) => fail(s"Valid path: $path found when verifying mismatched storage account") + case BlobPathBuilder.UnparsableBlobPath(errorMessage) => errorMessage.getMessage() should equal(BlobPathBuilder.invalidBlobPathMessage(container, endpoint)) + } + } + + it should "bad container fails causes URI to fail parse into a path" in { + val endpoint = BlobPathBuilderSpec.buildEndpoint("storageAccount") + val container = "container" + val evalPath = "/path/to/file" + val testString = endpoint + "badContainer" + evalPath + BlobPathBuilder.validateBlobPath(testString, container, endpoint) match { + case BlobPathBuilder.ValidBlobPath(path) => fail(s"Valid path: $path found when verifying mismatched container") + case BlobPathBuilder.UnparsableBlobPath(errorMessage) => errorMessage.getMessage() should equal(BlobPathBuilder.invalidBlobPathMessage(container, endpoint)) + } + } + + ignore should "build a blob path from a test string and read a file" in { + val endpoint = BlobPathBuilderSpec.buildEndpoint("coaexternalstorage") + val endpointHost = BlobPathBuilder.parseURI(endpoint).getHost + val store = "inputs" + val evalPath = "/test/inputFile.txt" + val sas = "{SAS TOKEN HERE}" + val testString = endpoint + "/" + store + evalPath + val blobPath: BlobPath = new BlobPathBuilder(new AzureSasCredential(sas), store, endpoint) build testString getOrElse fail() + blobPath.container should equal(store) + blobPath.endpoint should equal(endpoint) + blobPath.pathAsString should equal(testString) + blobPath.pathWithoutScheme should equal(endpointHost + "/" + store + evalPath) + val is = Files.newInputStream(blobPath.nioPath) + val fileText = (is.readAllBytes.map(_.toChar)).mkString + fileText should include ("This is my test file!!!! Did it work?") + } +} diff --git a/project/Dependencies.scala b/project/Dependencies.scala index fd0bb021bd9..bdbf1185e66 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -10,6 +10,7 @@ object Dependencies { // We would like to use the BOM to manage Azure SDK versions, but SBT doesn't support it. // https://github.com/Azure/azure-sdk-for-java/tree/main/sdk/boms/azure-sdk-bom // https://github.com/sbt/sbt/issues/4531 + private val azureStorageBlobNioV = "12.0.0-beta.18" private val azureIdentitySdkV = "1.4.2" private val azureKeyVaultSdkV = "4.3.7" private val betterFilesV = "3.9.1" @@ -183,6 +184,9 @@ object Dependencies { ) val azureDependencies: List[ModuleID] = List( + "com.azure" % "azure-storage-blob-nio" % azureStorageBlobNioV + exclude("jakarta.xml.bind", "jakarta.xml.bind-api") + exclude("jakarta.activation", "jakarta.activation-api"), "com.azure" % "azure-identity" % azureIdentitySdkV exclude("jakarta.xml.bind", "jakarta.xml.bind-api") exclude("jakarta.activation", "jakarta.activation-api"), @@ -395,6 +399,8 @@ object Dependencies { List("scalatest", "mysql", "mariadb", "postgresql") .map(name => "com.dimafeng" %% s"testcontainers-scala-$name" % testContainersScalaV % Test) + val blobFileSystemDependencies: List[ModuleID] = azureDependencies + val s3FileSystemDependencies: List[ModuleID] = junitDependencies val gcsFileSystemDependencies: List[ModuleID] = akkaHttpDependencies