Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Snappy revamp #3564

Merged
merged 10 commits into from
Apr 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions beacon_chain/beacon_chain_db.nim
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import
std/[typetraits, tables],
stew/[arrayops, assign2, byteutils, endians2, io2, objects, results],
serialization, chronicles, snappy, snappy/framing,
serialization, chronicles, snappy,
eth/db/[kvstore, kvstore_sqlite3],
./networking/network_metadata, ./beacon_chain_db_immutable,
./spec/[eth2_ssz_serialization, eth2_merkleization, forks, state_transition],
Expand Down Expand Up @@ -526,7 +526,7 @@ proc decodeSnappySSZ[T](data: openArray[byte], output: var T): bool =

proc decodeSZSSZ[T](data: openArray[byte], output: var T): bool =
try:
let decompressed = framingFormatUncompress(data)
let decompressed = decodeFramed(data)
readSszBytes(decompressed, output, updateRoot = false)
true
except CatchableError as e:
Expand All @@ -552,7 +552,7 @@ proc encodeSnappySSZ(v: auto): seq[byte] =
proc encodeSZSSZ(v: auto): seq[byte] =
# https://github.com/google/snappy/blob/main/framing_format.txt
try:
framingFormatCompress(SSZ.encode(v))
encodeFramed(SSZ.encode(v))
except CatchableError as err:
# In-memory encode shouldn't fail!
raiseAssert err.msg
Expand Down Expand Up @@ -852,7 +852,7 @@ proc getBlockSSZ*(
let dataPtr = addr data # Short-lived
var success = true
proc decode(data: openArray[byte]) =
try: dataPtr[] = framingFormatUncompress(data)
try: dataPtr[] = decodeFramed(data)
except CatchableError: success = false
db.blocks[T.toFork].get(key.data, decode).expectDb() and success

Expand All @@ -873,7 +873,7 @@ proc getBlockSZ*(
let dataPtr = addr data # Short-lived
var success = true
proc decode(data: openArray[byte]) =
try: dataPtr[] = framingFormatCompress(
try: dataPtr[] = snappy.encodeFramed(
snappy.decode(data, maxDecompressedDbRecordSize))
except CatchableError: success = false
db.blocks[BeaconBlockFork.Phase0].get(key.data, decode).expectDb() and success or
Expand All @@ -885,7 +885,7 @@ proc getBlockSZ*(
let dataPtr = addr data # Short-lived
var success = true
proc decode(data: openArray[byte]) =
try: dataPtr[] = framingFormatCompress(
try: dataPtr[] = snappy.encodeFramed(
snappy.decode(data, maxDecompressedDbRecordSize))
except CatchableError: success = false
db.blocks[T.toFork].get(key.data, decode).expectDb() and success
Expand Down
7 changes: 3 additions & 4 deletions beacon_chain/era_db.nim
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@

import
std/os,
stew/results,
snappy/framing,
stew/results, snappy,
../ncli/e2store,
./spec/datatypes/[altair, bellatrix, phase0],
./spec/forks,
Expand Down Expand Up @@ -118,7 +117,7 @@ proc getBlockSSZ*(
? db.getBlockSZ(historical_roots, slot, tmp)

try:
bytes = framingFormatUncompress(tmp)
bytes = decodeFramed(tmp)
ok()
except CatchableError as exc:
err(exc.msg)
Expand Down Expand Up @@ -172,7 +171,7 @@ proc getStateSSZ*(
? db.getStateSZ(historical_roots, slot, tmp)

try:
bytes = framingFormatUncompress(tmp)
bytes = decodeFramed(tmp)
ok()
except CatchableError as exc:
err(exc.msg)
Expand Down
4 changes: 2 additions & 2 deletions beacon_chain/networking/eth2_network.nim
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import
stew/[leb128, endians2, results, byteutils, io2, bitops2], bearssl,
stew/shims/net as stewNet,
stew/shims/[macros],
faststreams/[inputs, outputs, buffers], snappy, snappy/framing,
faststreams/[inputs, outputs, buffers], snappy, snappy/faststreams,
json_serialization, json_serialization/std/[net, sets, options],
chronos, chronicles, metrics,
libp2p/[switch, peerinfo, multiaddress, multicodec, crypto/crypto,
Expand Down Expand Up @@ -548,7 +548,7 @@ proc writeChunk*(conn: Connection,

output.write toBytes(payload.lenu64, Leb128).toOpenArray()

framingFormatCompress(output, payload)
compressFramed(payload, output)
except IOError as exc:
raiseAssert exc.msg # memoryOutput shouldn't raise
conn.write(output.getOutput)
Expand Down
84 changes: 41 additions & 43 deletions beacon_chain/networking/libp2p_streams_backend.nim
Original file line number Diff line number Diff line change
Expand Up @@ -9,83 +9,81 @@
proc uncompressFramedStream*(conn: Connection,
expectedSize: int): Future[Result[seq[byte], cstring]]
{.async.} =
var header: array[STREAM_HEADER.len, byte]
var header: array[framingHeader.len, byte]
try:
await conn.readExactly(addr header[0], header.len)
except LPStreamEOFError, LPStreamIncompleteError:
return err "Unexpected EOF before snappy header"

if header != STREAM_HEADER.toOpenArrayByte(0, STREAM_HEADER.high):
if header != framingHeader:
return err "Incorrect snappy header"

static:
doAssert maxCompressedFrameDataLen >= maxUncompressedFrameDataLen.uint64

var
uncompressedData = newSeq[byte](MAX_UNCOMPRESSED_DATA_LEN)
frameData = newSeq[byte](MAX_COMPRESSED_DATA_LEN)
output = newSeqOfCap[byte](expectedSize)
frameData = newSeq[byte](maxCompressedFrameDataLen + 4)
output = newSeqUninitialized[byte](expectedSize)
written = 0

while output.len < expectedSize:
while written < expectedSize:
var frameHeader: array[4, byte]
try:
await conn.readExactly(addr frameHeader[0], frameHeader.len)
except LPStreamEOFError, LPStreamIncompleteError:
return err "no snappy frame"
return err "Snappy frame header missing"

let x = uint32.fromBytesLE frameHeader
let id = x and 0xFF
let dataLen = (x shr 8).int
let (id, dataLen) = decodeFrameHeader(frameHeader)

if dataLen > MAX_COMPRESSED_DATA_LEN:
return err "invalid snappy frame length"
if dataLen > frameData.len:
# In theory, compressed frames could be bigger and still result in a
# valid, small snappy frame, but this would mean they are not getting
# compressed correctly
return err "Snappy frame too big"

if dataLen > 0:
try:
await conn.readExactly(addr frameData[0], dataLen)
except LPStreamEOFError, LPStreamIncompleteError:
return err "Incomplete snappy frame"

if id == COMPRESSED_DATA_IDENTIFIER:
if dataLen < 4:
return err "Snappy frame size too low to contain CRC checksum"
if id == chunkCompressed:
if dataLen < 6: # At least CRC + 2 bytes of frame data
return err "Compressed snappy frame too small"

let
crc = uint32.fromBytesLE frameData.toOpenArray(0, 3)
remaining = expectedSize - output.len
chunkLen = min(remaining, uncompressedData.len)

# Grab up to MAX_UNCOMPRESSED_DATA_LEN bytes, but no more than remains
# according to the expected size. If it turns out that the uncompressed
# data is longer than that, snappyUncompress will fail and we will not
# decompress the chunk at all, instead reporting failure.
let
# The `int` conversion below is safe, because `uncompressedLen` is
# bounded to `chunkLen` (which in turn is bounded by `MAX_CHUNK_SIZE`).
# TODO: Use a range type for the parameter.
uncompressedLen = int snappyUncompress(
frameData.toOpenArray(4, dataLen - 1),
uncompressedData.toOpenArray(0, chunkLen - 1))

if uncompressedLen == 0:
return err "Failed to decompress snappy frame"
doAssert output.len + uncompressedLen <= expectedSize,
"enforced by `remains` limit above"

if not checkCrc(uncompressedData.toOpenArray(0, uncompressedLen-1), crc):
uncompressed =
snappy.uncompress(
frameData.toOpenArray(4, dataLen - 1),
output.toOpenArray(written, output.high)).valueOr:
return err "Failed to decompress content"

if maskedCrc(
output.toOpenArray(written, written + uncompressed-1)) != crc:
return err "Snappy content CRC checksum failed"

output.add uncompressedData.toOpenArray(0, uncompressedLen-1)
written += uncompressed

elif id == chunkUncompressed:
if dataLen < 5: # At least one byte of data
return err "Uncompressed snappy frame too small"

let uncompressed = dataLen - 4

elif id == UNCOMPRESSED_DATA_IDENTIFIER:
if dataLen < 4:
return err "Snappy frame size too low to contain CRC checksum"
if uncompressed > maxUncompressedFrameDataLen.int:
return err "Snappy frame size too large"

if output.len + dataLen - 4 > expectedSize:
if uncompressed > output.len - written:
return err "Too much data"

let crc = uint32.fromBytesLE frameData.toOpenArray(0, 3)
if not checkCrc(frameData.toOpenArray(4, dataLen - 1), crc):
if maskedCrc(frameData.toOpenArray(4, dataLen - 1)) != crc:
return err "Snappy content CRC checksum failed"

output.add frameData.toOpenArray(4, dataLen-1)
output[written..<written + uncompressed] =
frameData.toOpenArray(4, dataLen-1)
written += uncompressed

elif id < 0x80:
# Reserved unskippable chunks (chunk types 0x02-0x7f)
Expand Down
7 changes: 2 additions & 5 deletions ncli/e2store.nim
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import
std/strformat,
stew/[arrayops, endians2, io2, results],
snappy, snappy/framing,
snappy,
../beacon_chain/spec/[beacon_time, forks],
../beacon_chain/spec/eth2_ssz_serialization

Expand Down Expand Up @@ -89,10 +89,7 @@ proc appendRecord*(f: IoHandle, typ: Type, data: openArray[byte]): Result[int64,
ok(start)

proc toCompressedBytes(item: auto): seq[byte] =
try:
framingFormatCompress(SSZ.encode(item))
except CatchableError as exc:
raiseAssert exc.msg # shouldn't happen
snappy.encodeFramed(SSZ.encode(item))

proc appendRecord*(f: IoHandle, v: ForkyTrustedSignedBeaconBlock): Result[int64, string] =
f.appendRecord(SnappyBeaconBlock, toCompressedBytes(v))
Expand Down
6 changes: 3 additions & 3 deletions ncli/ncli_db.nim
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import
std/[os, stats, strformat, tables],
snappy, snappy/framing,
snappy,
chronicles, confutils, stew/[byteutils, io2], eth/db/kvstore_sqlite3,
../beacon_chain/networking/network_metadata,
../beacon_chain/[beacon_chain_db],
Expand Down Expand Up @@ -536,7 +536,7 @@ proc cmdImportEra(conf: DbConf, cfg: RuntimeConfig) =

if header.typ == SnappyBeaconBlock:
withTimer(timers[tBlock]):
let uncompressed = framingFormatUncompress(data)
let uncompressed = decodeFramed(data)
let blck = try: readSszForkedSignedBeaconBlock(cfg, uncompressed)
except CatchableError as exc:
error "Invalid snappy block", msg = exc.msg, file
Expand All @@ -547,7 +547,7 @@ proc cmdImportEra(conf: DbConf, cfg: RuntimeConfig) =
blocks += 1
elif header.typ == SnappyBeaconState:
withTimer(timers[tState]):
let uncompressed = framingFormatUncompress(data)
let uncompressed = decodeFramed(data)
let state = try: newClone(
readSszForkedHashedBeaconState(cfg, uncompressed))
except CatchableError as exc:
Expand Down
10 changes: 5 additions & 5 deletions tests/test_beacon_chain_db.nim
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@

import
std/[algorithm, options, sequtils],
unittest2,
unittest2, snappy,
../beacon_chain/[beacon_chain_db, interop],
../beacon_chain/spec/[beaconstate, forks, state_transition],
../beacon_chain/spec/datatypes/[phase0, altair, bellatrix],
../beacon_chain/consensus_object_pools/blockchain_dag,
eth/db/kvstore, snappy/framing,
eth/db/kvstore,
# test utilies
./testutil, ./testdbutil, ./testblockutil, ./teststateutil

Expand Down Expand Up @@ -110,7 +110,7 @@ suite "Beacon chain DB" & preset():
db.getBlockSSZ(root, tmp, phase0.TrustedSignedBeaconBlock)
db.getBlockSZ(root, tmp2, phase0.TrustedSignedBeaconBlock)
tmp == SSZ.encode(signedBlock)
tmp2 == framingFormatCompress(tmp)
tmp2 == encodeFramed(tmp)

db.delBlock(root)
check:
Expand Down Expand Up @@ -152,7 +152,7 @@ suite "Beacon chain DB" & preset():
db.getBlockSSZ(root, tmp, altair.TrustedSignedBeaconBlock)
db.getBlockSZ(root, tmp2, altair.TrustedSignedBeaconBlock)
tmp == SSZ.encode(signedBlock)
tmp2 == framingFormatCompress(tmp)
tmp2 == encodeFramed(tmp)

db.delBlock(root)
check:
Expand Down Expand Up @@ -194,7 +194,7 @@ suite "Beacon chain DB" & preset():
db.getBlockSSZ(root, tmp, bellatrix.TrustedSignedBeaconBlock)
db.getBlockSZ(root, tmp2, bellatrix.TrustedSignedBeaconBlock)
tmp == SSZ.encode(signedBlock)
tmp2 == framingFormatCompress(tmp)
tmp2 == encodeFramed(tmp)

db.delBlock(root)
check:
Expand Down