[query] Use RouterFS With LocalBackend

A long-standing FIXME in the LocalBackend was to stop relying on HadoopFS,
which we use with the SparkBackend for compatibility with Dataproc and
HDFS URLs.

By default, the HadoopFS doesn't understand gs URLs. Users need to
install the gcs-hadoop-connector (preinstalled in Dataproc) to
communicate with Google Cloud Storage. Spark handles supplying
credentials to the connector.

Issue hail-is#13904 is caused by failing to properly supply the
gcs-hadoop-connector with credentials in the LocalBackend. In the
absence of that configuration, the connector hangs while trying to
fetch a token from a non-existent metadata server.

The LocalBackend was designed to be a testing ground for lowered and
compiled code that would eventually run on Batch, where we use the
RouterFS. I propose a pragmatic fix for hail-is#13904 that ditches the
HadoopFS for all but local filesystem access in the LocalBackend,
rather than identifying and fixing the root cause.

In doing so, I made a couple of changes to how the RouterFS is
configured: in the absence of the `HAIL_CLOUD` environment variable,
RouterFS can handle gs and az URLs iff credentials are not supplied.
If the user supplies credentials, we use `HAIL_CLOUD` to decide which
cloud to route to.
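
To make the rule concrete, here is a simplified, hypothetical sketch of
the decision (the `CloudRoute`/`chooseRoutes` names are illustrative,
not the real FS classes); the actual implementation is `FS.buildRoutes`
in the diff below:

    // Illustrative model of the routing rule; not part of the Hail codebase.
    object RoutingRuleSketch {
      sealed trait CloudRoute
      case object GoogleStorage extends CloudRoute // serves gs:// URLs
      case object AzureStorage extends CloudRoute  // serves Azure URLs

      def chooseRoutes(hailCloud: Option[String], credentialsSupplied: Boolean): Seq[CloudRoute] =
        hailCloud match {
          case Some("gcp")   => Seq(GoogleStorage)
          case Some("azure") => Seq(AzureStorage)
          case Some(other)   =>
            throw new IllegalArgumentException(s"Unknown cloud provider: '$other'.")
          case None if !credentialsSupplied =>
            // No credentials supplied: serve both clouds with default auth.
            Seq(GoogleStorage, AzureStorage)
          case None =>
            // Credentials without HAIL_CLOUD: we cannot tell which cloud they are for.
            throw new IllegalArgumentException("HAIL_CLOUD must be set when credentials are supplied.")
        }

      def main(args: Array[String]): Unit = {
        assert(chooseRoutes(None, credentialsSupplied = false) == Seq(GoogleStorage, AzureStorage))
        assert(chooseRoutes(Some("gcp"), credentialsSupplied = true) == Seq(GoogleStorage))
        // Local file paths always fall through to a HadoopFS appended after these routes.
      }
    }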

fixes hail-is#13904
ehigham committed Mar 14, 2024
1 parent 01a6a6a commit 432fedb
Showing 7 changed files with 62 additions and 87 deletions.
7 changes: 4 additions & 3 deletions hail/python/hail/backend/local_backend.py
@@ -67,8 +67,6 @@ def __init__(

jbackend = hail_package.backend.local.LocalBackend.apply(
tmpdir,
gcs_requester_pays_project,
gcs_requester_pays_buckets,
log,
True,
append,
@@ -81,7 +79,10 @@ def __init__(
self._fs = self._exit_stack.enter_context(RouterFS())
self._logger = None

self._initialize_flags({})
self._initialize_flags({
'gcs_requester_pays_project': gcs_requester_pays_project,
'gcs_requester_pays_buckets': gcs_requester_pays_buckets
})

def validate_file(self, uri: str) -> None:
validate_file(uri, self._fs.afs)
3 changes: 3 additions & 0 deletions hail/src/main/scala/is/hail/HailFeatureFlags.scala
@@ -71,6 +71,9 @@ class HailFeatureFlags private (

def get(flag: String): String = flags(flag)

def lookup(flag: String): Option[String] =
Option(flags(flag))

def exists(flag: String): Boolean = flags.contains(flag)

def toJSONEnv: JArray =
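
For context, a REPL-style sketch (not part of the diff) of how the new
`lookup` differs from `get` when a flag may be unset:

    import is.hail.HailFeatureFlags

    val flags = HailFeatureFlags.fromEnv()

    // get returns the raw value, which is null when the flag is unset.
    val projectOrNull: String = flags.get("gcs_requester_pays_project")

    // lookup wraps the value in an Option, so callers can avoid null checks.
    flags.lookup("gcs_requester_pays_project") match {
      case Some(project) => println(s"requester-pays project: $project")
      case None          => println("requester pays is not configured")
    }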
48 changes: 12 additions & 36 deletions hail/src/main/scala/is/hail/backend/local/LocalBackend.scala
@@ -40,8 +40,6 @@ object LocalBackend {

def apply(
tmpdir: String,
gcsRequesterPaysProject: String,
gcsRequesterPaysBuckets: String,
logFile: String = "hail.log",
quiet: Boolean = false,
append: Boolean = false,
@@ -51,11 +49,8 @@

if (!skipLoggingConfiguration)
HailContext.configureLogging(logFile, quiet, append)
theLocalBackend = new LocalBackend(
tmpdir,
gcsRequesterPaysProject,
gcsRequesterPaysBuckets,
)

theLocalBackend = new LocalBackend(tmpdir)
theLocalBackend.addDefaultReferences()
theLocalBackend
}
@@ -72,32 +67,7 @@
}
}

class LocalBackend(
val tmpdir: String,
gcsRequesterPaysProject: String,
gcsRequesterPaysBuckets: String,
) extends Backend with BackendWithCodeCache {
// FIXME don't rely on hadoop
val hadoopConf = new hadoop.conf.Configuration()

if (gcsRequesterPaysProject != null) {
if (gcsRequesterPaysBuckets == null) {
hadoopConf.set("fs.gs.requester.pays.mode", "AUTO")
hadoopConf.set("fs.gs.requester.pays.project.id", gcsRequesterPaysProject)
} else {
hadoopConf.set("fs.gs.requester.pays.mode", "CUSTOM")
hadoopConf.set("fs.gs.requester.pays.project.id", gcsRequesterPaysProject)
hadoopConf.set("fs.gs.requester.pays.buckets", gcsRequesterPaysBuckets)
}
}

hadoopConf.set(
"hadoop.io.compression.codecs",
"org.apache.hadoop.io.compress.DefaultCodec,"
+ "is.hail.io.compress.BGzipCodec,"
+ "is.hail.io.compress.BGzipCodecTbi,"
+ "org.apache.hadoop.io.compress.GzipCodec",
)
class LocalBackend(val tmpdir: String) extends Backend with BackendWithCodeCache {

private[this] val flags = HailFeatureFlags.fromEnv()
private[this] val theHailClassLoader = new HailClassLoader(getClass().getClassLoader())
@@ -106,11 +76,15 @@ class LocalBackend(

def setFlag(name: String, value: String) = flags.set(name, value)

val availableFlags: java.util.ArrayList[String] = flags.available
// called from python
val availableFlags: java.util.ArrayList[String] =
flags.available

val fs: FS = new HadoopFS(new SerializableHadoopConfiguration(hadoopConf))
// flags can be set after construction from python
def fs: FS = FS.buildRoutes(None, Some(flags))

def withExecuteContext[T](timer: ExecutionTimer): (ExecuteContext => T) => T =
def withExecuteContext[T](timer: ExecutionTimer): (ExecuteContext => T) => T = {
val fs = this.fs
ExecuteContext.scoped(
tmpdir,
tmpdir,
@@ -125,9 +99,11 @@
ExecutionCache.fromFlags(flags, fs, tmpdir)
},
)
}

override def withExecuteContext[T](methodName: String)(f: ExecuteContext => T): T =
ExecutionTimer.logTime(methodName) { timer =>
val fs = this.fs
ExecuteContext.scoped(
tmpdir,
tmpdir,
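
One consequence of `fs` becoming a `def`: flag changes made after
construction (e.g. from Python via `_initialize_flags`) are reflected
the next time an FS is built. A REPL-style sketch, assuming the
remaining constructor parameters keep their defaults and using a
hypothetical project id:

    import is.hail.backend.local.LocalBackend

    val backend = LocalBackend("/tmp/hail")

    val before = backend.fs // RouterFS built without requester-pays configuration
    backend.setFlag("gcs_requester_pays_project", "my-billing-project") // hypothetical
    val after = backend.fs  // a fresh RouterFS that now carries the requester-pays flags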
hail/src/main/scala/is/hail/backend/service/ServiceBackend.scala
@@ -64,7 +64,7 @@ object ServiceBackend {

val flags = HailFeatureFlags.fromMap(rpcConfig.flags)
val shouldProfile = flags.get("profile") != null
val fs = FS.cloudSpecificFS(s"$scratchDir/secrets/gsa-key/key.json", Some(flags))
val fs = FS.buildRoutes(Some(s"$scratchDir/secrets/gsa-key/key.json"), Some(flags))

val backendContext = new ServiceBackendContext(
rpcConfig.billing_project,
@@ -455,12 +455,12 @@ object ServiceBackendAPI {
val inputURL = argv(5)
val outputURL = argv(6)

val fs = FS.cloudSpecificFS(s"$scratchDir/secrets/gsa-key/key.json", None)
val fs = FS.buildRoutes(Some(s"$scratchDir/secrets/gsa-key/key.json"), None)
val deployConfig = DeployConfig.fromConfigFile(
s"$scratchDir/secrets/deploy-config/deploy-config.json"
)
DeployConfig.set(deployConfig)
sys.env.get("HAIL_SSL_CONFIG_DIR").foreach(tls.setSSLConfigFromDir(_))
sys.env.get("HAIL_SSL_CONFIG_DIR").foreach(tls.setSSLConfigFromDir)

val batchClient = new BatchClient(s"$scratchDir/secrets/gsa-key/key.json")
log.info("BatchClient allocated.")
4 changes: 2 additions & 2 deletions hail/src/main/scala/is/hail/backend/service/Worker.scala
@@ -117,15 +117,15 @@ object Worker {
s"$scratchDir/secrets/deploy-config/deploy-config.json"
)
DeployConfig.set(deployConfig)
sys.env.get("HAIL_SSL_CONFIG_DIR").foreach(tls.setSSLConfigFromDir(_))
sys.env.get("HAIL_SSL_CONFIG_DIR").foreach(tls.setSSLConfigFromDir)

log.info(s"is.hail.backend.service.Worker $myRevision")
log.info(s"running job $i/$n at root $root with scratch directory '$scratchDir'")

timer.start(s"Job $i/$n")

timer.start("readInputs")
val fs = FS.cloudSpecificFS(s"$scratchDir/secrets/gsa-key/key.json", None)
val fs = FS.buildRoutes(Some(s"$scratchDir/secrets/gsa-key/key.json"), None)

def open(x: String): SeekableDataInputStream =
fs.openNoCompression(x)
58 changes: 30 additions & 28 deletions hail/src/main/scala/is/hail/io/fs/FS.scala
@@ -257,38 +257,40 @@ abstract class FSPositionedOutputStream(val capacity: Int) extends OutputStream
}

object FS {
def cloudSpecificFS(
credentialsPath: String,
flags: Option[HailFeatureFlags],
): FS = retryTransientErrors {
val cloudSpecificFS = using(new FileInputStream(credentialsPath)) { is =>
val credentialsStr = Some(IOUtils.toString(is, Charset.defaultCharset()))
sys.env.get("HAIL_CLOUD") match {
case Some("gcp") =>
val requesterPaysConfiguration = flags.flatMap { flags =>
RequesterPaysConfiguration.fromFlags(
flags.get("gcs_requester_pays_project"),
flags.get("gcs_requester_pays_buckets"),
)
}
new GoogleStorageFS(credentialsStr, requesterPaysConfiguration)
case Some("azure") =>
sys.env.get("HAIL_TERRA") match {
case Some(_) => new TerraAzureStorageFS()
case None => new AzureStorageFS(credentialsStr)
}
def buildRoutes(credentialsPath: Option[String], flags: Option[HailFeatureFlags]): FS =
retryTransientErrors {
val credentialsStr = credentialsPath.map { path =>
using(new FileInputStream(path))(is => IOUtils.toString(is, Charset.defaultCharset()))
}

def gcs = new GoogleStorageFS(
credentialsStr,
flags.flatMap(RequesterPaysConfiguration.fromFlags),
)

def az = sys.env.get("HAIL_TERRA") match {
case Some(_) => new TerraAzureStorageFS()
case None => new AzureStorageFS(credentialsStr)
}

val cloudSpecificFSs = sys.env.get("HAIL_CLOUD") match {
case Some("gcp") => FastSeq(gcs)
case Some("azure") => FastSeq(az)
case Some(cloud) =>
throw new IllegalArgumentException(s"Bad cloud: $cloud")
throw new IllegalArgumentException(s"Unknown cloud provider: '$cloud'.'")
case None =>
throw new IllegalArgumentException(s"HAIL_CLOUD must be set.")
if (credentialsPath.isEmpty) FastSeq(gcs, az)
else fatal(
"Don't know to which cloud credentials belong because 'HAIL_CLOUD' was not set."
)
}
}

new RouterFS(Array(
cloudSpecificFS,
new HadoopFS(new SerializableHadoopConfiguration(new hadoop.conf.Configuration())),
))
}
new RouterFS(
cloudSpecificFSs :+ new HadoopFS(
new SerializableHadoopConfiguration(new hadoop.conf.Configuration())
)
)
}

private val log = Logger.getLogger(getClass.getName())
}
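
As a usage sketch (REPL-style; the bucket name is hypothetical): with
no credentials file and `HAIL_CLOUD` unset, the router serves both
clouds with default credentials and falls back to HadoopFS for local
paths, which is what the LocalBackend needs.

    import is.hail.io.fs.FS

    // HAIL_CLOUD unset, no credentials file:
    // routes = [GoogleStorageFS, AzureStorageFS, HadoopFS]
    val fs: FS = FS.buildRoutes(credentialsPath = None, flags = None)

    fs.exists("/tmp/example.txt")           // local path, handled by the HadoopFS fallback
    fs.exists("gs://my-bucket/example.txt") // gs URL, handled by GoogleStorageFS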
23 changes: 8 additions & 15 deletions hail/src/main/scala/is/hail/io/fs/GoogleStorageFS.scala
@@ -1,5 +1,6 @@
package is.hail.io.fs

import is.hail.HailFeatureFlags
import is.hail.io.fs.FSUtil.dropTrailingSlash
import is.hail.services.{isTransientError, retryTransientErrors}
import is.hail.utils._
@@ -82,25 +83,17 @@ object GoogleStorageFileListEntry {
}

object RequesterPaysConfiguration {
def fromFlags(requesterPaysProject: String, requesterPaysBuckets: String)
: Option[RequesterPaysConfiguration] = {
if (requesterPaysProject == null) {
if (requesterPaysBuckets == null) {
None
} else {
def fromFlags(flags: HailFeatureFlags): Option[RequesterPaysConfiguration] =
FastSeq("gcs_requester_pays_project", "gcs_requester_pays_buckets").map(flags.lookup) match {
case Seq(Some(project), buckets) =>
Some(RequesterPaysConfiguration(project, buckets.map(_.split(",").toSet)))
case Seq(None, Some(buckets)) =>
fatal(
s"Expected gcs_requester_pays_buckets flag to be unset when gcs_requester_pays_project is unset, but instead found: $requesterPaysBuckets"
s"Expected gcs_requester_pays_buckets flag to be unset when gcs_requester_pays_project is unset, but instead found: $buckets"
)
}
} else {
val buckets = if (requesterPaysBuckets == null) {
case _ =>
None
} else {
Some(requesterPaysBuckets.split(",").toSet)
}
Some(RequesterPaysConfiguration(requesterPaysProject, buckets))
}
}
}

case class RequesterPaysConfiguration(
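
For reference, a REPL-style sketch of how the two flag values map to a
configuration under the new `fromFlags` (project and bucket names are
hypothetical):

    import is.hail.HailFeatureFlags
    import is.hail.io.fs.RequesterPaysConfiguration

    val flags = HailFeatureFlags.fromEnv()
    flags.set("gcs_requester_pays_project", "my-billing-project")
    flags.set("gcs_requester_pays_buckets", "bucket-a,bucket-b")

    // project + buckets set => pay only for requests against the listed buckets:
    //   Some(RequesterPaysConfiguration("my-billing-project", Some(Set("bucket-a", "bucket-b"))))
    // project set, buckets unset => Some(RequesterPaysConfiguration("my-billing-project", None))
    // buckets set, project unset => fatal
    // neither set                => None
    RequesterPaysConfiguration.fromFlags(flags)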
