Skip to content

Commit

Permalink
Improve compression support:
Browse files Browse the repository at this point in the history
* Optimize parsing of compressed message headers
* Enforce protocol-defined message size maxima
* Update comments
  • Loading branch information
nbougalis committed Jun 26, 2020
1 parent 362a017 commit fe9922d
Show file tree
Hide file tree
Showing 7 changed files with 138 additions and 68 deletions.
4 changes: 3 additions & 1 deletion src/ripple/overlay/Compression.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ namespace compression {
std::size_t constexpr headerBytes = 6;
std::size_t constexpr headerBytesCompressed = 10;

enum class Algorithm : std::uint8_t { None = 0x00, LZ4 = 0x01 };
// All values other than 'none' must have the high bit. The low order four bits
// must be 0.
enum class Algorithm : std::uint8_t { None = 0x00, LZ4 = 0x90 };

enum class Compressed : std::uint8_t { On, Off };

Expand Down
4 changes: 2 additions & 2 deletions src/ripple/overlay/Message.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class Message : public std::enable_shared_from_this<Message>
* @param in Pointer to the payload
* @param payloadBytes Size of the payload excluding the header size
* @param type Protocol message type
* @param comprAlgorithm Compression algorithm used in compression,
* @param compression Compression algorithm used in compression,
* currently LZ4 only. If None then the message is uncompressed.
* @param uncompressedBytes Size of the uncompressed message
*/
Expand All @@ -93,7 +93,7 @@ class Message : public std::enable_shared_from_this<Message>
std::uint8_t* in,
std::uint32_t payloadBytes,
int type,
Algorithm comprAlgorithm,
Algorithm compression,
std::uint32_t uncompressedBytes);

/** Try to compress the payload.
Expand Down
3 changes: 3 additions & 0 deletions src/ripple/overlay/Peer.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ class Peer
cycleStatus() = 0;
virtual bool
hasRange(std::uint32_t uMin, std::uint32_t uMax) = 0;

virtual bool
compressionEnabled() const = 0;
};

} // namespace ripple
Expand Down
50 changes: 37 additions & 13 deletions src/ripple/overlay/impl/Message.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,22 +109,46 @@ Message::compress()
}

/** Set payload header
* Uncompressed message header
* 47-42 Set to 0
* 41-16 Payload size
* 15-0 Message Type
* Compressed message header
* 79 Set to 0, indicates the message is compressed
* 78-76 Compression algorithm, value 1-7. Set to 1 to indicate LZ4
* compression 75-74 Set to 0 73-48 Payload size 47-32 Message Type
* 31-0 Uncompressed message size
*/
The header is a variable-sized structure that contains information about
the type of the message and the length and encoding of the payload.
The first bit determines whether a message is compressed or uncompressed;
for compressed messages, the next three bits identify the compression
algorithm.
All multi-byte values are represented in big endian.
For uncompressed messages (6 bytes), numbering bits from left to right:
- The first 6 bits are set to 0.
- The next 26 bits represent the payload size.
- The remaining 16 bits represent the message type.
For compressed messages (10 bytes), numbering bits from left to right:
- The first 32 bits, together, represent the compression algorithm
and payload size:
- The first bit is set to 1 to indicate the message is compressed.
- The next 3 bits indicate the compression algorithm.
- The next 2 bits are reserved at this time and set to 0.
- The remaining 26 bits represent the payload size.
- The next 16 bits represent the message type.
- The remaining 32 bits are the uncompressed message size.
The maximum size of a message at this time is 64 MB. Messages larger than
this will be dropped and the recipient may, at its option, sever the link.
@note While nominally a part of the wire protocol, the framing is subject
to change; future versions of the code may negotiate the use of
substantially different framing.
*/
void
Message::setHeader(
std::uint8_t* in,
std::uint32_t payloadBytes,
int type,
Algorithm comprAlgorithm,
Algorithm compression,
std::uint32_t uncompressedBytes)
{
auto h = in;
Expand All @@ -142,10 +166,10 @@ Message::setHeader(
*in++ = static_cast<std::uint8_t>((type >> 8) & 0xFF);
*in++ = static_cast<std::uint8_t>(type & 0xFF);

if (comprAlgorithm != Algorithm::None)
if (compression != Algorithm::None)
{
pack(in, uncompressedBytes);
*h |= 0x80 | (static_cast<uint8_t>(comprAlgorithm) << 4);
*h |= static_cast<std::uint8_t>(compression);
}
}

Expand Down
6 changes: 6 additions & 0 deletions src/ripple/overlay/impl/PeerImp.h
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,12 @@ class PeerImp : public Peer,
boost::optional<hash_map<PublicKey, ShardInfo>>
getPeerShardInfo() const;

bool
compressionEnabled() const override
{
return compressionEnabled_ == Compressed::On;
}

private:
void
close();
Expand Down
99 changes: 77 additions & 22 deletions src/ripple/overlay/impl/ProtocolMessage.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,51 +120,94 @@ buffersBegin(BufferSequence const& bufs)
bufs);
}

