From 5a2758a47c5048c3758766c8e0b648431cdedca6 Mon Sep 17 00:00:00 2001 From: Leonhard Gruenschloss Date: Mon, 21 Nov 2022 09:26:43 +1100 Subject: [PATCH] [fs] Fix incorrect seeking semantics of GoogleStorageFS (#244) See https://github.com/hail-is/hail/pull/12487 --- .../scala/is/hail/io/fs/AzureStorageFS.scala | 6 ++-- hail/src/main/scala/is/hail/io/fs/FS.scala | 13 +++++--- .../scala/is/hail/io/fs/GoogleStorageFS.scala | 4 +-- hail/src/test/scala/is/hail/fs/FSSuite.scala | 33 ++++++++++++++++++- 4 files changed, 46 insertions(+), 10 deletions(-) diff --git a/hail/src/main/scala/is/hail/io/fs/AzureStorageFS.scala b/hail/src/main/scala/is/hail/io/fs/AzureStorageFS.scala index 96d7db89873..a1cfc68ccd7 100644 --- a/hail/src/main/scala/is/hail/io/fs/AzureStorageFS.scala +++ b/hail/src/main/scala/is/hail/io/fs/AzureStorageFS.scala @@ -11,12 +11,12 @@ import is.hail.io.fs.FSUtil.{containsWildcard, dropTrailingSlash} import org.apache.log4j.Logger import java.net.URI -import is.hail.utils.{fatal, defaultJSONFormats} +import is.hail.utils.{defaultJSONFormats, fatal} import org.json4s import org.json4s.jackson.JsonMethods import org.json4s.Formats -import java.io.{ByteArrayInputStream, FileNotFoundException, OutputStream} +import java.io.{ByteArrayInputStream, ByteArrayOutputStream, FileNotFoundException, OutputStream} import java.nio.file.FileSystems import java.time.Duration import scala.collection.mutable @@ -137,6 +137,8 @@ class AzureStorageFS(val credentialsJSON: Option[String] = None) extends FS { val is: SeekableInputStream = new FSSeekableInputStream { private[this] val client: BlobClient = blobClient + override def physicalSeek(newPos: Long): Unit = () + override def fill(): Int = { val pos = getPosition val numBytesRemainingInBlob = blobSize - pos diff --git a/hail/src/main/scala/is/hail/io/fs/FS.scala b/hail/src/main/scala/is/hail/io/fs/FS.scala index 88a912a0e3c..0c5844b7057 100644 --- a/hail/src/main/scala/is/hail/io/fs/FS.scala +++ b/hail/src/main/scala/is/hail/io/fs/FS.scala @@ -1,20 +1,20 @@ package is.hail.io.fs -import java.io._ -import java.nio.charset._ -import java.util.zip.GZIPOutputStream -import is.hail.{HailContext, HailFeatureFlags} import is.hail.backend.BroadcastValue import is.hail.io.compress.{BGzipInputStream, BGzipOutputStream} import is.hail.io.fs.FSUtil.{containsWildcard, dropTrailingSlash} -import is.hail.utils._ import is.hail.services._ +import is.hail.utils._ +import is.hail.{HailContext, HailFeatureFlags} import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream import org.apache.commons.io.IOUtils import org.apache.hadoop +import java.io._ import java.nio.ByteBuffer +import java.nio.charset._ import java.nio.file.FileSystems +import java.util.zip.GZIPOutputStream import scala.collection.mutable import scala.io.Source @@ -153,6 +153,8 @@ abstract class FSSeekableInputStream extends InputStream with Seekable { toTransfer } + protected def physicalSeek(newPos: Long): Unit + def seek(newPos: Long): Unit = { val distance = newPos - pos val bufferSeekPosition = bb.position() + distance @@ -220,6 +222,7 @@ object FS { case cloud => throw new IllegalArgumentException(s"Bad cloud: $cloud") } + physicalSeek(newPos) } } } diff --git a/hail/src/main/scala/is/hail/io/fs/GoogleStorageFS.scala b/hail/src/main/scala/is/hail/io/fs/GoogleStorageFS.scala index a25588d56f4..b133c8fddff 100644 --- a/hail/src/main/scala/is/hail/io/fs/GoogleStorageFS.scala +++ b/hail/src/main/scala/is/hail/io/fs/GoogleStorageFS.scala @@ -1,5 +1,6 @@ package is.hail.io.fs + import java.io.{ByteArrayInputStream, FileNotFoundException} import java.net.URI import java.nio.ByteBuffer @@ -208,8 +209,7 @@ class GoogleStorageFS( return n } - override def seek(newPos: Long): Unit = { - super.seek(newPos) + override def physicalSeek(newPos: Long): Unit = { seekHandlingRequesterPays(newPos) } } diff --git a/hail/src/test/scala/is/hail/fs/FSSuite.scala b/hail/src/test/scala/is/hail/fs/FSSuite.scala index 03604bac2c4..8563c58488d 100644 --- a/hail/src/test/scala/is/hail/fs/FSSuite.scala +++ b/hail/src/test/scala/is/hail/fs/FSSuite.scala @@ -4,8 +4,9 @@ import java.io.FileNotFoundException import is.hail.HailSuite import is.hail.backend.ExecuteContext import is.hail.io.fs.FSUtil.dropTrailingSlash -import is.hail.io.fs.{FS, FileStatus, Seekable} +import is.hail.io.fs.{FS, FileStatus, GoogleStorageFS, Seekable} import is.hail.utils._ +import org.apache.commons.codec.binary.Hex import org.apache.commons.io.IOUtils import org.scalatest.testng.TestNGSuite import org.testng.annotations.Test @@ -346,6 +347,36 @@ trait FSSuite extends TestNGSuite { fs.delete(f, false) assert(!fs.exists(f)) } + + @Test def testSeekAndReadStraddlingBufferSize(): Unit = { + val data = Array.tabulate(251)(_.toByte) + val f = t() + using(fs.create(f)) { os => + var i = 0 + // 66058 replicates are 8MB of data + while (i < 70000) { + os.write(data) + i += 1 + } + } + + using(fs.openNoCompression(f)) { is => + + is.seek(251) + assert(is.read() == 0) + assert(is.read() == 1) + + val seekPos = 8 * 1024 * 1024 - 512 + is.seek(8 * 1024 * 1024 - 512) + assert(is.getPosition == seekPos) + val toRead = new Array[Byte](512) + is.readFully(toRead) + + (0 until toRead.length).foreach { i => + assert(toRead(i) == ((seekPos + i) % 251).toByte) + } + } + } } class HadoopFSSuite extends HailSuite with FSSuite {