From 4b7991bdb62e1db4bb3826bf044dda4b452b4681 Mon Sep 17 00:00:00 2001 From: Gregory Tsipenyuk Date: Sat, 15 Feb 2020 10:50:52 -0500 Subject: [PATCH] Add protocol message compression support: * Peers negotiate compression via HTTP Header "X-Offer-Compression: lz4" * Messages greater than 70 bytes and protocol type messages MANIFESTS, ENDPOINTS, TRANSACTION, GET_LEDGER, LEDGER_DATA, GET_OBJECT, and VALIDATORLIST are compressed * If the compressed message is larger than the uncompressed message then the uncompressed message is sent * Compression flag and the compression algorithm type are included in the message header * Only LZ4 block compression is currently supported --- Builds/CMake/RippledCore.cmake | 1 + src/ripple/basics/CompressionAlgorithms.h | 153 +++++++++ src/ripple/core/Config.h | 3 + src/ripple/core/ConfigSections.h | 1 + src/ripple/core/impl/Config.cpp | 3 + src/ripple/overlay/Compression.h | 103 ++++++ src/ripple/overlay/Message.h | 53 ++- src/ripple/overlay/impl/ConnectAttempt.cpp | 6 +- src/ripple/overlay/impl/ConnectAttempt.h | 2 +- src/ripple/overlay/impl/Message.cpp | 134 +++++++- src/ripple/overlay/impl/PeerImp.cpp | 9 +- src/ripple/overlay/impl/PeerImp.h | 6 + src/ripple/overlay/impl/ProtocolMessage.h | 58 +++- src/test/overlay/compression_test.cpp | 376 +++++++++++++++++++++ 14 files changed, 873 insertions(+), 35 deletions(-) create mode 100644 src/ripple/basics/CompressionAlgorithms.h create mode 100644 src/ripple/overlay/Compression.h create mode 100644 src/test/overlay/compression_test.cpp diff --git a/Builds/CMake/RippledCore.cmake b/Builds/CMake/RippledCore.cmake index 35a015d5464..14251e085f5 100644 --- a/Builds/CMake/RippledCore.cmake +++ b/Builds/CMake/RippledCore.cmake @@ -853,6 +853,7 @@ target_sources (rippled PRIVATE src/test/overlay/ProtocolVersion_test.cpp src/test/overlay/cluster_test.cpp src/test/overlay/short_read_test.cpp + src/test/overlay/compression_test.cpp #[===============================[ test sources: subdir: peerfinder diff --git a/src/ripple/basics/CompressionAlgorithms.h b/src/ripple/basics/CompressionAlgorithms.h new file mode 100644 index 00000000000..3cd67c753d8 --- /dev/null +++ b/src/ripple/basics/CompressionAlgorithms.h @@ -0,0 +1,153 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2020 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#ifndef RIPPLED_COMPRESSIONALGORITHMS_H_INCLUDED +#define RIPPLED_COMPRESSIONALGORITHMS_H_INCLUDED + +#include +#include +#include + +namespace ripple { + +namespace compression_algorithms { + +/** Convenience wrapper for Throw + * @param message Message to log/throw + */ +inline void doThrow(const char *message) +{ + Throw(message); +} + +/** LZ4 block compression. + * @tparam BufferFactory Callable object or lambda. + * Takes the requested buffer size and returns allocated buffer pointer. + * @param in Data to compress + * @param inSize Size of the data + * @param bf Compressed buffer allocator + * @return Size of compressed data, or zero if failed to compress + */ +template +std::size_t +lz4Compress(void const* in, + std::size_t inSize, BufferFactory&& bf) +{ + if (inSize > UINT32_MAX) + doThrow("lz4 compress: invalid size"); + + auto const outCapacity = LZ4_compressBound(inSize); + + // Request the caller to allocate and return the buffer to hold compressed data + auto compressed = bf(outCapacity); + + auto compressedSize = LZ4_compress_default( + reinterpret_cast(in), + reinterpret_cast(compressed), + inSize, + outCapacity); + if (compressedSize == 0) + doThrow("lz4 compress: failed"); + + return compressedSize; +} + +/** + * @param in Compressed data + * @param inSize Size of compressed data + * @param decompressed Buffer to hold decompressed data + * @param decompressedSize Size of the decompressed buffer + * @return size of the decompressed data + */ +inline +std::size_t +lz4Decompress(std::uint8_t const* in, std::size_t inSize, + std::uint8_t* decompressed, std::size_t decompressedSize) +{ + auto ret = LZ4_decompress_safe(reinterpret_cast(in), + reinterpret_cast(decompressed), inSize, decompressedSize); + + if (ret <= 0 || ret != decompressedSize) + doThrow("lz4 decompress: failed"); + + return decompressedSize; +} + +/** LZ4 block decompression. + * @tparam InputStream ZeroCopyInputStream + * @param in Input source stream + * @param inSize Size of compressed data + * @param decompressed Buffer to hold decompressed data + * @param decompressedSize Size of the decompressed buffer + * @return size of the decompressed data + */ +template +std::size_t +lz4Decompress(InputStream& in, std::size_t inSize, + std::uint8_t* decompressed, std::size_t decompressedSize) +{ + std::vector compressed; + std::uint8_t const* chunk = nullptr; + int chunkSize = 0; + int copiedInSize = 0; + auto const currentBytes = in.ByteCount(); + + // Use the first chunk if it is >= inSize bytes of the compressed message. + // Otherwise copy inSize bytes of chunks into compressed buffer and + // use the buffer to decompress. + while (in.Next(reinterpret_cast(&chunk), &chunkSize)) + { + if (copiedInSize == 0) + { + if (chunkSize >= inSize) + { + copiedInSize = inSize; + break; + } + compressed.resize(inSize); + } + + chunkSize = chunkSize < (inSize - copiedInSize) ? chunkSize : (inSize - copiedInSize); + + std::copy(chunk, chunk + chunkSize, compressed.data() + copiedInSize); + + copiedInSize += chunkSize; + + if (copiedInSize == inSize) + { + chunk = compressed.data(); + break; + } + } + + // Put back unused bytes + if (in.ByteCount() > (currentBytes + copiedInSize)) + in.BackUp(in.ByteCount() - currentBytes - copiedInSize); + + if ((copiedInSize == 0 && chunkSize < inSize) || (copiedInSize > 0 && copiedInSize != inSize)) + doThrow("lz4 decompress: insufficient input size"); + + return lz4Decompress(chunk, inSize, decompressed, decompressedSize); +} + +} // compression + +} // ripple + +#endif //RIPPLED_COMPRESSIONALGORITHMS_H_INCLUDED diff --git a/src/ripple/core/Config.h b/src/ripple/core/Config.h index e1200c63cef..5231f169468 100644 --- a/src/ripple/core/Config.h +++ b/src/ripple/core/Config.h @@ -171,6 +171,9 @@ class Config : public BasicConfig std::string SSL_VERIFY_FILE; std::string SSL_VERIFY_DIR; + // Compression + bool COMPRESSION = false; + // Thread pool configuration std::size_t WORKERS = 0; diff --git a/src/ripple/core/ConfigSections.h b/src/ripple/core/ConfigSections.h index e5f1a3f490e..653b9c404e4 100644 --- a/src/ripple/core/ConfigSections.h +++ b/src/ripple/core/ConfigSections.h @@ -37,6 +37,7 @@ struct ConfigSection // VFALCO TODO Rename and replace these macros with variables. #define SECTION_AMENDMENTS "amendments" #define SECTION_CLUSTER_NODES "cluster_nodes" +#define SECTION_COMPRESSION "compression" #define SECTION_DEBUG_LOGFILE "debug_logfile" #define SECTION_ELB_SUPPORT "elb_support" #define SECTION_FEE_DEFAULT "fee_default" diff --git a/src/ripple/core/impl/Config.cpp b/src/ripple/core/impl/Config.cpp index b49bcebffb9..36732800616 100644 --- a/src/ripple/core/impl/Config.cpp +++ b/src/ripple/core/impl/Config.cpp @@ -454,6 +454,9 @@ void Config::loadFromString (std::string const& fileContents) if (getSingleSection (secConfig, SECTION_WORKERS, strTemp, j_)) WORKERS = beast::lexicalCastThrow (strTemp); + if (getSingleSection (secConfig, SECTION_COMPRESSION, strTemp, j_)) + COMPRESSION = beast::lexicalCastThrow (strTemp); + // Do not load trusted validator configuration for standalone mode if (! RUN_STANDALONE) { diff --git a/src/ripple/overlay/Compression.h b/src/ripple/overlay/Compression.h new file mode 100644 index 00000000000..efec6b524e0 --- /dev/null +++ b/src/ripple/overlay/Compression.h @@ -0,0 +1,103 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2020 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#ifndef RIPPLED_COMPRESSION_H_INCLUDED +#define RIPPLED_COMPRESSION_H_INCLUDED + +#include +#include +#include + +namespace ripple { + +namespace compression { + +std::size_t constexpr headerBytes = 6; +std::size_t constexpr headerBytesCompressed = 10; + +enum class Algorithm : std::uint8_t { + None = 0x00, + LZ4 = 0x01 +}; + +enum class Compressed : std::uint8_t { + On, + Off +}; + +/** Decompress input stream. + * @tparam InputStream ZeroCopyInputStream + * @param in Input source stream + * @param inSize Size of compressed data + * @param decompressed Buffer to hold decompressed message + * @param algorithm Compression algorithm type + * @return Size of decompressed data or zero if failed to decompress + */ +template +std::size_t +decompress(InputStream& in, std::size_t inSize, std::uint8_t* decompressed, + std::size_t decompressedSize, Algorithm algorithm = Algorithm::LZ4) { + try + { + if (algorithm == Algorithm::LZ4) + return ripple::compression_algorithms::lz4Decompress(in, inSize, + decompressed, decompressedSize); + else + { + JLOG(debugLog().warn()) << "decompress: invalid compression algorithm " + << static_cast(algorithm); + assert(0); + } + } + catch (...) {} + return 0; +} + +/** Compress input data. + * @tparam BufferFactory Callable object or lambda. + * Takes the requested buffer size and returns allocated buffer pointer. + * @param in Data to compress + * @param inSize Size of the data + * @param bf Compressed buffer allocator + * @param algorithm Compression algorithm type + * @return Size of compressed data, or zero if failed to compress + */ +template +std::size_t +compress(void const* in, + std::size_t inSize, BufferFactory&& bf, Algorithm algorithm = Algorithm::LZ4) { + try + { + if (algorithm == Algorithm::LZ4) + return ripple::compression_algorithms::lz4Compress(in, inSize, std::forward(bf)); + else + { + JLOG(debugLog().warn()) << "compress: invalid compression algorithm" + << static_cast(algorithm); + assert(0); + } + } + catch (...) {} + return 0; +} +} // compression + +} // ripple + +#endif //RIPPLED_COMPRESSION_H_INCLUDED diff --git a/src/ripple/overlay/Message.h b/src/ripple/overlay/Message.h index e186272b3bf..b3ae036a500 100644 --- a/src/ripple/overlay/Message.h +++ b/src/ripple/overlay/Message.h @@ -20,6 +20,7 @@ #ifndef RIPPLE_OVERLAY_MESSAGE_H_INCLUDED #define RIPPLE_OVERLAY_MESSAGE_H_INCLUDED +#include #include #include #include @@ -47,27 +48,61 @@ namespace ripple { class Message : public std::enable_shared_from_this { + using Compressed = compression::Compressed; + using Algorithm = compression::Algorithm; public: + /** Constructor + * @param message Protocol message to serialize + * @param type Protocol message type + */ Message (::google::protobuf::Message const& message, int type); -public: - /** Retrieve the packed message data. */ + /** Retrieve the packed message data. If compressed message is requested but the message + * is not compressible then the uncompressed buffer is returned. + * @param compressed Request compressed (Compress::On) or + * uncompressed (Compress::Off) payload buffer + * @return Payload buffer + */ std::vector const& - getBuffer () const - { - return mBuffer; - } + getBuffer (Compressed tryCompressed); /** Get the traffic category */ std::size_t getCategory () const { - return mCategory; + return category_; } private: - std::vector mBuffer; - std::size_t mCategory; + std::vector buffer_; + std::vector bufferCompressed_; + std::size_t category_; + std::once_flag once_flag_; + + /** Set the payload header + * @param in Pointer to the payload + * @param payloadBytes Size of the payload excluding the header size + * @param type Protocol message type + * @param comprAlgorithm Compression algorithm used in compression, + * currently LZ4 only. If None then the message is uncompressed. + * @param uncompressedBytes Size of the uncompressed message + */ + void setHeader(std::uint8_t* in, std::uint32_t payloadBytes, int type, + Algorithm comprAlgorithm, std::uint32_t uncompressedBytes); + + /** Try to compress the payload. + * Can be called concurrently by multiple peers but is compressed once. + * If the message is not compressible then the serialized buffer_ is used. + */ + void compress(); + + /** Get the message type from the payload header. + * First four bytes are the compression/algorithm flag and the payload size. + * Next two bytes are the message type + * @param in Payload header pointer + * @return Message type + */ + int getType(std::uint8_t const* in) const; }; } diff --git a/src/ripple/overlay/impl/ConnectAttempt.cpp b/src/ripple/overlay/impl/ConnectAttempt.cpp index 2c79e7ccc24..4cd38715fcc 100644 --- a/src/ripple/overlay/impl/ConnectAttempt.cpp +++ b/src/ripple/overlay/impl/ConnectAttempt.cpp @@ -197,7 +197,7 @@ ConnectAttempt::onHandshake (error_code ec) if (! sharedValue) return close(); // makeSharedValue logs - req_ = makeRequest(!overlay_.peerFinder().config().peerPrivate); + req_ = makeRequest(!overlay_.peerFinder().config().peerPrivate, app_.config().COMPRESSION); buildHandshake(req_, *sharedValue, overlay_.setup().networkID, overlay_.setup().public_ip, remote_endpoint_.address(), app_); @@ -264,7 +264,7 @@ ConnectAttempt::onShutdown (error_code ec) //-------------------------------------------------------------------------- auto -ConnectAttempt::makeRequest (bool crawl) -> request_type +ConnectAttempt::makeRequest (bool crawl, bool compressionEnabled) -> request_type { request_type m; m.method(boost::beast::http::verb::get); @@ -275,6 +275,8 @@ ConnectAttempt::makeRequest (bool crawl) -> request_type m.insert ("Connection", "Upgrade"); m.insert ("Connect-As", "Peer"); m.insert ("Crawl", crawl ? "public" : "private"); + if (compressionEnabled) + m.insert("X-Offer-Compression", "lz4"); return m; } diff --git a/src/ripple/overlay/impl/ConnectAttempt.h b/src/ripple/overlay/impl/ConnectAttempt.h index 601207de13d..5464ae8cdfd 100644 --- a/src/ripple/overlay/impl/ConnectAttempt.h +++ b/src/ripple/overlay/impl/ConnectAttempt.h @@ -93,7 +93,7 @@ class ConnectAttempt static request_type - makeRequest (bool crawl); + makeRequest (bool crawl, bool compressionEnabled); void processResponse(); diff --git a/src/ripple/overlay/impl/Message.cpp b/src/ripple/overlay/impl/Message.cpp index acf201e49e9..edac6b41248 100644 --- a/src/ripple/overlay/impl/Message.cpp +++ b/src/ripple/overlay/impl/Message.cpp @@ -17,7 +17,6 @@ */ //============================================================================== -#include #include #include #include @@ -25,8 +24,9 @@ namespace ripple { Message::Message (::google::protobuf::Message const& message, int type) - : mCategory(TrafficCount::categorize(message, type, false)) + : category_(TrafficCount::categorize(message, type, false)) { + using namespace ripple::compression; #if defined(GOOGLE_PROTOBUF_VERSION) && (GOOGLE_PROTOBUF_VERSION >= 3011000) auto const messageBytes = message.ByteSizeLong (); @@ -36,23 +36,129 @@ Message::Message (::google::protobuf::Message const& message, int type) assert (messageBytes != 0); - /** Number of bytes in a message header. */ - std::size_t constexpr headerBytes = 6; + buffer_.resize (headerBytes + messageBytes); - mBuffer.resize (headerBytes + messageBytes); + setHeader(buffer_.data(), messageBytes, type, Algorithm::None, 0); - auto ptr = mBuffer.data(); + if (messageBytes != 0) + message.SerializeToArray(buffer_.data() + headerBytes, messageBytes); +} + +void +Message::compress() +{ + using namespace ripple::compression; + auto const messageBytes = buffer_.size () - headerBytes; - *ptr++ = static_cast((messageBytes >> 24) & 0xFF); - *ptr++ = static_cast((messageBytes >> 16) & 0xFF); - *ptr++ = static_cast((messageBytes >> 8) & 0xFF); - *ptr++ = static_cast(messageBytes & 0xFF); + auto type = getType(buffer_.data()); - *ptr++ = static_cast((type >> 8) & 0xFF); - *ptr++ = static_cast (type & 0xFF); + bool const compressible = [&]{ + if (messageBytes <= 70) + return false; + switch(type) + { + case protocol::mtMANIFESTS: + case protocol::mtENDPOINTS: + case protocol::mtTRANSACTION: + case protocol::mtGET_LEDGER: + case protocol::mtLEDGER_DATA: + case protocol::mtGET_OBJECTS: + case protocol::mtVALIDATORLIST: + return true; + case protocol::mtPING: + case protocol::mtCLUSTER: + case protocol::mtPROPOSE_LEDGER: + case protocol::mtSTATUS_CHANGE: + case protocol::mtHAVE_SET: + case protocol::mtVALIDATION: + case protocol::mtGET_SHARD_INFO: + case protocol::mtSHARD_INFO: + case protocol::mtGET_PEER_SHARD_INFO: + case protocol::mtPEER_SHARD_INFO: + break; + } + return false; + }(); - if (messageBytes != 0) - message.SerializeToArray(ptr, messageBytes); + if (compressible) + { + auto payload = static_cast(buffer_.data() + headerBytes); + + auto compressedSize = ripple::compression::compress( + payload, + messageBytes, + [&](std::size_t inSize) { // size of required compressed buffer + bufferCompressed_.resize(inSize + headerBytesCompressed); + return (bufferCompressed_.data() + headerBytesCompressed); + }); + + if (compressedSize < (messageBytes - (headerBytesCompressed - headerBytes))) + { + bufferCompressed_.resize(headerBytesCompressed + compressedSize); + setHeader(bufferCompressed_.data(), compressedSize, type, Algorithm::LZ4, messageBytes); + } + else + bufferCompressed_.resize(0); + } +} + +/** Set payload header + * Uncompressed message header + * 47-42 Set to 0 + * 41-16 Payload size + * 15-0 Message Type + * Compressed message header + * 79 Set to 0, indicates the message is compressed + * 78-76 Compression algorithm, value 1-7. Set to 1 to indicate LZ4 compression + * 75-74 Set to 0 + * 73-48 Payload size + * 47-32 Message Type + * 31-0 Uncompressed message size +*/ +void +Message::setHeader(std::uint8_t* in, std::uint32_t payloadBytes, int type, + Algorithm comprAlgorithm, std::uint32_t uncompressedBytes) +{ + auto h = in; + + auto pack = [](std::uint8_t*& in, std::uint32_t size) { + *in++ = static_cast((size >> 24) & 0x0F); // leftmost 4 are compression bits + *in++ = static_cast((size >> 16) & 0xFF); + *in++ = static_cast((size >> 8) & 0xFF); + *in++ = static_cast(size & 0xFF); + }; + + pack(in, payloadBytes); + + *in++ = static_cast((type >> 8) & 0xFF); + *in++ = static_cast (type & 0xFF); + + if (comprAlgorithm != Algorithm::None) + { + pack(in, uncompressedBytes); + *h |= 0x80 | (static_cast(comprAlgorithm) << 4); + } +} + +std::vector const& +Message::getBuffer (Compressed tryCompressed) +{ + if (tryCompressed == Compressed::Off) + return buffer_; + + std::call_once(once_flag_, &Message::compress, this); + + if (bufferCompressed_.size() > 0) + return bufferCompressed_; + else + return buffer_; +} + +int +Message::getType(std::uint8_t const* in) const +{ + int type = (static_cast(*(in + 4)) << 8) + *(in + 5); + return type; } } diff --git a/src/ripple/overlay/impl/PeerImp.cpp b/src/ripple/overlay/impl/PeerImp.cpp index 122e3dd197c..df64c0da8a8 100644 --- a/src/ripple/overlay/impl/PeerImp.cpp +++ b/src/ripple/overlay/impl/PeerImp.cpp @@ -86,6 +86,7 @@ PeerImp::PeerImp (Application& app, id_t id, , slot_ (slot) , request_(std::move(request)) , headers_(request_) + , compressionEnabled_(headers_["X-Offer-Compression"] == "lz4" ? Compressed::On : Compressed::Off) { } @@ -219,7 +220,7 @@ PeerImp::send (std::shared_ptr const& m) overlay_.reportTraffic ( safe_cast(m->getCategory()), - false, static_cast(m->getBuffer().size())); + false, static_cast(m->getBuffer(compressionEnabled_).size())); auto sendq_size = send_queue_.size(); @@ -246,7 +247,7 @@ PeerImp::send (std::shared_ptr const& m) boost::asio::async_write( stream_, - boost::asio::buffer(send_queue_.front()->getBuffer()), + boost::asio::buffer(send_queue_.front()->getBuffer(compressionEnabled_)), bind_executor( strand_, std::bind( @@ -757,6 +758,8 @@ PeerImp::makeResponse (bool crawl, resp.insert("Connect-As", "Peer"); resp.insert("Server", BuildInfo::getFullVersionString()); resp.insert("Crawl", crawl ? "public" : "private"); + if (req["X-Offer-Compression"] == "lz4" && app_.config().COMPRESSION) + resp.insert("X-Offer-Compression", "lz4"); buildHandshake(resp, sharedValue, overlay_.setup().networkID, overlay_.setup().public_ip, remote_ip, app_); @@ -945,7 +948,7 @@ PeerImp::onWriteMessage (error_code ec, std::size_t bytes_transferred) // Timeout on writes only return boost::asio::async_write( stream_, - boost::asio::buffer(send_queue_.front()->getBuffer()), + boost::asio::buffer(send_queue_.front()->getBuffer(compressionEnabled_)), bind_executor( strand_, std::bind( diff --git a/src/ripple/overlay/impl/PeerImp.h b/src/ripple/overlay/impl/PeerImp.h index 559acb26bc0..0d484517160 100644 --- a/src/ripple/overlay/impl/PeerImp.h +++ b/src/ripple/overlay/impl/PeerImp.h @@ -99,6 +99,7 @@ class PeerImp using address_type = boost::asio::ip::address; using endpoint_type = boost::asio::ip::tcp::endpoint; using waitable_timer = boost::asio::basic_waitable_timer; + using Compressed = compression::Compressed; Application& app_; id_t const id_; @@ -201,6 +202,8 @@ class PeerImp std::mutex mutable shardInfoMutex_; hash_map shardInfo_; + Compressed compressionEnabled_ = Compressed::Off; + friend class OverlayImpl; class Metrics { @@ -600,6 +603,9 @@ PeerImp::PeerImp (Application& app, std::unique_ptr&& stream_ptr, , slot_ (std::move(slot)) , response_(std::move(response)) , headers_(response_) + , compressionEnabled_( + headers_["X-Offer-Compression"] == "lz4" && app_.config().COMPRESSION + ? Compressed::On : Compressed::Off) { read_buffer_.commit (boost::asio::buffer_copy(read_buffer_.prepare( boost::asio::buffer_size(buffers)), buffers)); diff --git a/src/ripple/overlay/impl/ProtocolMessage.h b/src/ripple/overlay/impl/ProtocolMessage.h index 8bc8ef77bb9..be4a93fbb4a 100644 --- a/src/ripple/overlay/impl/ProtocolMessage.h +++ b/src/ripple/overlay/impl/ProtocolMessage.h @@ -22,6 +22,7 @@ #include #include +#include #include #include #include @@ -81,36 +82,66 @@ struct MessageHeader /** The size of the payload on the wire. */ std::uint32_t payload_wire_size = 0; + /** Uncompressed message size if the message is compressed. */ + std::uint32_t uncompressed_size = 0; + /** The type of the message. */ std::uint16_t message_type = 0; + + /** Indicates which compression algorithm the payload is compressed with. + * Currenly only lz4 is supported. If None then the message is not compressed. + */ + compression::Algorithm algorithm = compression::Algorithm::None; }; +template +auto +buffersBegin(BufferSequence const &bufs) +{ + return boost::asio::buffers_iterator::begin(bufs); +} + template boost::optional parseMessageHeader( BufferSequence const& bufs, std::size_t size) { - auto iter = boost::asio::buffers_iterator::begin(bufs); + using namespace ripple::compression; + auto iter = buffersBegin(bufs); MessageHeader hdr; + auto const compressed = (*iter & 0x80) == 0x80; - // Version 1 header: uncompressed payload. - // The top six bits of the first byte are 0. - if ((*iter & 0xFC) == 0) + // Check valid header + if ((*iter & 0xFC) == 0 || compressed) { - hdr.header_size = 6; + hdr.header_size = compressed ? headerBytesCompressed : headerBytes; if (size < hdr.header_size) return {}; + if (compressed) + { + uint8_t algorithm = (*iter & 0x70) >> 4; + if (algorithm != static_cast(compression::Algorithm::LZ4)) + return {}; + hdr.algorithm = compression::Algorithm::LZ4; + } + for (int i = 0; i != 4; ++i) hdr.payload_wire_size = (hdr.payload_wire_size << 8) + *iter++; + // clear the compression bits + hdr.payload_wire_size &= 0x03FFFFFF; hdr.total_wire_size = hdr.header_size + hdr.payload_wire_size; for (int i = 0; i != 2; ++i) hdr.message_type = (hdr.message_type << 8) + *iter++; + if (compressed) + for (int i = 0; i != 4; ++i) + hdr.uncompressed_size = (hdr.uncompressed_size << 8) + *iter++; + return hdr; } @@ -130,7 +161,22 @@ invoke ( ZeroCopyInputStream stream(buffers); stream.Skip(header.header_size); - if (! m->ParseFromZeroCopyStream(&stream)) + if (header.algorithm != compression::Algorithm::None) + { + std::vector payload; + payload.resize(header.uncompressed_size); + + auto payloadSize = ripple::compression::decompress( + stream, + header.payload_wire_size, + payload.data(), + header.uncompressed_size, + header.algorithm); + + if (payloadSize == 0 || !m->ParseFromArray(payload.data(), payloadSize)) + return false; + } + else if (!m->ParseFromZeroCopyStream(&stream)) return false; handler.onMessageBegin (header.message_type, m, header.payload_wire_size); diff --git a/src/test/overlay/compression_test.cpp b/src/test/overlay/compression_test.cpp new file mode 100644 index 00000000000..6d3ba5a9aa5 --- /dev/null +++ b/src/test/overlay/compression_test.cpp @@ -0,0 +1,376 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright 2020 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace ripple { + +namespace test { + +using namespace ripple::test; +using namespace ripple::test::jtx; + +static +uint256 +ledgerHash (LedgerInfo const& info) +{ +return ripple::sha512Half( + HashPrefix::ledgerMaster, + std::uint32_t(info.seq), + std::uint64_t(info.drops.drops ()), + info.parentHash, + info.txHash, + info.accountHash, + std::uint32_t(info.parentCloseTime.time_since_epoch().count()), + std::uint32_t(info.closeTime.time_since_epoch().count()), + std::uint8_t(info.closeTimeResolution.count()), + std::uint8_t(info.closeFlags)); +} + +class compression_test : public beast::unit_test::suite { + using Compressed = compression::Compressed; + using Algorithm = compression::Algorithm; +public: + compression_test() {} + + template + void + doTest(std::shared_ptr proto, protocol::MessageType mt, uint16_t nbuffers, const char *msg, + bool log = false) { + + if (log) + printf("=== compress/decompress %s ===\n", msg); + Message m(*proto, mt); + + auto &buffer = m.getBuffer(Compressed::On); + + if (log) + printf("==> compressed, original %d bytes, compressed %d bytes\n", + (int)m.getBuffer(Compressed::Off).size(), + (int)m.getBuffer(Compressed::On).size()); + + boost::beast::multi_buffer buffers; + + + // simulate multi-buffer + auto sz = buffer.size() / nbuffers; + for (int i = 0; i < nbuffers; i++) { + auto start = buffer.begin() + sz * i; + auto end = i < nbuffers - 1 ? (buffer.begin() + sz * (i + 1)) : buffer.end(); + std::vector slice(start, end); + buffers.commit( + boost::asio::buffer_copy(buffers.prepare(slice.size()), boost::asio::buffer(slice))); + } + auto header = ripple::detail::parseMessageHeader(buffers.data(), buffer.size()); + + if (log) + printf("==> parsed header: buffers size %d, compressed %d, algorithm %d, header size %d, payload size %d, buffer size %d\n", + (int)buffers.size(), header->algorithm != Algorithm::None, (int)header->algorithm, + (int)header->header_size, (int)header->payload_wire_size, (int)buffer.size()); + + if (header->algorithm == Algorithm::None) { + if (log) + printf("==> NOT COMPRESSED\n"); + return; + } + + std::vector decompressed; + decompressed.resize(header->uncompressed_size); + + BEAST_EXPECT(header->payload_wire_size == buffer.size() - header->header_size); + + ZeroCopyInputStream stream(buffers.data()); + stream.Skip(header->header_size); + + auto decompressedSize = ripple::compression::decompress(stream, header->payload_wire_size, + decompressed.data(), header->uncompressed_size); + BEAST_EXPECT(decompressedSize == header->uncompressed_size); + auto const proto1 = std::make_shared(); + + BEAST_EXPECT(proto1->ParseFromArray(decompressed.data(), decompressedSize)); + auto uncompressed = m.getBuffer(Compressed::Off); + BEAST_EXPECT(std::equal(uncompressed.begin() + ripple::compression::headerBytes, + uncompressed.end(), + decompressed.begin())); + if (log) + printf("\n"); + } + + std::shared_ptr + buildManifests(int n) { + auto manifests = std::make_shared(); + manifests->mutable_list()->Reserve(n); + for (int i = 0; i < n; i++) { + auto master = randomKeyPair(KeyType::ed25519); + auto signing = randomKeyPair(KeyType::ed25519); + STObject st(sfGeneric); + st[sfSequence] = i; + st[sfPublicKey] = std::get<0>(master); + st[sfSigningPubKey] = std::get<0>(signing); + st[sfDomain] = makeSlice(std::string("example") + std::to_string(i) + std::string(".com")); + sign(st, HashPrefix::manifest, KeyType::ed25519, std::get<1>(master), sfMasterSignature); + sign(st, HashPrefix::manifest, KeyType::ed25519, std::get<1>(signing)); + Serializer s; + st.add(s); + auto *manifest = manifests->add_list(); + manifest->set_stobject(s.data(), s.size()); + } + return manifests; + } + + std::shared_ptr + buildEndpoints(int n) { + auto endpoints = std::make_shared(); + endpoints->mutable_endpoints()->Reserve(n); + for (int i = 0; i < n; i++) { + auto *endpoint = endpoints->add_endpoints(); + endpoint->set_hops(i); + std::string addr = std::string("10.0.1.") + std::to_string(i); + endpoint->mutable_ipv4()->set_ipv4( + boost::endian::native_to_big(boost::asio::ip::address_v4::from_string(addr).to_uint())); + endpoint->mutable_ipv4()->set_ipv4port(i); + } + endpoints->set_version(2); + + return endpoints; + } + + std::shared_ptr + buildTransaction(Logs &logs) { + Env env(*this, envconfig()); + int fund = 10000; + auto const alice = Account("alice"); + auto const bob = Account("bob"); + env.fund(XRP(fund), "alice", "bob"); + env.trust(bob["USD"](fund), alice); + env.close(); + + auto toBinary = [](std::string const &text) { + std::string binary; + for (size_t i = 0; i < text.size(); ++i) { + unsigned int c = charUnHex(text[i]); + c = c << 4; + ++i; + c = c | charUnHex(text[i]); + binary.push_back(c); + } + + return binary; + }; + + std::string usdTxBlob = ""; + auto wsc = makeWSClient(env.app().config()); + { + Json::Value jrequestUsd; + jrequestUsd[jss::secret] = toBase58(generateSeed("bob")); + jrequestUsd[jss::tx_json] = + pay("bob", "alice", bob["USD"](fund / 2)); + Json::Value jreply_usd = wsc->invoke("sign", jrequestUsd); + + usdTxBlob = + toBinary(jreply_usd[jss::result][jss::tx_blob].asString()); + } + + auto transaction = std::make_shared(); + transaction->set_rawtransaction(usdTxBlob); + transaction->set_status(protocol::tsNEW); + auto tk = make_TimeKeeper(logs.journal("TimeKeeper")); + transaction->set_receivetimestamp(tk->now().time_since_epoch().count()); + transaction->set_deferred(true); + + return transaction; + } + + std::shared_ptr + buildGetLedger() { + auto getLedger = std::make_shared(); + getLedger->set_itype(protocol::liTS_CANDIDATE); + getLedger->set_ltype(protocol::TMLedgerType::ltACCEPTED); + uint256 const hash(ripple::sha512Half(123456789)); + getLedger->set_ledgerhash(hash.begin(), hash.size()); + getLedger->set_ledgerseq(123456789); + ripple::SHAMapNodeID sha(hash.data(), hash.size()); + getLedger->add_nodeids(sha.getRawString()); + getLedger->set_requestcookie(123456789); + getLedger->set_querytype(protocol::qtINDIRECT); + getLedger->set_querydepth(3); + return getLedger; + } + + std::shared_ptr + buildLedgerData(uint32_t n, Logs &logs) { + auto ledgerData = std::make_shared(); + uint256 const hash(ripple::sha512Half(12356789)); + ledgerData->set_ledgerhash(hash.data(), hash.size()); + ledgerData->set_ledgerseq(123456789); + ledgerData->set_type(protocol::TMLedgerInfoType::liAS_NODE); + ledgerData->set_requestcookie(123456789); + ledgerData->set_error(protocol::TMReplyError::reNO_LEDGER); + ledgerData->mutable_nodes()->Reserve(n); + uint256 parentHash(0); + for (int i = 0; i < n; i++) { + LedgerInfo info; + auto tk = make_TimeKeeper(logs.journal("TimeKeeper")); + info.seq = i; + info.parentCloseTime = tk->now(); + info.hash = ripple::sha512Half(i); + info.txHash = ripple::sha512Half(i + 1); + info.accountHash = ripple::sha512Half(i + 2); + info.parentHash = parentHash; + info.drops = XRPAmount(10); + info.closeTimeResolution = tk->now().time_since_epoch(); + info.closeTime = tk->now(); + parentHash = ledgerHash(info); + Serializer nData; + ripple::addRaw(info, nData); + ledgerData->add_nodes()->set_nodedata(nData.getDataPtr(), nData.getLength()); + } + + return ledgerData; + } + + std::shared_ptr + buildGetObjectByHash() { + auto getObject = std::make_shared(); + + getObject->set_type(protocol::TMGetObjectByHash_ObjectType::TMGetObjectByHash_ObjectType_otTRANSACTION); + getObject->set_query(true); + getObject->set_seq(123456789); + uint256 hash(ripple::sha512Half(123456789)); + getObject->set_ledgerhash(hash.data(), hash.size()); + getObject->set_fat(true); + for (int i = 0; i < 100; i++) { + uint256 hash(ripple::sha512Half(i)); + auto object = getObject->add_objects(); + object->set_hash(hash.data(), hash.size()); + ripple::SHAMapNodeID sha(hash.data(), hash.size()); + object->set_nodeid(sha.getRawString()); + object->set_index(""); + object->set_data(""); + object->set_ledgerseq(i); + } + return getObject; + } + + std::shared_ptr + buildValidatorList() + { + auto list = std::make_shared(); + + auto master = randomKeyPair(KeyType::ed25519); + auto signing = randomKeyPair(KeyType::ed25519); + STObject st(sfGeneric); + st[sfSequence] = 0; + st[sfPublicKey] = std::get<0>(master); + st[sfSigningPubKey] = std::get<0>(signing); + st[sfDomain] = makeSlice(std::string("example.com")); + sign(st, HashPrefix::manifest, KeyType::ed25519, std::get<1>(master), sfMasterSignature); + sign(st, HashPrefix::manifest, KeyType::ed25519, std::get<1>(signing)); + Serializer s; + st.add(s); + list->set_manifest(s.data(), s.size()); + list->set_version(3); + STObject signature(sfSignature); + ripple::sign(st, HashPrefix::manifest,KeyType::ed25519, std::get<1>(signing)); + Serializer s1; + st.add(s1); + list->set_signature(s1.data(), s1.size()); + list->set_blob(strHex(s.getString())); + return list; + } + + void + testProtocol() { + testcase("Message Compression"); + + auto thresh = beast::severities::Severity::kInfo; + auto logs = std::make_unique(thresh); + + protocol::TMManifests manifests; + protocol::TMEndpoints endpoints; + protocol::TMTransaction transaction; + protocol::TMGetLedger get_ledger; + protocol::TMLedgerData ledger_data; + protocol::TMGetObjectByHash get_object; + protocol::TMValidatorList validator_list; + + // 4.5KB + doTest(buildManifests(20), protocol::mtMANIFESTS, 4, "TMManifests20"); + // 22KB + doTest(buildManifests(100), protocol::mtMANIFESTS, 4, "TMManifests100"); + // 131B + doTest(buildEndpoints(10), protocol::mtENDPOINTS, 4, "TMEndpoints10"); + // 1.3KB + doTest(buildEndpoints(100), protocol::mtENDPOINTS, 4, "TMEndpoints100"); + // 242B + doTest(buildTransaction(*logs), protocol::mtTRANSACTION, 1, "TMTransaction"); + // 87B + doTest(buildGetLedger(), protocol::mtGET_LEDGER, 1, "TMGetLedger"); + // 61KB + doTest(buildLedgerData(500, *logs), protocol::mtLEDGER_DATA, 10, "TMLedgerData500"); + // 122 KB + doTest(buildLedgerData(1000, *logs), protocol::mtLEDGER_DATA, 20, "TMLedgerData1000"); + // 1.2MB + doTest(buildLedgerData(10000, *logs), protocol::mtLEDGER_DATA, 50, "TMLedgerData10000"); + // 12MB + doTest(buildLedgerData(100000, *logs), protocol::mtLEDGER_DATA, 100, "TMLedgerData100000"); + // 61MB + doTest(buildLedgerData(500000, *logs), protocol::mtLEDGER_DATA, 100, "TMLedgerData500000"); + // 7.7KB + doTest(buildGetObjectByHash(), protocol::mtGET_OBJECTS, 4, "TMGetObjectByHash"); + // 895B + doTest(buildValidatorList(), protocol::mtVALIDATORLIST, 4, "TMValidatorList"); + } + + void run() override { + testProtocol(); + } + +}; + +BEAST_DEFINE_TESTSUITE_MANUAL_PRIO(compression, ripple_data, ripple, 20); + +} +} \ No newline at end of file