[query] Add RouterFS to scala to permit local file system IO in QoB (#12667)

* [query] Add RouterFS to scala to permit local file system IO in QoB

* erasure

* scala is stupid

* override open/create cached

* fix cast
tpoterba authored Feb 15, 2023
1 parent 2fbf68b commit 0f85414
Showing 4 changed files with 50 additions and 9 deletions.
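The change lets QoB (Query-on-Batch) jobs touch the local file system by routing each path to a file system implementation keyed on its URI scheme. A standalone sketch of that lookup with made-up paths (the real logic is RouterFS.lookupFS in the diff below):

// Scheme extraction as RouterFS performs it, shown in isolation.
def schemeOf(path: String): Option[String] = Option(new java.net.URI(path).getScheme)

schemeOf("gs://my-bucket/table.mt")  // Some("gs")   -> the cloud FS
schemeOf("file:///tmp/out.tsv")      // Some("file") -> the local HadoopFS
schemeOf("/tmp/out.tsv")             // None         -> falls back to the default ("file") entry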
@@ -102,14 +102,13 @@ class ServiceBackend(

def parallelizeAndComputeWithIndex(
_backendContext: BackendContext,
_fs: FS,
fs: FS,
collection: Array[Array[Byte]],
stageIdentifier: String,
dependency: Option[TableStageDependency] = None
)(f: (Array[Byte], HailTaskContext, HailClassLoader, FS) => Array[Byte]
): Array[Array[Byte]] = {
val backendContext = _backendContext.asInstanceOf[ServiceBackendContext]
val fs = _fs.asInstanceOf[ServiceCacheableFS]
val n = collection.length
val token = tokenUrlSafe(32)
val root = s"${ backendContext.remoteTmpDir }parallelizeAndComputeWithIndex/$token"
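The removed cast matters because, after this commit, the FS handed to the backend is typically a RouterFS wrapping the cloud file system rather than a ServiceCacheableFS itself, so the old downcast would fail at runtime. A minimal sketch of that failure mode with hypothetical stand-in types (Store, CloudStore, Router are illustrative, not repo classes):

// A wrapper is not an instance of the type it wraps, so downcasting it throws.
trait Store
class CloudStore extends Store
class Router(inner: Store) extends Store

val s: Store = new Router(new CloudStore)
// s.asInstanceOf[CloudStore]  // ClassCastException: Router cannot be cast to CloudStore

The cached open/create paths are instead reached through the openCachedNoCompression and createCachedNoCompression overrides introduced in the files below.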
17 changes: 12 additions & 5 deletions hail/src/main/scala/is/hail/io/fs/FS.scala
@@ -210,8 +210,8 @@ object FS {
def cloudSpecificCacheableFS(
credentialsPath: String,
flags: Option[HailFeatureFlags]
): ServiceCacheableFS = retryTransientErrors {
using(new FileInputStream(credentialsPath)) { is =>
): FS = retryTransientErrors {
val (scheme, cloudSpecificFS) = using(new FileInputStream(credentialsPath)) { is =>
val credentialsStr = Some(IOUtils.toString(is, Charset.defaultCharset()))
sys.env.get("HAIL_CLOUD") match {
case Some("gcp") =>
@@ -220,19 +220,26 @@
flags.get("gcs_requester_pays_project"), flags.get("gcs_requester_pays_buckets")
)
}
new GoogleStorageFS(credentialsStr, requesterPaysConfiguration).asCacheable()
("gs", new GoogleStorageFS(credentialsStr, requesterPaysConfiguration).asCacheable())
case Some("azure") =>
new AzureStorageFS(credentialsStr).asCacheable()
case Some(cloud) =>
("hail-az", new AzureStorageFS(credentialsStr).asCacheable())
case cloud =>
throw new IllegalArgumentException(s"Bad cloud: $cloud")
case None =>
throw new IllegalArgumentException(s"HAIL_CLOUD must be set.")
}
}

new RouterFS(Map(scheme -> cloudSpecificFS, "file" -> new HadoopFS(new SerializableHadoopConfiguration(new hadoop.conf.Configuration()))), "file")
}
}

trait FS extends Serializable {

def openCachedNoCompression(filename: String): SeekableDataInputStream = openNoCompression(filename)

def createCachedNoCompression(filename: String): PositionedDataOutputStream = createNoCompression(filename)

def getCodecFromExtension(extension: String, gzAsBGZ: Boolean = false): CompressionCodec = {
extension match {
case ".gz" =>
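The trait-level openCachedNoCompression and createCachedNoCompression defaults simply fall through to the uncached variants, so only file systems with a real cache need to override them; RouterFS forwards the calls, and ServiceCacheableFS (last file below) supplies the caching behaviour. A reduced sketch of this default-plus-override pattern, using illustrative names rather than the real FS signatures:

trait Reader {
  def open(name: String): String                     // like openNoCompression: abstract
  def openCached(name: String): String = open(name)  // like openCachedNoCompression: default is uncached
}

class CachingReader extends Reader {
  def open(name: String): String = s"$name from storage"
  override def openCached(name: String): String = s"$name from cache"  // as ServiceCacheableFS overrides it
}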
35 changes: 35 additions & 0 deletions hail/src/main/scala/is/hail/io/fs/RouterFS.scala
@@ -0,0 +1,35 @@
package is.hail.io.fs

class RouterFS(schemes: Map[String, FS], default: String) extends FS {

def lookupFS(path: String): FS = {
val uri = new java.net.URI(path)
schemes.getOrElse(uri.getScheme, schemes(default))
}

override def openCachedNoCompression(filename: String): SeekableDataInputStream = lookupFS(filename).openCachedNoCompression(filename)

override def createCachedNoCompression(filename: String): PositionedDataOutputStream = lookupFS(filename).createCachedNoCompression(filename)

def openNoCompression(filename: String, _debug: Boolean = false): SeekableDataInputStream = lookupFS(filename).openNoCompression(filename, _debug)

def createNoCompression(filename: String): PositionedDataOutputStream = lookupFS(filename).createNoCompression(filename)

override def mkDir(dirname: String): Unit = lookupFS(dirname).mkDir(dirname)

def delete(filename: String, recursive: Boolean) = lookupFS(filename).delete(filename, recursive)

def listStatus(filename: String): Array[FileStatus] = lookupFS(filename).listStatus(filename)

def glob(filename: String): Array[FileStatus] = lookupFS(filename).glob(filename)

def fileStatus(filename: String): FileStatus = lookupFS(filename).fileStatus(filename)

def makeQualified(path: String): String = lookupFS(path).makeQualified(path)

def getConfiguration(): Any = schemes.map { case (k, v) => (k, v.getConfiguration()) }

def setConfiguration(config: Any): Unit = {
config.asInstanceOf[Map[_, _]].foreach { case (k: String, v: Any) => schemes(k).setConfiguration(v) }
}
}
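A usage sketch for the new class, with illustrative paths; the map mirrors the one assembled in cloudSpecificCacheableFS above, and the Hadoop-backed entry assumes the same imports as FS.scala (HadoopFS, SerializableHadoopConfiguration, org.apache.hadoop):

val localFS: FS = new HadoopFS(new SerializableHadoopConfiguration(new hadoop.conf.Configuration()))
val fs: FS = new RouterFS(Map("file" -> localFS), default = "file")

fs.fileStatus("file:///tmp/input.tsv")  // scheme "file" -> localFS
fs.fileStatus("/tmp/input.tsv")         // no scheme -> falls back to the "file" default
fs.mkDir("/tmp/qob-scratch")            // local file system IO, now reachable from QoB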
4 changes: 2 additions & 2 deletions hail/src/main/scala/is/hail/io/fs/ServiceCacheableFS.scala
@@ -20,11 +20,11 @@ trait ServiceCacheableFS extends FS {
else MemoryClient.get
}

def openCachedNoCompression(filename: String): SeekableDataInputStream = {
override def openCachedNoCompression(filename: String): SeekableDataInputStream = {
client.open(filename).map(new WrappedSeekableDataInputStream(_)).getOrElse(openNoCompression(filename))
}

def createCachedNoCompression(filename: String): PositionedDataOutputStream = {
override def createCachedNoCompression(filename: String): PositionedDataOutputStream = {
val os: PositionedOutputStream = new OutputStream with Positioned {
private[this] var closed: Boolean = false
private[this] val bb: ArrayBuffer[Byte] = new ArrayBuffer()