diff --git a/hail/python/test/hail/fs/test_worker_driver_fs.py b/hail/python/test/hail/fs/test_worker_driver_fs.py index 76e8d312ad85..5c9d04faf058 100644 --- a/hail/python/test/hail/fs/test_worker_driver_fs.py +++ b/hail/python/test/hail/fs/test_worker_driver_fs.py @@ -25,6 +25,18 @@ def test_requester_pays_write_no_settings(): assert False +@skip_in_azure +def test_requester_pays_write_with_project(): + hl.stop() + hl.init(gcs_requester_pays_configuration='hail-vdc') + random_filename = 'gs://hail-services-requester-pays/test_requester_pays_on_worker_driver_' + secret_alnum_string(10) + try: + hl.utils.range_table(4, n_partitions=4).write(random_filename, overwrite=True) + finally: + hl.current_backend().fs.rmtree(random_filename) + assert False + + @skip_in_azure def test_requester_pays_with_project(): hl.stop() diff --git a/hail/src/main/scala/is/hail/io/fs/GoogleStorageFS.scala b/hail/src/main/scala/is/hail/io/fs/GoogleStorageFS.scala index b9982c4e5242..d7db9b259f09 100644 --- a/hail/src/main/scala/is/hail/io/fs/GoogleStorageFS.scala +++ b/hail/src/main/scala/is/hail/io/fs/GoogleStorageFS.scala @@ -261,13 +261,28 @@ class GoogleStorageFS( .build() val os: PositionedOutputStream = new FSPositionedOutputStream(8 * 1024 * 1024) { - private[this] val write: WriteChannel = storage.writer(blobInfo) + private[this] var writer: WriteChannel = null + + private[this] def writeHandlingRequesterPays(): Int = { + if (writer != null) { + writer.write(bb) + } else { + handleRequesterPays( + { (options: Seq[BlobSourceOption]) => + writer = storage.writer(blobInfo, options:_*) + writer.write(bb) + }, + BlobSourceOption.userProject _, + bucket + ) + } + } override def flush(): Unit = { bb.flip() while (bb.remaining() > 0) - write.write(bb) + writeHandlingRequesterPays() bb.clear() }