Skip to content

Commit

Permalink
[qob] maybe retry GSFS reads (hail-is#13013)
Browse files Browse the repository at this point in the history
Fixes hail-is#12983

---

After an `FSSeekableInputStream` method (successfully) returns,
`getPosition` always represents the intended location within the object.
We can entirely eliminate `lazyPosition` because it tracks the
same value as `getPosition` when `reader == null`.

For retryable reads, we just seek back to the known correct location of
the stream before attempting to read again.
  • Loading branch information
danking authored May 11, 2023
1 parent 2da08af commit 3b04c08
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 14 deletions.
26 changes: 13 additions & 13 deletions hail/src/main/scala/is/hail/io/fs/GoogleStorageFS.scala
Original file line number Diff line number Diff line change
Expand Up @@ -198,32 +198,30 @@ class GoogleStorageFS(

val is: SeekableInputStream = new FSSeekableInputStream {
private[this] var reader: ReadChannel = null
private[this] var lazyPosition: Long = 0L

/** Reads from the already-open `reader` into `bb`, going through `retryTransientErrors`.
  *
  * A `reset` callback is supplied that seeks `reader` back to `getPosition`, so that a
  * repeated read attempt starts from the stream's known position rather than from wherever
  * the previous attempt left the channel.
  *
  * @return the `Int` produced by `reader.read(bb)`
  */
private[this] def retryingRead(): Int = {
  // Rewind hook handed to the retry helper; it is only a closure here and is not run eagerly.
  val rewind: () => Unit = () => reader.seek(getPosition)
  retryTransientErrors({ reader.read(bb) }, reset = Some(rewind))
}

/** Reads from the underlying `ReadChannel`, opening the channel first when none exists yet.
  *
  * When `reader` is still null, the channel is created inside `handleRequesterPays`
  * (with `storage.reader` wrapped in `retryTransientErrors`), positioned with `seek`,
  * and then read.
  *
  * NOTE(review): this text comes from a rendered diff whose +/- markers were lost, so
  * removed and added lines sit side by side. The bare `reader.read(bb)` calls and
  * `reader.seek(lazyPosition)` appear to be the removed lines, superseded by
  * `retryingRead()` and `reader.seek(getPosition)` -- confirm against the repository.
  *
  * @param bb buffer handed to `reader.read`
  * @return the `Int` result of the final read expression in the taken branch
  */
private[this] def readHandlingRequesterPays(bb: ByteBuffer): Int = {
if (reader != null) {
// A channel is already open: read from it directly.
reader.read(bb)
retryingRead()
} else {
// No channel yet: open one, passing the options that handleRequesterPays supplies.
handleRequesterPays(
{ (options: Seq[BlobSourceOption]) =>
// Opening the channel is itself wrapped in the transient-error retry helper.
reader = retryTransientErrors { storage.reader(bucket, path, options:_*) }
reader.seek(lazyPosition)
reader.read(bb)
// Position the fresh channel at the stream's current position before reading.
reader.seek(getPosition)
retryingRead()
},
BlobSourceOption.userProject _,
bucket
)
}
}

/** Moves the read position to `newPos`.
  *
  * If a `reader` is open it is sought directly; otherwise the target is only recorded in
  * `lazyPosition`.
  *
  * NOTE(review): the commit message states that `lazyPosition` is eliminated, so this helper
  * is presumably among the lines the commit removes -- confirm against the repository.
  *
  * @param newPos position to seek to
  */
private[this] def seekHandlingRequesterPays(newPos: Long): Unit = {
if (reader != null) {
reader.seek(newPos)
} else {
// No channel open yet: remember the position instead of seeking.
lazyPosition = newPos
}
}

override def close(): Unit = {
if (!closed) {
if (reader != null) {
Expand Down Expand Up @@ -251,7 +249,9 @@ class GoogleStorageFS(
}

/** Seeks the underlying `reader` to `newPos` when a reader is open.
  *
  * NOTE(review): this text comes from a rendered diff whose +/- markers were lost. The call to
  * `seekHandlingRequesterPays(newPos)` appears to be the removed line and the explicit
  * null check its replacement -- confirm against the repository.
  *
  * @param newPos position to seek to
  */
override def physicalSeek(newPos: Long): Unit = {
seekHandlingRequesterPays(newPos)
// With no open reader there is nothing to seek.
if (reader != null) {
reader.seek(newPos)
}
}
}

Expand Down
3 changes: 2 additions & 1 deletion hail/src/main/scala/is/hail/services/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ package object services {
}
}

def retryTransientErrors[T](f: => T): T = {
def retryTransientErrors[T](f: => T, reset: Option[() => Unit] = None): T = {
var delay = 0.1
var errors = 0
while (true) {
Expand All @@ -137,6 +137,7 @@ package object services {
}
}
delay = sleepAndBackoff(delay)
reset.foreach(_())
}

throw new AssertionError("unreachable")
Expand Down

0 comments on commit 3b04c08

Please sign in to comment.