[query] Use RouterFS With LocalBackend (#14407)
A long-standing FIXME in the LocalBackend was to stop relying on HadoopFS,
which we use with the SparkBackend for compatibility with Dataproc and
hdfs URLs.

By default, HadoopFS doesn't understand gs URLs. Users need to
install the gcs-hadoop-connector (preinstalled on Dataproc) to
communicate with Google Cloud Storage. Spark handles supplying
credentials to the connector.
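
For context, supplying credentials to the connector by hand comes down to
Hadoop configuration properties along these lines (a hedged sketch; the
property names are my assumption from the connector's documentation, and
nothing in this change relies on them):

```scala
import org.apache.hadoop.conf.Configuration

// Rough sketch of pointing the gcs-hadoop-connector at a service-account key
// file via Hadoop configuration. The property names are assumptions drawn
// from the connector's documentation; on Spark/Dataproc this wiring is done
// for us, and this commit removes the need for it in the LocalBackend.
val conf = new Configuration()
conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
conf.set("google.cloud.auth.service.account.enable", "true")
conf.set("google.cloud.auth.service.account.json.keyfile", "/path/to/key.json")
```
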

Issue #13904 is caused by failing to properly supply the
gcs-hadoop-connector with credentials in the LocalBackend. In the
absence of configuration, the connector hangs while trying to fetch a
token from a non-existent metadata server.

The LocalBackend was designed to be a testing ground for lowered and
compiled code that would eventually run on Batch, where we use the
RouterFS. Rather than identifying and fixing the root cause, I propose a
pragmatic fix for #13904 that ditches HadoopFS for all but local
filesystem access in the LocalBackend.

In doing so, I made a couple of changes to how the RouterFS is
configured: in the absence of the `HAIL_CLOUD` environment variable,
RouterFS can handle gs and az URLs only if credentials are not supplied.
If the user supplies credentials, we use `HAIL_CLOUD` to decide which
cloud to route to.
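
A simplified sketch of the resulting routing decision (the real
implementation is `FS.buildRoutes` in the FS.scala diff below; strings stand
in for the filesystem instances that get placed in front of the local
HadoopFS):

```scala
// Simplified sketch of the new routing decision in FS.buildRoutes (see the
// FS.scala diff below). Strings stand in for the real FS instances.
def cloudRoutes(credentialsPath: Option[String], hailCloud: Option[String]): Seq[String] =
  hailCloud match {
    case Some("gcp")   => Seq("GoogleStorageFS")   // gs:// URLs
    case Some("azure") => Seq("AzureStorageFS")    // https://<account>.blob.core.windows.net URLs
    case Some(other)   =>
      throw new IllegalArgumentException(s"Unknown cloud provider: '$other'")
    case None if credentialsPath.isEmpty =>
      Seq("GoogleStorageFS", "AzureStorageFS")     // ambient credentials: route both clouds
    case None =>
      throw new IllegalArgumentException("HAIL_CLOUD must be set when explicit credentials are supplied")
  }
```
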

fixes #13904
ehigham authored Apr 10, 2024
1 parent d81cb1a commit 091e661
Showing 11 changed files with 141 additions and 161 deletions.
1 change: 0 additions & 1 deletion build.yaml
@@ -1197,7 +1197,6 @@ steps:
mkdir -p /io/tmp
# The test should use the test credentials, not CI's credentials
sed -i 's/gsa-key/test-gsa-key/g' ${SPARK_HOME}/conf/core-site.xml
export GOOGLE_APPLICATION_CREDENTIALS=/test-gsa-key/key.json
export AZURE_APPLICATION_CREDENTIALS=/test-gsa-key/key.json
23 changes: 15 additions & 8 deletions hail/python/hail/backend/local_backend.py
@@ -7,6 +7,7 @@

from hail.ir import finalize_randomness
from hail.ir.renderer import CSERenderer
from hailtop.aiocloud.aiogoogle import GCSRequesterPaysConfiguration
from hailtop.aiotools.validators import validate_file
from hailtop.fs.router_fs import RouterFS
from hailtop.utils import find_spark_home
@@ -28,12 +29,9 @@ def __init__(
skip_logging_configuration,
optimizer_iterations,
jvm_heap_size,
*,
gcs_requester_pays_project: Optional[str] = None,
gcs_requester_pays_buckets: Optional[str] = None,
gcs_requester_pays_configuration: Optional[GCSRequesterPaysConfiguration] = None,
):
self._exit_stack = ExitStack()
assert gcs_requester_pays_project is not None or gcs_requester_pays_buckets is None

spark_home = find_spark_home()
hail_jar_path = os.environ.get('HAIL_JAR')
@@ -67,8 +65,6 @@ def __init__(

jbackend = hail_package.backend.local.LocalBackend.apply(
tmpdir,
gcs_requester_pays_project,
gcs_requester_pays_buckets,
log,
True,
append,
@@ -77,11 +73,22 @@
jhc = hail_package.HailContext.apply(jbackend, branching_factor, optimizer_iterations)

super(LocalBackend, self).__init__(self._gateway.jvm, jbackend, jhc)
self._fs = self._exit_stack.enter_context(
RouterFS(gcs_kwargs={'gcs_requester_pays_configuration': gcs_requester_pays_configuration})
)

self._fs = self._exit_stack.enter_context(RouterFS())
self._logger = None

self._initialize_flags({})
flags = {}
if gcs_requester_pays_configuration is not None:
if isinstance(gcs_requester_pays_configuration, str):
flags['gcs_requester_pays_project'] = gcs_requester_pays_configuration
else:
assert isinstance(gcs_requester_pays_configuration, tuple)
flags['gcs_requester_pays_project'] = gcs_requester_pays_configuration[0]
flags['gcs_requester_pays_buckets'] = ','.join(gcs_requester_pays_configuration[1])

self._initialize_flags(flags)

def validate_file(self, uri: str) -> None:
validate_file(uri, self._fs.afs)
11 changes: 1 addition & 10 deletions hail/python/hail/context.py
@@ -615,14 +615,6 @@ def init_local(
optimizer_iterations = get_env_or_default(_optimizer_iterations, 'HAIL_OPTIMIZER_ITERATIONS', 3)

jvm_heap_size = get_env_or_default(jvm_heap_size, 'HAIL_LOCAL_BACKEND_HEAP_SIZE', None)
(
gcs_requester_pays_project,
gcs_requester_pays_buckets,
) = convert_gcs_requester_pays_configuration_to_hadoop_conf_style(
get_gcs_requester_pays_configuration(
gcs_requester_pays_configuration=gcs_requester_pays_configuration,
)
)
backend = LocalBackend(
tmpdir,
log,
@@ -632,8 +624,7 @@
skip_logging_configuration,
optimizer_iterations,
jvm_heap_size,
gcs_requester_pays_project=gcs_requester_pays_project,
gcs_requester_pays_buckets=gcs_requester_pays_buckets,
gcs_requester_pays_configuration,
)

if not backend.fs.exists(tmpdir):
2 changes: 0 additions & 2 deletions hail/python/test/hail/fs/test_worker_driver_fs.py
@@ -34,7 +34,6 @@ def test_requester_pays_write_no_settings():


@skip_in_azure
@fails_local_backend()
def test_requester_pays_write_with_project():
hl_stop_for_test()
hl_init_for_test(gcs_requester_pays_configuration='hail-vdc')
@@ -148,7 +147,6 @@ def test_requester_pays_with_project_more_than_one_partition():


@run_if_azure
@fails_local_backend
def test_can_access_public_blobs():
public_mt = 'https://azureopendatastorage.blob.core.windows.net/gnomad/release/3.1/mt/genomes/gnomad.genomes.v3.1.hgdp_1kg_subset.mt'
assert hl.hadoop_exists(public_mt)
8 changes: 6 additions & 2 deletions hail/src/main/scala/is/hail/HailFeatureFlags.scala
@@ -1,6 +1,7 @@
package is.hail

import is.hail.backend.ExecutionCache
import is.hail.io.fs.RequesterPaysConfig
import is.hail.utils._

import scala.collection.mutable
@@ -30,8 +31,8 @@ object HailFeatureFlags {
("shuffle_cutoff_to_local_sort", ("HAIL_SHUFFLE_CUTOFF" -> "512000000")), // This is in bytes
("grouped_aggregate_buffer_size", ("HAIL_GROUPED_AGGREGATE_BUFFER_SIZE" -> "50")),
("use_ssa_logs", "HAIL_USE_SSA_LOGS" -> "1"),
("gcs_requester_pays_project", "HAIL_GCS_REQUESTER_PAYS_PROJECT" -> null),
("gcs_requester_pays_buckets", "HAIL_GCS_REQUESTER_PAYS_BUCKETS" -> null),
(RequesterPaysConfig.Flags.RequesterPaysProject, "HAIL_GCS_REQUESTER_PAYS_PROJECT" -> null),
(RequesterPaysConfig.Flags.RequesterPaysBuckets, "HAIL_GCS_REQUESTER_PAYS_BUCKETS" -> null),
("index_branching_factor", "HAIL_INDEX_BRANCHING_FACTOR" -> null),
("rng_nonce", "HAIL_RNG_NONCE" -> "0x0"),
("profile", "HAIL_PROFILE" -> null),
@@ -71,6 +72,9 @@ class HailFeatureFlags private (

def get(flag: String): String = flags(flag)

def lookup(flag: String): Option[String] =
Option(flags(flag)).filter(_.nonEmpty)

def exists(flag: String): Boolean = flags.contains(flag)

def toJSONEnv: JArray =
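
The new `lookup` helper treats unset or empty flags as absent. A hypothetical
sketch of how `RequesterPaysConfig.fromFlags` (its implementation is not shown
on this page) might consume the renamed flags:

```scala
import is.hail.HailFeatureFlags

// Hypothetical sketch only: the real RequesterPaysConfig.fromFlags is not
// shown in this diff. It illustrates how HailFeatureFlags.lookup lets the
// requester-pays flags be read as Options rather than nullable strings.
case class RequesterPaysConfig(project: String, buckets: Option[Set[String]])

def fromFlags(flags: HailFeatureFlags): Option[RequesterPaysConfig] =
  flags.lookup("gcs_requester_pays_project").map { project =>
    val buckets = flags.lookup("gcs_requester_pays_buckets").map(_.split(',').toSet)
    RequesterPaysConfig(project, buckets)
  }
```
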
48 changes: 12 additions & 36 deletions hail/src/main/scala/is/hail/backend/local/LocalBackend.scala
@@ -40,8 +40,6 @@ object LocalBackend {

def apply(
tmpdir: String,
gcsRequesterPaysProject: String,
gcsRequesterPaysBuckets: String,
logFile: String = "hail.log",
quiet: Boolean = false,
append: Boolean = false,
@@ -51,11 +49,8 @@

if (!skipLoggingConfiguration)
HailContext.configureLogging(logFile, quiet, append)
theLocalBackend = new LocalBackend(
tmpdir,
gcsRequesterPaysProject,
gcsRequesterPaysBuckets,
)

theLocalBackend = new LocalBackend(tmpdir)
theLocalBackend.addDefaultReferences()
theLocalBackend
}
@@ -72,32 +67,7 @@
}
}

class LocalBackend(
val tmpdir: String,
gcsRequesterPaysProject: String,
gcsRequesterPaysBuckets: String,
) extends Backend with BackendWithCodeCache {
// FIXME don't rely on hadoop
val hadoopConf = new hadoop.conf.Configuration()

if (gcsRequesterPaysProject != null) {
if (gcsRequesterPaysBuckets == null) {
hadoopConf.set("fs.gs.requester.pays.mode", "AUTO")
hadoopConf.set("fs.gs.requester.pays.project.id", gcsRequesterPaysProject)
} else {
hadoopConf.set("fs.gs.requester.pays.mode", "CUSTOM")
hadoopConf.set("fs.gs.requester.pays.project.id", gcsRequesterPaysProject)
hadoopConf.set("fs.gs.requester.pays.buckets", gcsRequesterPaysBuckets)
}
}

hadoopConf.set(
"hadoop.io.compression.codecs",
"org.apache.hadoop.io.compress.DefaultCodec,"
+ "is.hail.io.compress.BGzipCodec,"
+ "is.hail.io.compress.BGzipCodecTbi,"
+ "org.apache.hadoop.io.compress.GzipCodec",
)
class LocalBackend(val tmpdir: String) extends Backend with BackendWithCodeCache {

private[this] val flags = HailFeatureFlags.fromEnv()
private[this] val theHailClassLoader = new HailClassLoader(getClass().getClassLoader())
@@ -106,11 +76,15 @@

def setFlag(name: String, value: String) = flags.set(name, value)

val availableFlags: java.util.ArrayList[String] = flags.available
// called from python
val availableFlags: java.util.ArrayList[String] =
flags.available

val fs: FS = new HadoopFS(new SerializableHadoopConfiguration(hadoopConf))
// flags can be set after construction from python
def fs: FS = FS.buildRoutes(None, Some(flags))

def withExecuteContext[T](timer: ExecutionTimer): (ExecuteContext => T) => T =
def withExecuteContext[T](timer: ExecutionTimer): (ExecuteContext => T) => T = {
val fs = this.fs
ExecuteContext.scoped(
tmpdir,
tmpdir,
Expand All @@ -125,9 +99,11 @@ class LocalBackend(
ExecutionCache.fromFlags(flags, fs, tmpdir)
},
)
}

override def withExecuteContext[T](methodName: String)(f: ExecuteContext => T): T =
ExecutionTimer.logTime(methodName) { timer =>
val fs = this.fs
ExecuteContext.scoped(
tmpdir,
tmpdir,
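
Worth noting in the LocalBackend diff above: `fs` is now a `def` rather than a
`val`, so the filesystem is rebuilt from the current flags each time it is
needed, which is how requester-pays flags set from Python after construction
take effect. A minimal sketch of that interaction (the flag value is
illustrative):

```scala
import is.hail.backend.local.LocalBackend

// Minimal sketch: because LocalBackend.fs is a def, flags set from Python
// after the backend is constructed are honoured the next time fs is built.
val backend = new LocalBackend("/tmp/hail")                           // tmpdir is illustrative
backend.setFlag("gcs_requester_pays_project", "my-billing-project")   // hypothetical project
val fs = backend.fs  // rebuilt via FS.buildRoutes(None, Some(flags)) with the flag applied
```
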
@@ -64,7 +64,7 @@ object ServiceBackend {

val flags = HailFeatureFlags.fromMap(rpcConfig.flags)
val shouldProfile = flags.get("profile") != null
val fs = FS.cloudSpecificFS(s"$scratchDir/secrets/gsa-key/key.json", Some(flags))
val fs = FS.buildRoutes(Some(s"$scratchDir/secrets/gsa-key/key.json"), Some(flags))

val backendContext = new ServiceBackendContext(
rpcConfig.billing_project,
@@ -454,12 +454,12 @@ object ServiceBackendAPI {
val inputURL = argv(5)
val outputURL = argv(6)

val fs = FS.cloudSpecificFS(s"$scratchDir/secrets/gsa-key/key.json", None)
val fs = FS.buildRoutes(Some(s"$scratchDir/secrets/gsa-key/key.json"), None)
val deployConfig = DeployConfig.fromConfigFile(
s"$scratchDir/secrets/deploy-config/deploy-config.json"
)
DeployConfig.set(deployConfig)
sys.env.get("HAIL_SSL_CONFIG_DIR").foreach(tls.setSSLConfigFromDir(_))
sys.env.get("HAIL_SSL_CONFIG_DIR").foreach(tls.setSSLConfigFromDir)

val batchClient = new BatchClient(s"$scratchDir/secrets/gsa-key/key.json")
log.info("BatchClient allocated.")
4 changes: 2 additions & 2 deletions hail/src/main/scala/is/hail/backend/service/Worker.scala
@@ -117,15 +117,15 @@ object Worker {
s"$scratchDir/secrets/deploy-config/deploy-config.json"
)
DeployConfig.set(deployConfig)
sys.env.get("HAIL_SSL_CONFIG_DIR").foreach(tls.setSSLConfigFromDir(_))
sys.env.get("HAIL_SSL_CONFIG_DIR").foreach(tls.setSSLConfigFromDir)

log.info(s"is.hail.backend.service.Worker $myRevision")
log.info(s"running job $i/$n at root $root with scratch directory '$scratchDir'")

timer.start(s"Job $i/$n")

timer.start("readInputs")
val fs = FS.cloudSpecificFS(s"$scratchDir/secrets/gsa-key/key.json", None)
val fs = FS.buildRoutes(Some(s"$scratchDir/secrets/gsa-key/key.json"), None)

def open(x: String): SeekableDataInputStream =
fs.openNoCompression(x)
4 changes: 4 additions & 0 deletions hail/src/main/scala/is/hail/io/fs/AzureStorageFS.scala
@@ -61,6 +61,10 @@ class AzureStorageFSURL(
}

object AzureStorageFS {
object EnvVars {
val AzureApplicationCredentials = "AZURE_APPLICATION_CREDENTIALS"
}

private val AZURE_HTTPS_URI_REGEX =
"^https:\\/\\/([a-z0-9_\\-\\.]+)\\.blob\\.core\\.windows\\.net\\/([a-z0-9_\\-\\.]+)(\\/.*)?".r

62 changes: 34 additions & 28 deletions hail/src/main/scala/is/hail/io/fs/FS.scala
@@ -3,7 +3,9 @@ package is.hail.io.fs
import is.hail.{HailContext, HailFeatureFlags}
import is.hail.backend.BroadcastValue
import is.hail.io.compress.{BGzipInputStream, BGzipOutputStream}
import is.hail.io.fs.AzureStorageFS.EnvVars.AzureApplicationCredentials
import is.hail.io.fs.FSUtil.{containsWildcard, dropTrailingSlash}
import is.hail.io.fs.GoogleStorageFS.EnvVars.GoogleApplicationCredentials
import is.hail.services._
import is.hail.utils._

@@ -257,38 +259,42 @@ abstract class FSPositionedOutputStream(val capacity: Int) extends OutputStream
}

object FS {
def cloudSpecificFS(
credentialsPath: String,
flags: Option[HailFeatureFlags],
): FS = retryTransientErrors {
val cloudSpecificFS = using(new FileInputStream(credentialsPath)) { is =>
val credentialsStr = Some(IOUtils.toString(is, Charset.defaultCharset()))
sys.env.get("HAIL_CLOUD") match {
case Some("gcp") =>
val requesterPaysConfiguration = flags.flatMap { flags =>
RequesterPaysConfiguration.fromFlags(
flags.get("gcs_requester_pays_project"),
flags.get("gcs_requester_pays_buckets"),
)
}
new GoogleStorageFS(credentialsStr, requesterPaysConfiguration)
case Some("azure") =>
sys.env.get("HAIL_TERRA") match {
case Some(_) => new TerraAzureStorageFS()
case None => new AzureStorageFS(credentialsStr)
}
def buildRoutes(credentialsPath: Option[String], flags: Option[HailFeatureFlags]): FS =
retryTransientErrors {

def readString(path: String): String =
using(new FileInputStream(path))(is => IOUtils.toString(is, Charset.defaultCharset()))

def gcs = new GoogleStorageFS(
credentialsPath.orElse(sys.env.get(GoogleApplicationCredentials)).map(readString),
flags.flatMap(RequesterPaysConfig.fromFlags),
)

def az = sys.env.get("HAIL_TERRA") match {
case Some(_) => new TerraAzureStorageFS()
case None => new AzureStorageFS(
credentialsPath.orElse(sys.env.get(AzureApplicationCredentials)).map(readString)
)
}

val cloudSpecificFSs = sys.env.get("HAIL_CLOUD") match {
case Some("gcp") => FastSeq(gcs)
case Some("azure") => FastSeq(az)
case Some(cloud) =>
throw new IllegalArgumentException(s"Bad cloud: $cloud")
throw new IllegalArgumentException(s"Unknown cloud provider: '$cloud'.'")
case None =>
throw new IllegalArgumentException(s"HAIL_CLOUD must be set.")
if (credentialsPath.isEmpty) FastSeq(gcs, az)
else fatal(
"Don't know to which cloud credentials belong because 'HAIL_CLOUD' was not set."
)
}
}

new RouterFS(Array(
cloudSpecificFS,
new HadoopFS(new SerializableHadoopConfiguration(new hadoop.conf.Configuration())),
))
}
new RouterFS(
cloudSpecificFSs :+ new HadoopFS(
new SerializableHadoopConfiguration(new hadoop.conf.Configuration())
)
)
}

private val log = Logger.getLogger(getClass.getName())
}
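
For reference, the two `FS.buildRoutes` call patterns introduced by this
commit (a sketch; the key-file path is illustrative of the secret that Batch
mounts for the service backend and worker):

```scala
import is.hail.HailFeatureFlags
import is.hail.io.fs.FS

// Flags as built by the backends above.
val flags = HailFeatureFlags.fromEnv()

// LocalBackend: no explicit key file; route on HAIL_CLOUD, or on both gs://
// and Azure blob URLs when it is unset and no credentials are supplied.
val localFs = FS.buildRoutes(None, Some(flags))

// ServiceBackend / Worker: an explicit key file (illustrative path).
val cloudFs = FS.buildRoutes(Some("/scratch/secrets/gsa-key/key.json"), Some(flags))
```
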