Revert "Revert "[query] Zstandard (Zstd) compression" (hail-is#13172)"
This reverts commit 305b355.
chrisvittal committed Jun 12, 2023
1 parent 305b355 commit 1989aae
Showing 482 changed files with 375 additions and 80 deletions.
hail/build.gradle (2 changes: 1 addition & 1 deletion)
@@ -204,7 +204,7 @@ dependencies {

bundled 'com.kohlschutter.junixsocket:junixsocket-core:2.6.1'

- bundled 'com.github.luben:zstd-jni:1.5.2-1'
+ bundled 'com.github.luben:zstd-jni:1.5.5-2'

bundled project(path: ':shadedazure', configuration: 'shadow')
}
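The bump above moves the bundled zstd-jni from 1.5.2-1 to 1.5.5-2; this library provides the JNI bindings behind the new Zstd block buffers introduced later in this commit. A minimal round-trip sketch of the zstd-jni byte-array API as the commit uses it (compressBound, compressByteArray, decompressByteArray, isError); this is a standalone illustration, not Hail code:

import com.github.luben.zstd.Zstd

object ZstdRoundTrip {
  def main(args: Array[String]): Unit = {
    val src = ("hail " * 1000).getBytes("UTF-8")
    // compressBound gives the worst-case compressed size for this input.
    val dst = new Array[Byte](Zstd.compressBound(src.length.toLong).toInt)
    // Level 3 is zstd's default; returns the compressed length, or an error code.
    val written = Zstd.compressByteArray(dst, 0, dst.length, src, 0, src.length, 3)
    if (Zstd.isError(written))
      throw new RuntimeException(Zstd.getErrorName(written))
    val out = new Array[Byte](src.length)
    val read = Zstd.decompressByteArray(out, 0, out.length, dst, 0, written.toInt)
    if (Zstd.isError(read))
      throw new RuntimeException(Zstd.getErrorName(read))
    assert(read.toInt == src.length && java.util.Arrays.equals(out, src))
  }
}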
hail/python/hail/matrixtable.py (16 changes: 0 additions & 16 deletions)
@@ -2662,22 +2662,6 @@ def checkpoint(self, output: str, overwrite: bool = False, stage_locally: bool =
--------
>>> dataset = dataset.checkpoint('output/dataset_checkpoint.mt')
"""
- if _codec_spec is None:
-     _codec_spec = """{
-         "name": "LEB128BufferSpec",
-         "child": {
-             "name": "BlockingBufferSpec",
-             "blockSize": 32768,
-             "child": {
-                 "name": "LZ4FastBlockBufferSpec",
-                 "blockSize": 32768,
-                 "child": {
-                     "name": "StreamBlockBufferSpec"
-                 }
-             }
-         }
-     }"""

hl.current_backend().validate_file_scheme(output)

if not _read_if_exists or not hl.hadoop_exists(f'{output}/_SUCCESS'):
hail/python/hail/table.py (16 changes: 0 additions & 16 deletions)
@@ -1327,22 +1327,6 @@ def checkpoint(self, output: str, overwrite: bool = False, stage_locally: bool =
"""
hl.current_backend().validate_file_scheme(output)

- if _codec_spec is None:
-     _codec_spec = """{
-         "name": "LEB128BufferSpec",
-         "child": {
-             "name": "BlockingBufferSpec",
-             "blockSize": 32768,
-             "child": {
-                 "name": "LZ4FastBlockBufferSpec",
-                 "blockSize": 32768,
-                 "child": {
-                     "name": "StreamBlockBufferSpec"
-                 }
-             }
-         }
-     }"""

if not _read_if_exists or not hl.hadoop_exists(f'{output}/_SUCCESS'):
self.write(output=output, overwrite=overwrite, stage_locally=stage_locally, _codec_spec=_codec_spec)
_assert_type = self._type
hail/python/hail/vds/combiner/variant_dataset_combiner.py (6 changes: 3 additions & 3 deletions)
@@ -43,10 +43,10 @@ class CombinerOutType(NamedTuple):
    "name": "LEB128BufferSpec",
    "child": {
        "name": "BlockingBufferSpec",
-       "blockSize": 32768,
+       "blockSize": 65536,
        "child": {
-           "name": "LZ4FastBlockBufferSpec",
-           "blockSize": 32768,
+           "name": "ZstdBlockBufferSpec",
+           "blockSize": 65536,
            "child": {
                "name": "StreamBlockBufferSpec"
            }
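Reassembled from the hunk above for reference, the combiner's codec spec now reads in full:

{
    "name": "LEB128BufferSpec",
    "child": {
        "name": "BlockingBufferSpec",
        "blockSize": 65536,
        "child": {
            "name": "ZstdBlockBufferSpec",
            "blockSize": 65536,
            "child": {
                "name": "StreamBlockBufferSpec"
            }
        }
    }
}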
hail/python/test/hail/backend/test_service_backend.py (10 changes: 3 additions & 7 deletions)
@@ -9,13 +9,9 @@
@skip_unless_service_backend()
def test_tiny_driver_has_tiny_memory():
    try:
-       hl.utils.range_table(100_000_000, 50).to_pandas()
-   except Exception as exc:
-       # Sometimes the JVM properly OOMs, sometimes it just dies.
-       assert (
-           'java.lang.OutOfMemoryError: Java heap space' in exc.args[0] or
-           'batch.worker.jvm_entryway_protocol.EndOfStream' in exc.args[0]
-       )
+       hl.eval(hl.range(1024 * 1024).map(lambda x: hl.range(1024 * 1024)))
+   except hl.utils.FatalError as exc:
+       assert "HailException: Hail off-heap memory exceeded maximum threshold: limit " in exc.args[0]
    else:
        assert False

hail/python/test/hail/matrixtable/test_file_formats.py (4 changes: 2 additions & 2 deletions)
@@ -73,7 +73,7 @@ def all_values_table_fixture(init_hail):

@pytest.mark.parametrize("path", mt_paths)
def test_backward_compatability_mt(path, all_values_matrix_table_fixture):
- assert len(mt_paths) == 46, str((resource_dir, ht_paths))
+ assert len(mt_paths) == 56, str((resource_dir, ht_paths))

old = hl.read_matrix_table(path)

@@ -88,7 +88,7 @@ def test_backward_compatability_mt(path, all_values_matrix_table_fixture):

@pytest.mark.parametrize("path", ht_paths)
def test_backward_compatability_ht(path, all_values_table_fixture):
- assert len(ht_paths) == 42, str((resource_dir, ht_paths))
+ assert len(ht_paths) == 52, str((resource_dir, ht_paths))

old = hl.read_table(path)

@@ -198,5 +198,5 @@ class MatrixTableSpec(
}

object FileFormat {
- val version: SemanticVersion = SemanticVersion(1, 6, 0)
+ val version: SemanticVersion = SemanticVersion(1, 7, 0)
}
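The file-format version moves from 1.6.0 to 1.7.0: files written with the new Zstd buffer specs cannot be decoded by readers that predate them. A minimal sketch of the compatibility rule such a minor-version bump suggests (hypothetical; this is not Hail's actual check):

case class SemanticVersion(major: Int, minor: Int, patch: Int)

// Assumed rule: same major version required; a newer reader accepts older
// files, while an older reader rejects files written at a newer minor version.
def canRead(reader: SemanticVersion, file: SemanticVersion): Boolean =
  reader.major == file.major && reader.minor >= file.minor

assert(canRead(SemanticVersion(1, 7, 0), SemanticVersion(1, 6, 0)))
assert(!canRead(SemanticVersion(1, 6, 0), SemanticVersion(1, 7, 0)))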
hail/src/main/scala/is/hail/expr/ir/Emit.scala (4 changes: 2 additions & 2 deletions)
@@ -740,7 +740,7 @@ class Emit[C](
val ns = sigs.length
val deserializers = sc.states.states
.slice(start, start + ns)
- .map(sc => sc.deserialize(BufferSpec.defaultUncompressed))
+ .map(sc => sc.deserialize(spec))

Array.range(start, start + ns).foreach(i => sc.newState(cb, i))

@@ -2374,7 +2374,7 @@
PCanonicalTuple(true, et.emitType.storageType).constructFromFields(cb, region, FastIndexedSeq(et), deepCopy = false)
}

- val bufferSpec: BufferSpec = BufferSpec.defaultUncompressed
+ val bufferSpec: BufferSpec = BufferSpec.blockedUncompressed

val emitGlobals = EmitCode.fromI(mb)(cb => emitInNewBuilder(cb, globals))

hail/src/main/scala/is/hail/expr/ir/EmitClassBuilder.scala (2 changes: 1 addition & 1 deletion)
@@ -333,7 +333,7 @@ class EmitClassBuilder[C](
private[this] def encodeLiterals(): Array[AnyRef] = {
val (literals, preEncodedLiterals) = emodb.literalsResult()
val litType = PCanonicalTuple(true, literals.map(_._1.canonicalPType.setRequired(true)): _*)
- val spec = TypedCodecSpec(litType, BufferSpec.defaultUncompressed)
+ val spec = TypedCodecSpec(litType, BufferSpec.wireSpec)

cb.addInterface(typeInfo[FunctionWithLiterals].iname)
val mb2 = newEmitMethod("addAndDecodeLiterals", FastIndexedSeq[ParamType](typeInfo[Array[AnyRef]]), typeInfo[Unit])
hail/src/main/scala/is/hail/expr/ir/IR.scala (2 changes: 1 addition & 1 deletion)
@@ -111,7 +111,7 @@ object EncodedLiteral {
case ts: PString => Str(ts.loadString(addr))
case _ =>
val etype = EType.defaultFromPType(pt)
- val codec = TypedCodecSpec(etype, pt.virtualType, BufferSpec.defaultUncompressed)
+ val codec = TypedCodecSpec(etype, pt.virtualType, BufferSpec.wireSpec)
val bytes = codec.encodeArrays(ctx, pt, addr)
EncodedLiteral(codec, bytes)
}
hail/src/main/scala/is/hail/expr/ir/Interpret.scala (2 changes: 1 addition & 1 deletion)
@@ -897,7 +897,7 @@ object Interpret {
// TODO Is this right? where does wrapped run?
ctx.scopedExecution((hcl, fs, htc, r) => SafeRow(rt, f(hcl, fs, htc, r).apply(r, globalsOffset)))
} else {
- val spec = BufferSpec.defaultUncompressed
+ val spec = BufferSpec.blockedUncompressed

val (_, initOp) = CompileWithAggregators[AsmFunction2RegionLongUnit](ctx,
extracted.states,
hail/src/main/scala/is/hail/expr/ir/TableIR.scala (4 changes: 2 additions & 2 deletions)
@@ -2562,7 +2562,7 @@ case class TableMapRows(child: TableIR, newRow: IR) extends TableIR {
else
null

- val spec = BufferSpec.defaultUncompressed
+ val spec = BufferSpec.blockedUncompressed

// Order of operations:
// 1. init op on all aggs and serialize to byte array.
@@ -3059,7 +3059,7 @@ case class TableKeyByAndAggregate(

val globalsBc = prev.globals.broadcast(ctx.theHailClassLoader)

- val spec = BufferSpec.defaultUncompressed
+ val spec = BufferSpec.blockedUncompressed
val res = genUID()

val extracted = agg.Extract(expr, res, Requiredness(this, ctx))
hail/src/main/scala/is/hail/expr/ir/agg/AggregatorState.scala (4 changes: 2 additions & 2 deletions)
@@ -39,7 +39,7 @@ trait AggregatorState {
val lazyBuffer = kb.getOrDefineLazyField[MemoryBufferWrapper](Code.newInstance[MemoryBufferWrapper](), ("AggregatorStateBufferWrapper"))
cb += lazyBuffer.invoke[Array[Byte], Unit]("set", bytes.loadBytes(cb))
val ib = cb.memoize(lazyBuffer.invoke[InputBuffer]("buffer"))
- deserialize(BufferSpec.defaultUncompressed)(cb, ib)
+ deserialize(BufferSpec.blockedUncompressed)(cb, ib)
cb += lazyBuffer.invoke[Unit]("invalidate")
}

@@ -48,7 +48,7 @@
val addr = kb.genFieldThisRef[Long]("addr")
cb += lazyBuffer.invoke[Unit]("clear")
val ob = cb.memoize(lazyBuffer.invoke[OutputBuffer]("buffer"))
- serialize(BufferSpec.defaultUncompressed)(cb, ob)
+ serialize(BufferSpec.blockedUncompressed)(cb, ob)
cb.assign(addr, t.allocate(r, lazyBuffer.invoke[Int]("length")))
t.storeLength(cb, addr, lazyBuffer.invoke[Int]("length"))
cb += lazyBuffer.invoke[Long, Unit]("copyToAddress", t.bytesAddress(addr))
hail/src/main/scala/is/hail/io/BufferSpecs.scala (56 changes: 47 additions & 9 deletions)
@@ -12,24 +12,30 @@ import org.json4s.jackson.JsonMethods
import org.json4s.{Extraction, JValue}

object BufferSpec {
- val default: BufferSpec = LEB128BufferSpec(
-   BlockingBufferSpec(32 * 1024,
-     LZ4HCBlockBufferSpec(32 * 1024,
+ val zstdCompressionLEB: BufferSpec = LEB128BufferSpec(
+   BlockingBufferSpec(64 * 1024,
+     ZstdBlockBufferSpec(64 * 1024,
        new StreamBlockBufferSpec)))

- val defaultUncompressed: BufferSpec = BlockingBufferSpec(32 * 1024,
-   new StreamBlockBufferSpec)
+ val default: BufferSpec = zstdCompressionLEB
+
+ val blockedUncompressed: BufferSpec = BlockingBufferSpec(32 * 1024,
+   new StreamBlockBufferSpec)
val unblockedUncompressed: BufferSpec = new StreamBufferSpec

val wireSpec: BufferSpec = LEB128BufferSpec(
-   BlockingBufferSpec(32 * 1024,
-     LZ4SizeBasedBlockBufferSpec("fast", 32 * 1024,
-       256,
+   BlockingBufferSpec(64 * 1024,
+     ZstdSizedBasedBlockBufferSpec(64 * 1024,
+       /*minCompressionSize=*/256,
        new StreamBlockBufferSpec)))

val memorySpec: BufferSpec = wireSpec

+ // longtime default spec
+ val lz4HCCompressionLEB: BufferSpec = LEB128BufferSpec(
+   BlockingBufferSpec(32 * 1024,
+     LZ4HCBlockBufferSpec(32 * 1024,
+       new StreamBlockBufferSpec)))

val blockSpecs: Array[BufferSpec] = Array(
BlockingBufferSpec(64 * 1024,
new StreamBlockBufferSpec),
@@ -39,6 +45,9 @@
BlockingBufferSpec(32 * 1024,
LZ4FastBlockBufferSpec(32 * 1024,
new StreamBlockBufferSpec)),
+ BlockingBufferSpec(64 * 1024,
+   ZstdBlockBufferSpec(64 * 1024,
+     new StreamBlockBufferSpec)),
new StreamBufferSpec)

val specs: Array[BufferSpec] = blockSpecs.flatMap { blockSpec =>
@@ -61,6 +70,7 @@
classOf[LZ4HCBlockBufferSpec],
classOf[LZ4FastBlockBufferSpec],
classOf[LZ4SizeBasedBlockBufferSpec],
+ classOf[ZstdBlockBufferSpec],
classOf[StreamBlockBufferSpec],
classOf[BufferSpec],
classOf[LEB128BufferSpec],
@@ -174,6 +184,34 @@ final case class LZ4SizeBasedBlockBufferSpec(compressorType: String, blockSize:
Code.newInstance[LZ4SizeBasedCompressingOutputBlockBuffer, LZ4, Int, Int, OutputBlockBuffer](stagedlz4, blockSize, minCompressionSize, child.buildCodeOutputBuffer(out))
}

+ final case class ZstdBlockBufferSpec(blockSize: Int, child: BlockBufferSpec) extends BlockBufferSpec {
+   require(blockSize <= (1 << 16))
+
+   def buildInputBuffer(in: InputStream): InputBlockBuffer = new ZstdInputBlockBuffer(blockSize, child.buildInputBuffer(in))
+
+   def buildOutputBuffer(out: OutputStream): OutputBlockBuffer = new ZstdOutputBlockBuffer(blockSize, child.buildOutputBuffer(out))
+
+   def buildCodeInputBuffer(in: Code[InputStream]): Code[InputBlockBuffer] =
+     Code.newInstance[ZstdInputBlockBuffer, Int, InputBlockBuffer](blockSize, child.buildCodeInputBuffer(in))
+
+   def buildCodeOutputBuffer(out: Code[OutputStream]): Code[OutputBlockBuffer] =
+     Code.newInstance[ZstdOutputBlockBuffer, Int, OutputBlockBuffer](blockSize, child.buildCodeOutputBuffer(out))
+ }
+
+ final case class ZstdSizedBasedBlockBufferSpec(blockSize: Int, minCompressionSize: Int, child: BlockBufferSpec) extends BlockBufferSpec {
+   require(blockSize <= (1 << 16))
+
+   def buildInputBuffer(in: InputStream): InputBlockBuffer = new ZstdSizedBasedInputBlockBuffer(blockSize, child.buildInputBuffer(in))
+
+   def buildOutputBuffer(out: OutputStream): OutputBlockBuffer = new ZstdSizedBasedOutputBlockBuffer(blockSize, minCompressionSize, child.buildOutputBuffer(out))
+
+   def buildCodeInputBuffer(in: Code[InputStream]): Code[InputBlockBuffer] =
+     Code.newInstance[ZstdSizedBasedInputBlockBuffer, Int, InputBlockBuffer](blockSize, child.buildCodeInputBuffer(in))
+
+   def buildCodeOutputBuffer(out: Code[OutputStream]): Code[OutputBlockBuffer] =
+     Code.newInstance[ZstdSizedBasedOutputBlockBuffer, Int, Int, OutputBlockBuffer](blockSize, minCompressionSize, child.buildCodeOutputBuffer(out))
+ }

object StreamBlockBufferSpec {
def extract(jv: JValue): StreamBlockBufferSpec = new StreamBlockBufferSpec
}
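Reading the new default spec top-down: LEB128BufferSpec varint-encodes primitive values, BlockingBufferSpec chops the byte stream into blocks of at most 64 KiB, ZstdBlockBufferSpec compresses each block, and StreamBlockBufferSpec writes length-prefixed blocks to the underlying stream. A hedged usage sketch, assuming BufferSpec exposes buildInputBuffer/buildOutputBuffer over plain java.io streams, as the block-level builders above suggest:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

val spec: BufferSpec = LEB128BufferSpec(
  BlockingBufferSpec(64 * 1024,
    ZstdBlockBufferSpec(64 * 1024,
      new StreamBlockBufferSpec)))

val baos = new ByteArrayOutputStream()
val ob = spec.buildOutputBuffer(baos) // assumed entry point, mirroring the block-level builders
(0 until 1000).foreach(ob.writeInt)   // small ints stay small under LEB128
ob.flush()

val ib = spec.buildInputBuffer(new ByteArrayInputStream(baos.toByteArray))
assert((0 until 1000).forall(_ == ib.readInt()))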
hail/src/main/scala/is/hail/io/InputBuffers.scala (56 changes: 56 additions & 0 deletions)
@@ -8,6 +8,8 @@ import is.hail.annotations.{Memory, Region}
import is.hail.io.compress.LZ4
import is.hail.utils._

+ import com.github.luben.zstd.Zstd

trait InputBuffer extends Closeable {
def close(): Unit

@@ -627,3 +629,57 @@ final class LZ4SizeBasedCompressingInputBlockBuffer(lz4: LZ4, blockSize: Int, in
result
}
}

+ final class ZstdInputBlockBuffer(blockSize: Int, in: InputBlockBuffer) extends InputBlockBuffer {
+   private val comp = new Array[Byte](4 + Zstd.compressBound(blockSize).toInt)
+
+   def close(): Unit = {
+     in.close()
+   }
+
+   def seek(offset: Long): Unit = in.seek(offset)
+
+   def readBlock(buf: Array[Byte]): Int = {
+     val blockLen = in.readBlock(comp)
+     if (blockLen == -1) {
+       blockLen
+     } else {
+       val compLen = blockLen - 4
+       val decompLen = Memory.loadInt(comp, 0)
+       val ret = Zstd.decompressByteArray(buf, 0, decompLen, comp, 4, compLen)
+       if (Zstd.isError(ret))
+         throw new com.github.luben.zstd.ZstdException(ret)
+       decompLen
+     }
+   }
+ }
+
+ final class ZstdSizedBasedInputBlockBuffer(blockSize: Int, in: InputBlockBuffer) extends InputBlockBuffer {
+   private val comp = new Array[Byte](4 + Zstd.compressBound(blockSize).toInt)
+
+   def close(): Unit = {
+     in.close()
+   }
+
+   def seek(offset: Long): Unit = in.seek(offset)
+
+   def readBlock(buf: Array[Byte]): Int = {
+     val blockLen = in.readBlock(comp)
+     if (blockLen == -1) {
+       blockLen
+     } else {
+       val compLen = blockLen - 4
+       val decomp = Memory.loadInt(comp, 0)
+       if (decomp % 2 == 0) {
+         System.arraycopy(comp, 4, buf, 0, compLen)
+         compLen
+       } else {
+         val decompLen = decomp >>> 1
+         val ret = Zstd.decompressByteArray(buf, 0, decompLen, comp, 4, compLen)
+         if (Zstd.isError(ret))
+           throw new com.github.luben.zstd.ZstdException(ret)
+         decompLen
+       }
+     }
+   }
+ }
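Only the read side appears in this excerpt. For ZstdSizedBasedInputBlockBuffer, the decoding logic implies the writer's framing: a 4-byte header holding decompressedLength << 1, with the low bit set only when the payload is actually compressed, so blocks smaller than minCompressionSize can be stored raw. A hedged sketch of that write side (the class name and the OutputBlockBuffer member signatures are inferred here, not the actual Hail implementation):

import com.github.luben.zstd.Zstd
import is.hail.annotations.Memory

final class ZstdSizedBasedOutputBlockBufferSketch(blockSize: Int, minCompressionSize: Int, out: OutputBlockBuffer) extends OutputBlockBuffer {
  private val comp = new Array[Byte](4 + Zstd.compressBound(blockSize).toInt)

  def close(): Unit = out.close()

  def flush(): Unit = out.flush()

  def getPos(): Long = out.getPos()

  def writeBlock(buf: Array[Byte], len: Int): Unit = {
    if (len < minCompressionSize) {
      // Store raw: an even header (low bit clear) tells the reader to arraycopy.
      Memory.storeInt(comp, 0, len << 1)
      System.arraycopy(buf, 0, comp, 4, len)
      out.writeBlock(comp, 4 + len)
    } else {
      // Compress at zstd's default level; an odd header tells the reader to
      // decompress, with the upper 31 bits carrying the decompressed length.
      val compLen = Zstd.compressByteArray(comp, 4, comp.length - 4, buf, 0, len, 3)
      if (Zstd.isError(compLen))
        throw new com.github.luben.zstd.ZstdException(compLen)
      Memory.storeInt(comp, 0, (len << 1) | 1)
      out.writeBlock(comp, 4 + compLen.toInt)
    }
  }
}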