Skip to content

Commit

Permalink
WX-696 Enable getting SAS token from WSM (#6954)
Browse files Browse the repository at this point in the history
* WX-696 Enable getting SAS token from WSM

* Wire container resource id from config

* Move resource-container-id config path

* First pass at config for WSM

* Remove unused singleton config

* Tests for new config

* Fix config parsing

* Modified b2c token to be provided each time

* Remove singletonConfig arg from factory

* Restore types to factory configs

* Clean up comments and empty token default

* Default to config b2c before searching environment

* Fix token default on api client

* Fix test

* Refactor error handling for when there is no token

* Remove token constructor arg for clientProvider

* Move configs to global singleton config

* Update filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobFileSystemManager.scala

* default -> override

* Add override token to test

* Update filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobFileSystemManager.scala

Co-authored-by: Adam Nichols <anichols@broadinstitute.org>

* Parentheses

* Reduce token timeout

* Move AzureCredentials to separate file

* Make AzureCredentials an object

* WSM token cleanup

* Config refactor (#6960)

Co-authored-by: Janet Gainer-Dewar <jdewar@broadinstitute.org>

* Initial blob token documentation

* Refine language in BlobSasTokenGenerator

* Update comment and formatting

Co-authored-by: Janet Gainer-Dewar <jdewar@broadinstitute.org>
Co-authored-by: Adam Nichols <anichols@broadinstitute.org>
  • Loading branch information
3 people authored Dec 1, 2022
1 parent 2be0787 commit 46a7918
Show file tree
Hide file tree
Showing 10 changed files with 368 additions and 118 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ case object GoogleAppDefaultTokenStrategy extends DrsCredentials {
*/
case class AzureDrsCredentials(identityClientId: Option[String]) extends DrsCredentials {

final val tokenAcquisitionTimeout = 30.seconds
final val tokenAcquisitionTimeout = 5.seconds

val azureProfile = new AzureProfile(AzureEnvironment.AZURE)
val tokenScope = "https://management.azure.com/.default"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package cromwell.filesystems.blob

import cats.implicits.catsSyntaxValidatedId
import com.azure.core.credential.TokenRequestContext
import com.azure.core.management.AzureEnvironment
import com.azure.core.management.profile.AzureProfile
import com.azure.identity.DefaultAzureCredentialBuilder
import common.validation.ErrorOr.ErrorOr

import scala.concurrent.duration._
import scala.jdk.DurationConverters._

import scala.util.{Failure, Success, Try}

/**
* Strategy for obtaining an access token in an environment with available Azure identity.
* If you need to disambiguate among multiple active user-assigned managed identities, pass
* in the client id of the identity that should be used.
*/
case object AzureCredentials {

final val tokenAcquisitionTimeout = 5.seconds

val azureProfile = new AzureProfile(AzureEnvironment.AZURE)
val tokenScope = "https://management.azure.com/.default"

private def tokenRequestContext: TokenRequestContext = {
val trc = new TokenRequestContext()
trc.addScopes(tokenScope)
trc
}

private def defaultCredentialBuilder: DefaultAzureCredentialBuilder =
new DefaultAzureCredentialBuilder()
.authorityHost(azureProfile.getEnvironment.getActiveDirectoryEndpoint)

def getAccessToken(identityClientId: Option[String]): ErrorOr[String] = {
val credentials = identityClientId.foldLeft(defaultCredentialBuilder) {
(builder, clientId) => builder.managedIdentityClientId(clientId)
}.build()

Try(
credentials
.getToken(tokenRequestContext)
.block(tokenAcquisitionTimeout.toJava)
) match {
case Success(null) => "null token value attempting to obtain access token".invalidNel
case Success(token) => token.getToken.validNel
case Failure(error) => s"Failed to refresh access token: ${error.getMessage}".invalidNel
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package cromwell.filesystems.blob

import cats.implicits.catsSyntaxValidatedId
import cats.syntax.apply._
import com.typesafe.config.Config
import common.validation.Validation._
import net.ceedubs.ficus.Ficus._

import java.util.UUID

// WSM config is needed for accessing WSM-managed blob containers created in Terra workspaces.
// If the identity executing Cromwell has native access to the blob container, this can be ignored.
final case class WorkspaceManagerConfig(url: WorkspaceManagerURL,
workspaceId: WorkspaceId,
containerResourceId: ContainerResourceId,
overrideWsmAuthToken: Option[String]) // dev-only

final case class BlobFileSystemConfig(endpointURL: EndpointURL,
blobContainerName: BlobContainerName,
subscriptionId: Option[SubscriptionId],
expiryBufferMinutes: Long,
workspaceManagerConfig: Option[WorkspaceManagerConfig])

object BlobFileSystemConfig {

final val defaultExpiryBufferMinutes = 10L

def apply(config: Config): BlobFileSystemConfig = {
val endpointURL = parseString(config, "endpoint").map(EndpointURL)
val blobContainer = parseString(config, "container").map(BlobContainerName)
val subscriptionId = parseUUIDOpt(config, "subscription").map(_.map(SubscriptionId))
val expiryBufferMinutes =
parseLongOpt(config, "expiry-buffer-minutes")
.map(_.getOrElse(defaultExpiryBufferMinutes))

val wsmConfig =
if (config.hasPath("workspace-manager")) {
val wsmConf = config.getConfig("workspace-manager")
val wsmURL = parseString(wsmConf, "url").map(WorkspaceManagerURL)
val workspaceId = parseUUID(wsmConf, "workspace-id").map(WorkspaceId)
val containerResourceId = parseUUID(wsmConf, "container-resource-id").map(ContainerResourceId)
val overrideWsmAuthToken = parseStringOpt(wsmConf, "b2cToken")

(wsmURL, workspaceId, containerResourceId, overrideWsmAuthToken)
.mapN(WorkspaceManagerConfig)
.map(Option(_))
}
else None.validNel

(endpointURL, blobContainer, subscriptionId, expiryBufferMinutes, wsmConfig)
.mapN(BlobFileSystemConfig.apply)
.unsafe("Couldn't parse blob filesystem config")
}

private def parseString(config: Config, path: String) =
validate[String] { config.as[String](path) }

private def parseStringOpt(config: Config, path: String) =
validate[Option[String]] { config.as[Option[String]](path) }

private def parseUUID(config: Config, path: String) =
validate[UUID] { UUID.fromString(config.as[String](path)) }

private def parseUUIDOpt(config: Config, path: String) =
validate[Option[UUID]] { config.as[Option[String]](path).map(UUID.fromString) }

private def parseLongOpt(config: Config, path: String) =
validate[Option[Long]] { config.as[Option[Long]](path) }
}

// Our filesystem setup magic can't use BlobFileSystemConfig.apply directly, so we need this
// wrapper class.
class BlobFileSystemConfigWrapper(val config: BlobFileSystemConfig) {
def this(config: Config) = this(BlobFileSystemConfig(config))
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,32 @@ import com.azure.core.management.AzureEnvironment
import com.azure.core.management.profile.AzureProfile
import com.azure.identity.DefaultAzureCredentialBuilder
import com.azure.resourcemanager.AzureResourceManager
import com.azure.resourcemanager.storage.models.StorageAccountKey
import com.azure.storage.blob.nio.AzureFileSystem
import com.azure.storage.blob.sas.{BlobContainerSasPermission, BlobServiceSasSignatureValues}
import com.azure.storage.blob.{BlobContainerClient, BlobContainerClientBuilder}
import com.azure.storage.common.StorageSharedKeyCredential
import com.typesafe.scalalogging.LazyLogging
import common.validation.Validation._

import java.net.URI
import java.nio.file.{FileSystem, FileSystemNotFoundException, FileSystems}
import java.time.temporal.ChronoUnit
import java.time.{Duration, Instant, OffsetDateTime}
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Success, Try}
import com.azure.resourcemanager.storage.models.StorageAccountKey
import com.typesafe.scalalogging.LazyLogging

import java.util.UUID

case class FileSystemAPI() {
def getFileSystem(uri: URI): Try[FileSystem] = Try(FileSystems.getFileSystem(uri))
def newFileSystem(uri: URI, config: Map[String, Object]): FileSystem = FileSystems.newFileSystem(uri, config.asJava)
def closeFileSystem(uri: URI): Option[Unit] = getFileSystem(uri).toOption.map(_.close)
}

/**
* The BlobFileSystemManager is an object that is responsible for managing the open filesystem,
* and refreshing the SAS token that is used to access the blob container containing that filesystem.
*
* See BlobSasTokenGenerator for more information on how a SAS token is generated
*/
object BlobFileSystemManager {
def parseTokenExpiry(token: AzureSasCredential): Option[Instant] = for {
expiryString <- token.getSignature.split("&").find(_.startsWith("se")).map(_.replaceFirst("se=","")).map(_.replace("%3A", ":"))
Expand All @@ -45,7 +49,7 @@ case class BlobFileSystemManager(
container: BlobContainerName,
endpoint: EndpointURL,
expiryBufferMinutes: Long,
blobTokenGenerator: BlobTokenGenerator,
blobTokenGenerator: BlobSasTokenGenerator,
fileSystemAPI: FileSystemAPI = FileSystemAPI(),
private val initialExpiration: Option[Instant] = None) extends LazyLogging {
private var expiry: Option[Instant] = initialExpiration
Expand All @@ -62,13 +66,13 @@ case class BlobFileSystemManager(
// If no filesystem already exists, this will create a new connection, with the provided configs
case _: FileSystemNotFoundException =>
logger.info(s"Creating new blob filesystem for URI $uri")
blobTokenGenerator.generateAccessToken.flatMap(generateFilesystem(uri, container, _))
blobTokenGenerator.generateBlobSasToken.flatMap(generateFilesystem(uri, container, _))
}
// If the token has expired, OR there is no token record, try to close the FS and regenerate
case true =>
logger.info(s"Closing & regenerating token for existing blob filesystem at URI $uri")
fileSystemAPI.closeFileSystem(uri)
blobTokenGenerator.generateAccessToken.flatMap(generateFilesystem(uri, container, _))
blobTokenGenerator.generateBlobSasToken.flatMap(generateFilesystem(uri, container, _))
}
}
}
Expand All @@ -81,44 +85,96 @@ case class BlobFileSystemManager(

}

sealed trait BlobTokenGenerator {def generateAccessToken: Try[AzureSasCredential]}
object BlobTokenGenerator {
def createBlobTokenGenerator(container: BlobContainerName, endpoint: EndpointURL, subscription: Option[SubscriptionId]): BlobTokenGenerator = {
NativeBlobTokenGenerator(container, endpoint, subscription)
sealed trait BlobSasTokenGenerator { def generateBlobSasToken: Try[AzureSasCredential] }
object BlobSasTokenGenerator {
/**
* Native SAS token generator, uses the DefaultAzureCredentialBuilder in the local environment
* to produce a SAS token.
*
* @param container The BlobContainerName of the blob container to be accessed by the generated SAS token
* @param endpoint The EndpointURL containing the storage account of the blob container to be accessed by
* this SAS token
* @param subscription Optional subscription parameter to use for local authorization.
* If one is not provided the default subscription is used
* @return A NativeBlobTokenGenerator, able to produce a valid SAS token for accessing the provided blob
* container and endpoint locally
*/
def createBlobTokenGenerator(container: BlobContainerName,
endpoint: EndpointURL,
subscription: Option[SubscriptionId]): BlobSasTokenGenerator = {
NativeBlobSasTokenGenerator(container, endpoint, subscription)
}
def createBlobTokenGenerator(container: BlobContainerName, endpoint: EndpointURL, workspaceId: WorkspaceId, workspaceManagerClient: WorkspaceManagerApiClientProvider): BlobTokenGenerator = {
WSMBlobTokenGenerator(container, endpoint, workspaceId, workspaceManagerClient)

/**
* WSM-mediated SAS token generator, uses the DefaultAzureCredentialBuilder in the cloud environment
* to request a SAS token from the WSM to access the given blob container. If an overrideWsmAuthToken
* is provided this is used instead.
*
* @param container The BlobContainerName of the blob container to be accessed by the generated SAS token
* @param endpoint The EndpointURL containing the storage account of the blob container to be accessed by
* this SAS token
* @param workspaceId The WorkspaceId of the account to authenticate against
* @param containerResourceId The ContainterResourceId of the blob container as WSM knows it
* @param workspaceManagerClient The client for making requests against WSM
* @param overrideWsmAuthToken An optional WsmAuthToken used for authenticating against the WSM for a valid
* SAS token to access the given container and endpoint. This is a dev only option that is only intended
* for local testing of the WSM interface
* @return A WSMBlobTokenGenerator, able to produce a valid SAS token for accessing the provided blob
* container and endpoint that is managed by WSM
*/
def createBlobTokenGenerator(container: BlobContainerName,
endpoint: EndpointURL,
workspaceId: WorkspaceId,
containerResourceId: ContainerResourceId,
workspaceManagerClient: WorkspaceManagerApiClientProvider,
overrideWsmAuthToken: Option[String]): BlobSasTokenGenerator = {
WSMBlobSasTokenGenerator(container, endpoint, workspaceId, containerResourceId, workspaceManagerClient, overrideWsmAuthToken)
}

}

case class WSMBlobTokenGenerator(
container: BlobContainerName,
endpoint: EndpointURL,
workspaceId: WorkspaceId,
wsmClient: WorkspaceManagerApiClientProvider) extends BlobTokenGenerator {

def generateAccessToken: Try[AzureSasCredential] = Try {
val token = wsmClient.getControlledAzureResourceApi.createAzureStorageContainerSasToken(
UUID.fromString(workspaceId.value),
UUID.fromString("00001111-2222-3333-aaaa-bbbbccccdddd"),
null,
null,
null,
null
).getToken // TODO `null` items may be required, investigate in WX-696

new AzureSasCredential(token) // TODO Does `signature` actually mean token? save for WX-696
case class WSMBlobSasTokenGenerator(container: BlobContainerName,
endpoint: EndpointURL,
workspaceId: WorkspaceId,
containerResourceId: ContainerResourceId,
wsmClientProvider: WorkspaceManagerApiClientProvider,
overrideWsmAuthToken: Option[String]) extends BlobSasTokenGenerator {

/**
* Generate a BlobSasToken by using the available authorization information
* If an overrideWsmAuthToken is provided, use this in the wsmClient request
* Else try to use the environment azure identity to request the SAS token
*
* @return an AzureSasCredential for accessing a blob container
*/
def generateBlobSasToken: Try[AzureSasCredential] = {
val wsmAuthToken: Try[String] = overrideWsmAuthToken match {
case Some(t) => Success(t)
case None => AzureCredentials.getAccessToken(None).toTry
}

for {
wsmAuth <- wsmAuthToken
wsmClient = wsmClientProvider.getControlledAzureResourceApi(wsmAuth)
sasToken <- Try( // Java library throws
wsmClient.createAzureStorageContainerSasToken(
workspaceId.value,
containerResourceId.value,
null,
null,
null,
null
).getToken)
} yield new AzureSasCredential(sasToken)
}
}

case class NativeBlobTokenGenerator(container: BlobContainerName, endpoint: EndpointURL, subscription: Option[SubscriptionId] = None) extends BlobTokenGenerator {

case class NativeBlobSasTokenGenerator(container: BlobContainerName, endpoint: EndpointURL, subscription: Option[SubscriptionId] = None) extends BlobSasTokenGenerator {
private val azureProfile = new AzureProfile(AzureEnvironment.AZURE)
private def azureCredentialBuilder = new DefaultAzureCredentialBuilder()
.authorityHost(azureProfile.getEnvironment.getActiveDirectoryEndpoint)
.build
private def authenticateWithSubscription(sub: SubscriptionId) = AzureResourceManager.authenticate(azureCredentialBuilder, azureProfile).withSubscription(sub.value)
private def authenticateWithSubscription(sub: SubscriptionId) = AzureResourceManager.authenticate(azureCredentialBuilder, azureProfile).withSubscription(sub.toString)
private def authenticateWithDefaultSubscription = AzureResourceManager.authenticate(azureCredentialBuilder, azureProfile).withDefaultSubscription()
private def azure = subscription.map(authenticateWithSubscription(_)).getOrElse(authenticateWithDefaultSubscription)

Expand All @@ -137,8 +193,13 @@ case class NativeBlobTokenGenerator(container: BlobContainerName, endpoint: Endp
.setListPermission(true)
.setWritePermission(true)


def generateAccessToken: Try[AzureSasCredential] = for {
/**
* Generate a BlobSasToken by using the local environment azure identity
* This will use a default subscription if one is not provided.
*
* @return an AzureSasCredential for accessing a blob container
*/
def generateBlobSasToken: Try[AzureSasCredential] = for {
uri <- BlobPathBuilder.parseURI(endpoint.value)
configuredAccount <- BlobPathBuilder.parseStorageAccount(uri)
azureAccount <- findAzureStorageAccount(configuredAccount)
Expand Down
Loading

0 comments on commit 46a7918

Please sign in to comment.