/** Parse a message header
* @return a seated optional if the message header was successfully
* parsed. An unseated optional otherwise, in which case
* @param ec contains more information:
* - set to `errc::success` if not enough bytes were present
* - set to `errc::no_message` if a valid header was not present
*/
template <class BufferSequence>
boost::optional<MessageHeader>
parseMessageHeader(BufferSequence const& bufs, std::size_t size)
parseMessageHeader(
boost::system::error_code& ec,
BufferSequence const& bufs,
std::size_t size)
{
using namespace ripple::compression;
auto iter = buffersBegin(bufs);

MessageHeader hdr;
auto const compressed = (*iter & 0x80) == 0x80;
auto iter = buffersBegin(bufs);

// Check valid header
if ((*iter & 0xFC) == 0 || compressed)
if (*iter & 0x80)
{
hdr.header_size = compressed ? headerBytesCompressed : headerBytes;
hdr.header_size = headerBytesCompressed;

// not enough bytes to parse the header
if (size < hdr.header_size)
return {};
{
ec = make_error_code(boost::system::errc::success);
return boost::none;
}

if (*iter & 0x0C)
{
ec = make_error_code(boost::system::errc::protocol_error);
return boost::none;
}

hdr.algorithm = static_cast<compression::Algorithm>(*iter);

if (compressed)
if (hdr.algorithm != compression::Algorithm::LZ4)
{
uint8_t algorithm = (*iter & 0x70) >> 4;
if (algorithm !=
static_cast<std::uint8_t>(compression::Algorithm::LZ4))
return {};
hdr.algorithm = compression::Algorithm::LZ4;
ec = make_error_code(boost::system::errc::protocol_error);
return boost::none;
}

for (int i = 0; i != 4; ++i)
hdr.payload_wire_size = (hdr.payload_wire_size << 8) + *iter++;
// clear the compression bits
hdr.payload_wire_size &= 0x03FFFFFF;

// clear the top four bits (the compression bits).
hdr.payload_wire_size &= 0x0FFFFFFF;

hdr.total_wire_size = hdr.header_size + hdr.payload_wire_size;

for (int i = 0; i != 2; ++i)
hdr.message_type = (hdr.message_type << 8) + *iter++;

if (compressed)
for (int i = 0; i != 4; ++i)
hdr.uncompressed_size = (hdr.uncompressed_size << 8) + *iter++;
for (int i = 0; i != 4; ++i)
hdr.uncompressed_size = (hdr.uncompressed_size << 8) + *iter++;

return hdr;
}

if ((*iter & 0xFC) == 0)
{
hdr.header_size = headerBytes;

if (size < hdr.header_size)
{
ec = make_error_code(boost::system::errc::success);
return boost::none;
}

hdr.algorithm = Algorithm::None;

for (int i = 0; i != 4; ++i)
hdr.payload_wire_size = (hdr.payload_wire_size << 8) + *iter++;

hdr.uncompressed_size = hdr.payload_wire_size;
hdr.total_wire_size = hdr.header_size + hdr.payload_wire_size;

for (int i = 0; i != 2; ++i)
hdr.message_type = (hdr.message_type << 8) + *iter++;

return hdr;
}

return {};
ec = make_error_code(boost::system::errc::no_message);
return boost::none;
}

