From fe9922d65428136508bd6e2f62ebe787029581b3 Mon Sep 17 00:00:00 2001 From: Nik Bougalis Date: Mon, 13 Apr 2020 10:26:14 -0400 Subject: [PATCH] Improve compression support: * Optimize parsing of compressed message headers * Enforce protocol-defined message size maxima * Update comments --- src/ripple/overlay/Compression.h | 4 +- src/ripple/overlay/Message.h | 4 +- src/ripple/overlay/Peer.h | 3 + src/ripple/overlay/impl/Message.cpp | 50 +++++++++--- src/ripple/overlay/impl/PeerImp.h | 6 ++ src/ripple/overlay/impl/ProtocolMessage.h | 99 ++++++++++++++++++----- src/test/overlay/compression_test.cpp | 40 +++------ 7 files changed, 138 insertions(+), 68 deletions(-) diff --git a/src/ripple/overlay/Compression.h b/src/ripple/overlay/Compression.h index 5d45dbda888..6bb94792b43 100644 --- a/src/ripple/overlay/Compression.h +++ b/src/ripple/overlay/Compression.h @@ -31,7 +31,9 @@ namespace compression { std::size_t constexpr headerBytes = 6; std::size_t constexpr headerBytesCompressed = 10; -enum class Algorithm : std::uint8_t { None = 0x00, LZ4 = 0x01 }; +// All values other than 'none' must have the high bit. The low order four bits +// must be 0. +enum class Algorithm : std::uint8_t { None = 0x00, LZ4 = 0x90 }; enum class Compressed : std::uint8_t { On, Off }; diff --git a/src/ripple/overlay/Message.h b/src/ripple/overlay/Message.h index af7a168ad0b..e2c081d123f 100644 --- a/src/ripple/overlay/Message.h +++ b/src/ripple/overlay/Message.h @@ -84,7 +84,7 @@ class Message : public std::enable_shared_from_this * @param in Pointer to the payload * @param payloadBytes Size of the payload excluding the header size * @param type Protocol message type - * @param comprAlgorithm Compression algorithm used in compression, + * @param compression Compression algorithm used in compression, * currently LZ4 only. If None then the message is uncompressed. * @param uncompressedBytes Size of the uncompressed message */ @@ -93,7 +93,7 @@ class Message : public std::enable_shared_from_this std::uint8_t* in, std::uint32_t payloadBytes, int type, - Algorithm comprAlgorithm, + Algorithm compression, std::uint32_t uncompressedBytes); /** Try to compress the payload. diff --git a/src/ripple/overlay/Peer.h b/src/ripple/overlay/Peer.h index d16433c1d0f..1f5aad376d2 100644 --- a/src/ripple/overlay/Peer.h +++ b/src/ripple/overlay/Peer.h @@ -118,6 +118,9 @@ class Peer cycleStatus() = 0; virtual bool hasRange(std::uint32_t uMin, std::uint32_t uMax) = 0; + + virtual bool + compressionEnabled() const = 0; }; } // namespace ripple diff --git a/src/ripple/overlay/impl/Message.cpp b/src/ripple/overlay/impl/Message.cpp index e2c1f2fb548..29440c44e48 100644 --- a/src/ripple/overlay/impl/Message.cpp +++ b/src/ripple/overlay/impl/Message.cpp @@ -109,22 +109,46 @@ Message::compress() } /** Set payload header - * Uncompressed message header - * 47-42 Set to 0 - * 41-16 Payload size - * 15-0 Message Type - * Compressed message header - * 79 Set to 0, indicates the message is compressed - * 78-76 Compression algorithm, value 1-7. Set to 1 to indicate LZ4 - * compression 75-74 Set to 0 73-48 Payload size 47-32 Message Type - * 31-0 Uncompressed message size - */ + + The header is a variable-sized structure that contains information about + the type of the message and the length and encoding of the payload. + + The first bit determines whether a message is compressed or uncompressed; + for compressed messages, the next three bits identify the compression + algorithm. + + All multi-byte values are represented in big endian. + + For uncompressed messages (6 bytes), numbering bits from left to right: + + - The first 6 bits are set to 0. + - The next 26 bits represent the payload size. + - The remaining 16 bits represent the message type. + + For compressed messages (10 bytes), numbering bits from left to right: + + - The first 32 bits, together, represent the compression algorithm + and payload size: + - The first bit is set to 1 to indicate the message is compressed. + - The next 3 bits indicate the compression algorithm. + - The next 2 bits are reserved at this time and set to 0. + - The remaining 26 bits represent the payload size. + - The next 16 bits represent the message type. + - The remaining 32 bits are the uncompressed message size. + + The maximum size of a message at this time is 64 MB. Messages larger than + this will be dropped and the recipient may, at its option, sever the link. + + @note While nominally a part of the wire protocol, the framing is subject + to change; future versions of the code may negotiate the use of + substantially different framing. +*/ void Message::setHeader( std::uint8_t* in, std::uint32_t payloadBytes, int type, - Algorithm comprAlgorithm, + Algorithm compression, std::uint32_t uncompressedBytes) { auto h = in; @@ -142,10 +166,10 @@ Message::setHeader( *in++ = static_cast((type >> 8) & 0xFF); *in++ = static_cast(type & 0xFF); - if (comprAlgorithm != Algorithm::None) + if (compression != Algorithm::None) { pack(in, uncompressedBytes); - *h |= 0x80 | (static_cast(comprAlgorithm) << 4); + *h |= static_cast(compression); } } diff --git a/src/ripple/overlay/impl/PeerImp.h b/src/ripple/overlay/impl/PeerImp.h index e005fcf846b..4b279dea658 100644 --- a/src/ripple/overlay/impl/PeerImp.h +++ b/src/ripple/overlay/impl/PeerImp.h @@ -428,6 +428,12 @@ class PeerImp : public Peer, boost::optional> getPeerShardInfo() const; + bool + compressionEnabled() const override + { + return compressionEnabled_ == Compressed::On; + } + private: void close(); diff --git a/src/ripple/overlay/impl/ProtocolMessage.h b/src/ripple/overlay/impl/ProtocolMessage.h index 0f97cdd11b9..b929f91ba40 100644 --- a/src/ripple/overlay/impl/ProtocolMessage.h +++ b/src/ripple/overlay/impl/ProtocolMessage.h @@ -120,51 +120,94 @@ buffersBegin(BufferSequence const& bufs) bufs); } +/** Parse a message header + * @return a seated optional if the message header was successfully + * parsed. An unseated optional otherwise, in which case + * @param ec contains more information: + * - set to `errc::success` if not enough bytes were present + * - set to `errc::no_message` if a valid header was not present + */ template boost::optional -parseMessageHeader(BufferSequence const& bufs, std::size_t size) +parseMessageHeader( + boost::system::error_code& ec, + BufferSequence const& bufs, + std::size_t size) { using namespace ripple::compression; - auto iter = buffersBegin(bufs); MessageHeader hdr; - auto const compressed = (*iter & 0x80) == 0x80; + auto iter = buffersBegin(bufs); // Check valid header - if ((*iter & 0xFC) == 0 || compressed) + if (*iter & 0x80) { - hdr.header_size = compressed ? headerBytesCompressed : headerBytes; + hdr.header_size = headerBytesCompressed; + // not enough bytes to parse the header if (size < hdr.header_size) - return {}; + { + ec = make_error_code(boost::system::errc::success); + return boost::none; + } + + if (*iter & 0x0C) + { + ec = make_error_code(boost::system::errc::protocol_error); + return boost::none; + } + + hdr.algorithm = static_cast(*iter); - if (compressed) + if (hdr.algorithm != compression::Algorithm::LZ4) { - uint8_t algorithm = (*iter & 0x70) >> 4; - if (algorithm != - static_cast(compression::Algorithm::LZ4)) - return {}; - hdr.algorithm = compression::Algorithm::LZ4; + ec = make_error_code(boost::system::errc::protocol_error); + return boost::none; } for (int i = 0; i != 4; ++i) hdr.payload_wire_size = (hdr.payload_wire_size << 8) + *iter++; - // clear the compression bits - hdr.payload_wire_size &= 0x03FFFFFF; + + // clear the top four bits (the compression bits). + hdr.payload_wire_size &= 0x0FFFFFFF; hdr.total_wire_size = hdr.header_size + hdr.payload_wire_size; for (int i = 0; i != 2; ++i) hdr.message_type = (hdr.message_type << 8) + *iter++; - if (compressed) - for (int i = 0; i != 4; ++i) - hdr.uncompressed_size = (hdr.uncompressed_size << 8) + *iter++; + for (int i = 0; i != 4; ++i) + hdr.uncompressed_size = (hdr.uncompressed_size << 8) + *iter++; + + return hdr; + } + + if ((*iter & 0xFC) == 0) + { + hdr.header_size = headerBytes; + + if (size < hdr.header_size) + { + ec = make_error_code(boost::system::errc::success); + return boost::none; + } + + hdr.algorithm = Algorithm::None; + + for (int i = 0; i != 4; ++i) + hdr.payload_wire_size = (hdr.payload_wire_size << 8) + *iter++; + + hdr.uncompressed_size = hdr.payload_wire_size; + hdr.total_wire_size = hdr.header_size + hdr.payload_wire_size; + + for (int i = 0; i != 2; ++i) + hdr.message_type = (hdr.message_type << 8) + *iter++; return hdr; } - return {}; + ec = make_error_code(boost::system::errc::no_message); + return boost::none; } template < @@ -186,7 +229,7 @@ invoke(MessageHeader const& header, Buffers const& buffers, Handler& handler) std::vector payload; payload.resize(header.uncompressed_size); - auto payloadSize = ripple::compression::decompress( + auto const payloadSize = ripple::compression::decompress( stream, header.payload_wire_size, payload.data(), @@ -226,10 +269,13 @@ invokeProtocolMessage(Buffers const& buffers, Handler& handler) if (size == 0) return result; - auto header = detail::parseMessageHeader(buffers, size); + auto header = detail::parseMessageHeader(result.second, buffers, size); // If we can't parse the header then it may be that we don't have enough - // bytes yet, or because the message was cut off. + // bytes yet, or because the message was cut off (if error_code is success). + // Otherwise we failed to match the header's marker (error_code is set to + // no_message) or the compression algorithm is invalid (error_code is + // protocol_error) and signal an error. if (!header) return result; @@ -237,12 +283,21 @@ invokeProtocolMessage(Buffers const& buffers, Handler& handler) // whose size exceeds this may result in the connection being dropped. A // larger message size may be supported in the future or negotiated as // part of a protocol upgrade. - if (header->payload_wire_size > megabytes(64)) + if (header->payload_wire_size > megabytes(64) || + header->uncompressed_size > megabytes(64)) { result.second = make_error_code(boost::system::errc::message_size); return result; } + // We requested uncompressed messages from the peer but received compressed. + if (!handler.compressionEnabled() && + header->algorithm != compression::Algorithm::None) + { + result.second = make_error_code(boost::system::errc::protocol_error); + return result; + } + // We don't have the whole message yet. This isn't an error but we have // nothing to do. if (header->total_wire_size > size) diff --git a/src/test/overlay/compression_test.cpp b/src/test/overlay/compression_test.cpp index a73225eb3b0..454b10136f5 100644 --- a/src/test/overlay/compression_test.cpp +++ b/src/test/overlay/compression_test.cpp @@ -84,21 +84,14 @@ class compression_test : public beast::unit_test::suite std::shared_ptr proto, protocol::MessageType mt, uint16_t nbuffers, - const char* msg, - bool log = false) + std::string msg) { - if (log) - printf("=== compress/decompress %s ===\n", msg); + testcase("Compress/Decompress: " + msg); + Message m(*proto, mt); auto& buffer = m.getBuffer(Compressed::On); - if (log) - printf( - "==> compressed, original %d bytes, compressed %d bytes\n", - (int)m.getBuffer(Compressed::Off).size(), - (int)m.getBuffer(Compressed::On).size()); - boost::beast::multi_buffer buffers; // simulate multi-buffer @@ -112,26 +105,15 @@ class compression_test : public beast::unit_test::suite buffers.commit(boost::asio::buffer_copy( buffers.prepare(slice.size()), boost::asio::buffer(slice))); } - auto header = - ripple::detail::parseMessageHeader(buffers.data(), buffer.size()); - - if (log) - printf( - "==> parsed header: buffers size %d, compressed %d, algorithm " - "%d, header size %d, payload size %d, buffer size %d\n", - (int)buffers.size(), - header->algorithm != Algorithm::None, - (int)header->algorithm, - (int)header->header_size, - (int)header->payload_wire_size, - (int)buffer.size()); + + boost::system::error_code ec; + auto header = ripple::detail::parseMessageHeader( + ec, buffers.data(), buffer.size()); + + BEAST_EXPECT(header); if (header->algorithm == Algorithm::None) - { - if (log) - printf("==> NOT COMPRESSED\n"); return; - } std::vector decompressed; decompressed.resize(header->uncompressed_size); @@ -157,8 +139,6 @@ class compression_test : public beast::unit_test::suite uncompressed.begin() + ripple::compression::headerBytes, uncompressed.end(), decompressed.begin())); - if (log) - printf("\n"); } std::shared_ptr @@ -460,4 +440,4 @@ class compression_test : public beast::unit_test::suite BEAST_DEFINE_TESTSUITE_MANUAL_PRIO(compression, ripple_data, ripple, 20); } // namespace test -} // namespace ripple \ No newline at end of file +} // namespace ripple