From 432fedbfec7629bbbaa7d1158a23de5e5eb84733 Mon Sep 17 00:00:00 2001 From: Edmund Higham Date: Tue, 12 Mar 2024 15:34:48 -0400 Subject: [PATCH] [query] Use RouterFS With LocalBackend A long-standing fixme in the LocalBackend was to not rely on HadoopFS, which we use with the SparkBackend for compatibility with dataproc and hdfs urls. By default, the HadoopFS doesn't understand gs urls. Users need to install the gcs-hadoop-connector (preinstalled in dataproc) to communicate with google cloud storage. Spark handles supplying credentials to the connector. Issue #13904 is caused by failing to properly supply the gcs-hadoop-connector with credentials in the LocalBackend. In the absence of config, the connector hangs while trying to fetch a token from a non-existent metadata server. The LocalBackend was designed to be a testing ground for lowered and compiled code that would eventually be run on batch, where we use the RouterFS. I propose a pragmatic fix for #13904 that ditches the HadoopFS for all but local filesystem access in the LocalBackend instead of identifying and fixing the root cause. In doing so, I made a couple of changes to how the RouterFS is configured: In the absence of the `HAIL_CLOUD` environment variable, RouterFS can handle gs and az urls iff credentials are not supplied. If the user supplies credentials, we use `HAIL_CLOUD` to decide which cloud to route to. 
fixes #13904 --- hail/python/hail/backend/local_backend.py | 7 ++- .../main/scala/is/hail/HailFeatureFlags.scala | 3 + .../is/hail/backend/local/LocalBackend.scala | 48 ++++----------- .../hail/backend/service/ServiceBackend.scala | 6 +- .../is/hail/backend/service/Worker.scala | 4 +- hail/src/main/scala/is/hail/io/fs/FS.scala | 58 ++++++++++--------- .../scala/is/hail/io/fs/GoogleStorageFS.scala | 23 +++----- 7 files changed, 62 insertions(+), 87 deletions(-) diff --git a/hail/python/hail/backend/local_backend.py b/hail/python/hail/backend/local_backend.py index a85e382c31fe..a63c98f78093 100644 --- a/hail/python/hail/backend/local_backend.py +++ b/hail/python/hail/backend/local_backend.py @@ -67,8 +67,6 @@ def __init__( jbackend = hail_package.backend.local.LocalBackend.apply( tmpdir, - gcs_requester_pays_project, - gcs_requester_pays_buckets, log, True, append, @@ -81,7 +79,10 @@ def __init__( self._fs = self._exit_stack.enter_context(RouterFS()) self._logger = None - self._initialize_flags({}) + self._initialize_flags({ + 'gcs_requester_pays_project': gcs_requester_pays_project, + 'gcs_requester_pays_buckets': gcs_requester_pays_buckets + }) def validate_file(self, uri: str) -> None: validate_file(uri, self._fs.afs) diff --git a/hail/src/main/scala/is/hail/HailFeatureFlags.scala b/hail/src/main/scala/is/hail/HailFeatureFlags.scala index 3dcf48297978..bd8bda42d252 100644 --- a/hail/src/main/scala/is/hail/HailFeatureFlags.scala +++ b/hail/src/main/scala/is/hail/HailFeatureFlags.scala @@ -71,6 +71,9 @@ class HailFeatureFlags private ( def get(flag: String): String = flags(flag) + def lookup(flag: String): Option[String] = + Option(flags(flag)) + def exists(flag: String): Boolean = flags.contains(flag) def toJSONEnv: JArray = diff --git a/hail/src/main/scala/is/hail/backend/local/LocalBackend.scala b/hail/src/main/scala/is/hail/backend/local/LocalBackend.scala index cdb3105b0124..9564f7b817c2 100644 --- a/hail/src/main/scala/is/hail/backend/local/LocalBackend.scala 
+++ b/hail/src/main/scala/is/hail/backend/local/LocalBackend.scala @@ -40,8 +40,6 @@ object LocalBackend { def apply( tmpdir: String, - gcsRequesterPaysProject: String, - gcsRequesterPaysBuckets: String, logFile: String = "hail.log", quiet: Boolean = false, append: Boolean = false, @@ -51,11 +49,8 @@ object LocalBackend { if (!skipLoggingConfiguration) HailContext.configureLogging(logFile, quiet, append) - theLocalBackend = new LocalBackend( - tmpdir, - gcsRequesterPaysProject, - gcsRequesterPaysBuckets, - ) + + theLocalBackend = new LocalBackend(tmpdir) theLocalBackend.addDefaultReferences() theLocalBackend } @@ -72,32 +67,7 @@ object LocalBackend { } } -class LocalBackend( - val tmpdir: String, - gcsRequesterPaysProject: String, - gcsRequesterPaysBuckets: String, -) extends Backend with BackendWithCodeCache { - // FIXME don't rely on hadoop - val hadoopConf = new hadoop.conf.Configuration() - - if (gcsRequesterPaysProject != null) { - if (gcsRequesterPaysBuckets == null) { - hadoopConf.set("fs.gs.requester.pays.mode", "AUTO") - hadoopConf.set("fs.gs.requester.pays.project.id", gcsRequesterPaysProject) - } else { - hadoopConf.set("fs.gs.requester.pays.mode", "CUSTOM") - hadoopConf.set("fs.gs.requester.pays.project.id", gcsRequesterPaysProject) - hadoopConf.set("fs.gs.requester.pays.buckets", gcsRequesterPaysBuckets) - } - } - - hadoopConf.set( - "hadoop.io.compression.codecs", - "org.apache.hadoop.io.compress.DefaultCodec," - + "is.hail.io.compress.BGzipCodec," - + "is.hail.io.compress.BGzipCodecTbi," - + "org.apache.hadoop.io.compress.GzipCodec", - ) +class LocalBackend(val tmpdir: String) extends Backend with BackendWithCodeCache { private[this] val flags = HailFeatureFlags.fromEnv() private[this] val theHailClassLoader = new HailClassLoader(getClass().getClassLoader()) @@ -106,11 +76,15 @@ class LocalBackend( def setFlag(name: String, value: String) = flags.set(name, value) - val availableFlags: java.util.ArrayList[String] = flags.available + // called from 
python + val availableFlags: java.util.ArrayList[String] = + flags.available - val fs: FS = new HadoopFS(new SerializableHadoopConfiguration(hadoopConf)) + // flags can be set after construction from python + def fs: FS = FS.buildRoutes(None, Some(flags)) - def withExecuteContext[T](timer: ExecutionTimer): (ExecuteContext => T) => T = + def withExecuteContext[T](timer: ExecutionTimer): (ExecuteContext => T) => T = { + val fs = this.fs ExecuteContext.scoped( tmpdir, tmpdir, @@ -125,9 +99,11 @@ class LocalBackend( ExecutionCache.fromFlags(flags, fs, tmpdir) }, ) + } override def withExecuteContext[T](methodName: String)(f: ExecuteContext => T): T = ExecutionTimer.logTime(methodName) { timer => + val fs = this.fs ExecuteContext.scoped( tmpdir, tmpdir, diff --git a/hail/src/main/scala/is/hail/backend/service/ServiceBackend.scala b/hail/src/main/scala/is/hail/backend/service/ServiceBackend.scala index f2fc277e3a21..ccf457a0affd 100644 --- a/hail/src/main/scala/is/hail/backend/service/ServiceBackend.scala +++ b/hail/src/main/scala/is/hail/backend/service/ServiceBackend.scala @@ -64,7 +64,7 @@ object ServiceBackend { val flags = HailFeatureFlags.fromMap(rpcConfig.flags) val shouldProfile = flags.get("profile") != null - val fs = FS.cloudSpecificFS(s"$scratchDir/secrets/gsa-key/key.json", Some(flags)) + val fs = FS.buildRoutes(Some(s"$scratchDir/secrets/gsa-key/key.json"), Some(flags)) val backendContext = new ServiceBackendContext( rpcConfig.billing_project, @@ -455,12 +455,12 @@ object ServiceBackendAPI { val inputURL = argv(5) val outputURL = argv(6) - val fs = FS.cloudSpecificFS(s"$scratchDir/secrets/gsa-key/key.json", None) + val fs = FS.buildRoutes(Some(s"$scratchDir/secrets/gsa-key/key.json"), None) val deployConfig = DeployConfig.fromConfigFile( s"$scratchDir/secrets/deploy-config/deploy-config.json" ) DeployConfig.set(deployConfig) - sys.env.get("HAIL_SSL_CONFIG_DIR").foreach(tls.setSSLConfigFromDir(_)) + 
sys.env.get("HAIL_SSL_CONFIG_DIR").foreach(tls.setSSLConfigFromDir) val batchClient = new BatchClient(s"$scratchDir/secrets/gsa-key/key.json") log.info("BatchClient allocated.") diff --git a/hail/src/main/scala/is/hail/backend/service/Worker.scala b/hail/src/main/scala/is/hail/backend/service/Worker.scala index ad0b24989540..12438e6fe50c 100644 --- a/hail/src/main/scala/is/hail/backend/service/Worker.scala +++ b/hail/src/main/scala/is/hail/backend/service/Worker.scala @@ -117,7 +117,7 @@ object Worker { s"$scratchDir/secrets/deploy-config/deploy-config.json" ) DeployConfig.set(deployConfig) - sys.env.get("HAIL_SSL_CONFIG_DIR").foreach(tls.setSSLConfigFromDir(_)) + sys.env.get("HAIL_SSL_CONFIG_DIR").foreach(tls.setSSLConfigFromDir) log.info(s"is.hail.backend.service.Worker $myRevision") log.info(s"running job $i/$n at root $root with scratch directory '$scratchDir'") @@ -125,7 +125,7 @@ object Worker { timer.start(s"Job $i/$n") timer.start("readInputs") - val fs = FS.cloudSpecificFS(s"$scratchDir/secrets/gsa-key/key.json", None) + val fs = FS.buildRoutes(Some(s"$scratchDir/secrets/gsa-key/key.json"), None) def open(x: String): SeekableDataInputStream = fs.openNoCompression(x) diff --git a/hail/src/main/scala/is/hail/io/fs/FS.scala b/hail/src/main/scala/is/hail/io/fs/FS.scala index aa9b00cccf4a..1db2478c605c 100644 --- a/hail/src/main/scala/is/hail/io/fs/FS.scala +++ b/hail/src/main/scala/is/hail/io/fs/FS.scala @@ -257,38 +257,40 @@ abstract class FSPositionedOutputStream(val capacity: Int) extends OutputStream } object FS { - def cloudSpecificFS( - credentialsPath: String, - flags: Option[HailFeatureFlags], - ): FS = retryTransientErrors { - val cloudSpecificFS = using(new FileInputStream(credentialsPath)) { is => - val credentialsStr = Some(IOUtils.toString(is, Charset.defaultCharset())) - sys.env.get("HAIL_CLOUD") match { - case Some("gcp") => - val requesterPaysConfiguration = flags.flatMap { flags => - RequesterPaysConfiguration.fromFlags( - 
flags.get("gcs_requester_pays_project"), - flags.get("gcs_requester_pays_buckets"), - ) - } - new GoogleStorageFS(credentialsStr, requesterPaysConfiguration) - case Some("azure") => - sys.env.get("HAIL_TERRA") match { - case Some(_) => new TerraAzureStorageFS() - case None => new AzureStorageFS(credentialsStr) - } + def buildRoutes(credentialsPath: Option[String], flags: Option[HailFeatureFlags]): FS = + retryTransientErrors { + val credentialsStr = credentialsPath.map { path => + using(new FileInputStream(path))(is => IOUtils.toString(is, Charset.defaultCharset())) + } + + def gcs = new GoogleStorageFS( + credentialsStr, + flags.flatMap(RequesterPaysConfiguration.fromFlags), + ) + + def az = sys.env.get("HAIL_TERRA") match { + case Some(_) => new TerraAzureStorageFS() + case None => new AzureStorageFS(credentialsStr) + } + + val cloudSpecificFSs = sys.env.get("HAIL_CLOUD") match { + case Some("gcp") => FastSeq(gcs) + case Some("azure") => FastSeq(az) case Some(cloud) => - throw new IllegalArgumentException(s"Bad cloud: $cloud") + throw new IllegalArgumentException(s"Unknown cloud provider: '$cloud'.'") case None => - throw new IllegalArgumentException(s"HAIL_CLOUD must be set.") + if (credentialsPath.isEmpty) FastSeq(gcs, az) + else fatal( + "Don't know to which cloud credentials belong because 'HAIL_CLOUD' was not set." 
+ ) } - } - new RouterFS(Array( - cloudSpecificFS, - new HadoopFS(new SerializableHadoopConfiguration(new hadoop.conf.Configuration())), - )) - } + new RouterFS( + cloudSpecificFSs :+ new HadoopFS( + new SerializableHadoopConfiguration(new hadoop.conf.Configuration()) + ) + ) + } private val log = Logger.getLogger(getClass.getName()) } diff --git a/hail/src/main/scala/is/hail/io/fs/GoogleStorageFS.scala b/hail/src/main/scala/is/hail/io/fs/GoogleStorageFS.scala index a7760f92e181..ca8cb09e4d8e 100644 --- a/hail/src/main/scala/is/hail/io/fs/GoogleStorageFS.scala +++ b/hail/src/main/scala/is/hail/io/fs/GoogleStorageFS.scala @@ -1,5 +1,6 @@ package is.hail.io.fs +import is.hail.HailFeatureFlags import is.hail.io.fs.FSUtil.dropTrailingSlash import is.hail.services.{isTransientError, retryTransientErrors} import is.hail.utils._ @@ -82,25 +83,17 @@ object GoogleStorageFileListEntry { } object RequesterPaysConfiguration { - def fromFlags(requesterPaysProject: String, requesterPaysBuckets: String) - : Option[RequesterPaysConfiguration] = { - if (requesterPaysProject == null) { - if (requesterPaysBuckets == null) { - None - } else { + def fromFlags(flags: HailFeatureFlags): Option[RequesterPaysConfiguration] = + FastSeq("gcs_requester_pays_project", "gcs_requester_pays_buckets").map(flags.lookup) match { + case Seq(Some(project), buckets) => + Some(RequesterPaysConfiguration(project, buckets.map(_.split(",").toSet))) + case Seq(None, Some(buckets)) => fatal( - s"Expected gcs_requester_pays_buckets flag to be unset when gcs_requester_pays_project is unset, but instead found: $requesterPaysBuckets" + s"Expected gcs_requester_pays_buckets flag to be unset when gcs_requester_pays_project is unset, but instead found: $buckets" ) - } - } else { - val buckets = if (requesterPaysBuckets == null) { + case _ => None - } else { - Some(requesterPaysBuckets.split(",").toSet) - } - Some(RequesterPaysConfiguration(requesterPaysProject, buckets)) } - } } case class 
RequesterPaysConfiguration(