template <
Expand All @@ -186,7 +229,7 @@ invoke(MessageHeader const& header, Buffers const& buffers, Handler& handler)
std::vector<std::uint8_t> payload;
payload.resize(header.uncompressed_size);

auto payloadSize = ripple::compression::decompress(
auto const payloadSize = ripple::compression::decompress(
stream,
header.payload_wire_size,
payload.data(),
Expand Down Expand Up @@ -226,23 +269,35 @@ invokeProtocolMessage(Buffers const& buffers, Handler& handler)
if (size == 0)
return result;

auto header = detail::parseMessageHeader(buffers, size);
auto header = detail::parseMessageHeader(result.second, buffers, size);

// If we can't parse the header then it may be that we don't have enough
// bytes yet, or because the message was cut off.
// bytes yet, or because the message was cut off (if error_code is success).
// Otherwise we failed to match the header's marker (error_code is set to
// no_message) or the compression algorithm is invalid (error_code is
// protocol_error) and signal an error.
if (!header)
return result;

// We implement a maximum size for protocol messages. Sending a message
// whose size exceeds this may result in the connection being dropped. A
// larger message size may be supported in the future or negotiated as
// part of a protocol upgrade.
if (header->payload_wire_size > megabytes(64))
if (header->payload_wire_size > megabytes(64) ||
header->uncompressed_size > megabytes(64))
{
result.second = make_error_code(boost::system::errc::message_size);
return result;
}

// We requested uncompressed messages from the peer but received compressed.
if (!handler.compressionEnabled() &&
header->algorithm != compression::Algorithm::None)
{
result.second = make_error_code(boost::system::errc::protocol_error);
return result;
}

// We don't have the whole message yet. This isn't an error but we have
// nothing to do.
if (header->total_wire_size > size)
Expand Down
40 changes: 10 additions & 30 deletions src/test/overlay/compression_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,21 +84,14 @@ class compression_test : public beast::unit_test::suite
std::shared_ptr<T> proto,
protocol::MessageType mt,
uint16_t nbuffers,
const char* msg,
bool log = false)
std::string msg)
{
if (log)
printf("=== compress/decompress %s ===\n", msg);
testcase("Compress/Decompress: " + msg);

Message m(*proto, mt);

auto& buffer = m.getBuffer(Compressed::On);

if (log)
printf(
"==> compressed, original %d bytes, compressed %d bytes\n",
(int)m.getBuffer(Compressed::Off).size(),
(int)m.getBuffer(Compressed::On).size());

boost::beast::multi_buffer buffers;

// simulate multi-buffer
Expand All @@ -112,26 +105,15 @@ class compression_test : public beast::unit_test::suite
buffers.commit(boost::asio::buffer_copy(
buffers.prepare(slice.size()), boost::asio::buffer(slice)));
}
auto header =
ripple::detail::parseMessageHeader(buffers.data(), buffer.size());

if (log)
printf(
"==> parsed header: buffers size %d, compressed %d, algorithm "
"%d, header size %d, payload size %d, buffer size %d\n",
(int)buffers.size(),
header->algorithm != Algorithm::None,
(int)header->algorithm,
(int)header->header_size,
(int)header->payload_wire_size,
(int)buffer.size());

boost::system::error_code ec;
auto header = ripple::detail::parseMessageHeader(
ec, buffers.data(), buffer.size());

BEAST_EXPECT(header);

if (header->algorithm == Algorithm::None)
{
if (log)
printf("==> NOT COMPRESSED\n");
return;
}

std::vector<std::uint8_t> decompressed;
decompressed.resize(header->uncompressed_size);
Expand All @@ -157,8 +139,6 @@ class compression_test : public beast::unit_test::suite
uncompressed.begin() + ripple::compression::headerBytes,
uncompressed.end(),
decompressed.begin()));
if (log)
printf("\n");
}

std::shared_ptr<protocol::TMManifests>
Expand Down Expand Up @@ -460,4 +440,4 @@ class compression_test : public beast::unit_test::suite
BEAST_DEFINE_TESTSUITE_MANUAL_PRIO(compression, ripple_data, ripple, 20);

} // namespace test
} // namespace ripple
} // namespace ripple

0 comments on commit fe9922d

Please sign in to comment.