Skip to content

Commit

Permalink
[fs] Fix incorrect seeking semantics of GoogleStorageFS (#244)
Browse files Browse the repository at this point in the history
  • Loading branch information
lgruen authored Nov 20, 2022
1 parent 3c66763 commit 5a2758a
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 10 deletions.
6 changes: 4 additions & 2 deletions hail/src/main/scala/is/hail/io/fs/AzureStorageFS.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ import is.hail.io.fs.FSUtil.{containsWildcard, dropTrailingSlash}
import org.apache.log4j.Logger

import java.net.URI
import is.hail.utils.{fatal, defaultJSONFormats}
import is.hail.utils.{defaultJSONFormats, fatal}
import org.json4s
import org.json4s.jackson.JsonMethods
import org.json4s.Formats

import java.io.{ByteArrayInputStream, FileNotFoundException, OutputStream}
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, FileNotFoundException, OutputStream}
import java.nio.file.FileSystems
import java.time.Duration
import scala.collection.mutable
Expand Down Expand Up @@ -137,6 +137,8 @@ class AzureStorageFS(val credentialsJSON: Option[String] = None) extends FS {
val is: SeekableInputStream = new FSSeekableInputStream {
private[this] val client: BlobClient = blobClient

override def physicalSeek(newPos: Long): Unit = ()

override def fill(): Int = {
val pos = getPosition
val numBytesRemainingInBlob = blobSize - pos
Expand Down
13 changes: 8 additions & 5 deletions hail/src/main/scala/is/hail/io/fs/FS.scala
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
package is.hail.io.fs

import java.io._
import java.nio.charset._
import java.util.zip.GZIPOutputStream
import is.hail.{HailContext, HailFeatureFlags}
import is.hail.backend.BroadcastValue
import is.hail.io.compress.{BGzipInputStream, BGzipOutputStream}
import is.hail.io.fs.FSUtil.{containsWildcard, dropTrailingSlash}
import is.hail.utils._
import is.hail.services._
import is.hail.utils._
import is.hail.{HailContext, HailFeatureFlags}
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream
import org.apache.commons.io.IOUtils
import org.apache.hadoop

import java.io._
import java.nio.ByteBuffer
import java.nio.charset._
import java.nio.file.FileSystems
import java.util.zip.GZIPOutputStream
import scala.collection.mutable
import scala.io.Source

Expand Down Expand Up @@ -153,6 +153,8 @@ abstract class FSSeekableInputStream extends InputStream with Seekable {
toTransfer
}

protected def physicalSeek(newPos: Long): Unit

def seek(newPos: Long): Unit = {
val distance = newPos - pos
val bufferSeekPosition = bb.position() + distance
Expand Down Expand Up @@ -220,6 +222,7 @@ object FS {
case cloud =>
throw new IllegalArgumentException(s"Bad cloud: $cloud")
}
physicalSeek(newPos)
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions hail/src/main/scala/is/hail/io/fs/GoogleStorageFS.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package is.hail.io.fs


import java.io.{ByteArrayInputStream, FileNotFoundException}
import java.net.URI
import java.nio.ByteBuffer
Expand Down Expand Up @@ -208,8 +209,7 @@ class GoogleStorageFS(
return n
}

override def seek(newPos: Long): Unit = {
super.seek(newPos)
override def physicalSeek(newPos: Long): Unit = {
seekHandlingRequesterPays(newPos)
}
}
Expand Down
33 changes: 32 additions & 1 deletion hail/src/test/scala/is/hail/fs/FSSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ import java.io.FileNotFoundException
import is.hail.HailSuite
import is.hail.backend.ExecuteContext
import is.hail.io.fs.FSUtil.dropTrailingSlash
import is.hail.io.fs.{FS, FileStatus, Seekable}
import is.hail.io.fs.{FS, FileStatus, GoogleStorageFS, Seekable}
import is.hail.utils._
import org.apache.commons.codec.binary.Hex
import org.apache.commons.io.IOUtils
import org.scalatest.testng.TestNGSuite
import org.testng.annotations.Test
Expand Down Expand Up @@ -346,6 +347,36 @@ trait FSSuite extends TestNGSuite {
fs.delete(f, false)
assert(!fs.exists(f))
}

@Test def testSeekAndReadStraddlingBufferSize(): Unit = {
val data = Array.tabulate(251)(_.toByte)
val f = t()
using(fs.create(f)) { os =>
var i = 0
// 66058 replicates are 8MB of data
while (i < 70000) {
os.write(data)
i += 1
}
}

using(fs.openNoCompression(f)) { is =>

is.seek(251)
assert(is.read() == 0)
assert(is.read() == 1)

val seekPos = 8 * 1024 * 1024 - 512
is.seek(8 * 1024 * 1024 - 512)
assert(is.getPosition == seekPos)
val toRead = new Array[Byte](512)
is.readFully(toRead)

(0 until toRead.length).foreach { i =>
assert(toRead(i) == ((seekPos + i) % 251).toByte)
}
}
}
}

class HadoopFSSuite extends HailSuite with FSSuite {
Expand Down

0 comments on commit 5a2758a

Please sign in to comment.