Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Protocol message compression support #3287

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Builds/CMake/RippledCore.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -853,6 +853,7 @@ target_sources (rippled PRIVATE
src/test/overlay/ProtocolVersion_test.cpp
src/test/overlay/cluster_test.cpp
src/test/overlay/short_read_test.cpp
src/test/overlay/compression_test.cpp
#[===============================[
test sources:
subdir: peerfinder
Expand Down
153 changes: 153 additions & 0 deletions src/ripple/basics/CompressionAlgorithms.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2020 Ripple Labs Inc.

Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.

THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================

#ifndef RIPPLED_COMPRESSIONALGORITHMS_H_INCLUDED
#define RIPPLED_COMPRESSIONALGORITHMS_H_INCLUDED

#include <ripple/basics/contract.h>
seelabs marked this conversation as resolved.
Show resolved Hide resolved
#include <lz4.h>
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <stdexcept>
#include <vector>

namespace ripple {

namespace compression_algorithms {

/** Convenience wrapper for Throw.
* Raises std::runtime_error carrying the given message; it is the single
* failure path used by the LZ4 helpers in this header.
* @param message Message to log/throw
*/
inline void doThrow(const char *message)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function is invoked from multiple places and I don't know that they're all from within a try/catch block, or that the catch is appropriately scoped. We need to be careful when throwing to ensure that not only will the exception be caught but that continuing after the handler doesn't cause weirdness.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added try/catch in Compression.h module.

{
Throw<std::runtime_error>(message);
}

/** LZ4 block compression.
* @tparam BufferFactory Callable object or lambda.
* Takes the requested buffer size and returns allocated buffer pointer.
* @param in Data to compress
* @param inSize Size of the data
* @param bf Compressed buffer allocator
* @return Size of compressed data, or zero if failed to compress
*/
template<typename BufferFactory>
std::size_t
lz4Compress(void const* in,
std::size_t inSize, BufferFactory&& bf)
{
if (inSize > UINT32_MAX)
doThrow("lz4 compress: invalid size");

auto const outCapacity = LZ4_compressBound(inSize);

// Request the caller to allocate and return the buffer to hold compressed data
auto compressed = bf(outCapacity);

auto compressedSize = LZ4_compress_default(
reinterpret_cast<const char*>(in),
reinterpret_cast<char*>(compressed),
inSize,
outCapacity);
if (compressedSize == 0)
doThrow("lz4 compress: failed");

return compressedSize;
}

/** LZ4 block decompression from a contiguous buffer.
 * @param in Compressed data
 * @param inSize Size of compressed data
 * @param decompressed Buffer to hold decompressed data
 * @param decompressedSize Size of the decompressed buffer; the decompressed
 *     data must fill it exactly
 * @return size of the decompressed data (always decompressedSize)
 * @throws std::runtime_error if a size does not fit the LZ4 API, if LZ4
 *     reports an error, or if the output does not fill the buffer exactly
 */
inline
std::size_t
lz4Decompress(std::uint8_t const* in, std::size_t inSize,
    std::uint8_t* decompressed, std::size_t decompressedSize)
{
    // LZ4_decompress_safe takes both sizes as int; reject values that would
    // be silently truncated or turned negative by the conversion.
    constexpr auto maxSize =
        static_cast<std::size_t>(std::numeric_limits<int>::max());
    if (inSize > maxSize || decompressedSize > maxSize)
        doThrow("lz4 decompress: invalid size");

    auto const ret = LZ4_decompress_safe(
        reinterpret_cast<const char*>(in),
        reinterpret_cast<char*>(decompressed),
        static_cast<int>(inSize),
        static_cast<int>(decompressedSize));

    // ret is known positive before it is converted for the size comparison.
    if (ret <= 0 || static_cast<std::size_t>(ret) != decompressedSize)
        doThrow("lz4 decompress: failed");

    return decompressedSize;
}

/** LZ4 block decompression.
 * Reads inSize bytes of compressed data from the stream and decompresses
 * them. If the first chunk returned by the stream already holds the whole
 * compressed message it is used in place; otherwise the chunks are gathered
 * into a temporary contiguous buffer. Bytes read past the compressed message
 * are handed back to the stream.
 * @tparam InputStream ZeroCopyInputStream
 * @param in Input source stream
 * @param inSize Size of compressed data
 * @param decompressed Buffer to hold decompressed data
 * @param decompressedSize Size of the decompressed buffer
 * @return size of the decompressed data
 * @throws std::runtime_error if inSize does not fit the LZ4 API, if the
 *     stream holds fewer than inSize bytes, or if decompression fails
 */
template<typename InputStream>
std::size_t
lz4Decompress(InputStream& in, std::size_t inSize,
    std::uint8_t* decompressed, std::size_t decompressedSize)
{
    // Chunk sizes and LZ4 sizes are int; a larger inSize could never be
    // satisfied and would overflow the byte accounting below.
    if (inSize > static_cast<std::size_t>(std::numeric_limits<int>::max()))
        doThrow("lz4 decompress: invalid size");

    std::vector<std::uint8_t> compressed;
    std::uint8_t const* chunk = nullptr;
    int chunkSize = 0;
    std::size_t copiedInSize = 0;
    auto const currentBytes = in.ByteCount();

    // Use the first chunk if it is >= inSize bytes of the compressed message.
    // Otherwise copy inSize bytes of chunks into compressed buffer and
    // use the buffer to decompress.
    while (in.Next(reinterpret_cast<void const**>(&chunk), &chunkSize))
    {
        auto const available = static_cast<std::size_t>(chunkSize);

        if (copiedInSize == 0)
        {
            if (available >= inSize)
            {
                // Whole message is contiguous in this chunk: no copy needed.
                copiedInSize = inSize;
                break;
            }
            compressed.resize(inSize);
        }

        // Never copy past the end of the compressed message.
        auto const toCopy = std::min(available, inSize - copiedInSize);
        std::copy_n(chunk, toCopy, compressed.data() + copiedInSize);
        copiedInSize += toCopy;

        if (copiedInSize == inSize)
        {
            chunk = compressed.data();
            break;
        }
    }

    // Put back unused bytes
    auto const readBytes =
        static_cast<std::size_t>(in.ByteCount() - currentBytes);
    if (readBytes > copiedInSize)
        in.BackUp(static_cast<int>(readBytes - copiedInSize));

    // The stream ran dry before the whole compressed message was seen.
    if (copiedInSize != inSize)
        doThrow("lz4 decompress: insufficient input size");

    return lz4Decompress(chunk, inSize, decompressed, decompressedSize);
}

} // compression_algorithms

} // ripple

#endif //RIPPLED_COMPRESSIONALGORITHMS_H_INCLUDED
3 changes: 3 additions & 0 deletions src/ripple/core/Config.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,9 @@ class Config : public BasicConfig
std::string SSL_VERIFY_FILE;
std::string SSL_VERIFY_DIR;

// Compression
bool COMPRESSION = false;

// Thread pool configuration
std::size_t WORKERS = 0;

Expand Down
1 change: 1 addition & 0 deletions src/ripple/core/ConfigSections.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ struct ConfigSection
// VFALCO TODO Rename and replace these macros with variables.
#define SECTION_AMENDMENTS "amendments"
#define SECTION_CLUSTER_NODES "cluster_nodes"
#define SECTION_COMPRESSION "compression"
#define SECTION_DEBUG_LOGFILE "debug_logfile"
#define SECTION_ELB_SUPPORT "elb_support"
#define SECTION_FEE_DEFAULT "fee_default"
Expand Down
3 changes: 3 additions & 0 deletions src/ripple/core/impl/Config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,9 @@ void Config::loadFromString (std::string const& fileContents)
if (getSingleSection (secConfig, SECTION_WORKERS, strTemp, j_))
WORKERS = beast::lexicalCastThrow <std::size_t> (strTemp);

if (getSingleSection (secConfig, SECTION_COMPRESSION, strTemp, j_))
COMPRESSION = beast::lexicalCastThrow <bool> (strTemp);

// Do not load trusted validator configuration for standalone mode
if (! RUN_STANDALONE)
{
Expand Down
103 changes: 103 additions & 0 deletions src/ripple/overlay/Compression.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2020 Ripple Labs Inc.

Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.

THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================

#ifndef RIPPLED_COMPRESSION_H_INCLUDED
#define RIPPLED_COMPRESSION_H_INCLUDED

#include <ripple/basics/CompressionAlgorithms.h>
#include <ripple/basics/Log.h>
#include <lz4frame.h>

namespace ripple {

namespace compression {

/** Header size, in bytes.
 * NOTE(review): presumably the header of an uncompressed protocol message;
 * confirm against Message::setHeader.
 */
constexpr std::size_t headerBytes{6};

/** Header size, in bytes, for the compressed variant.
 * NOTE(review): presumably the header of a compressed protocol message;
 * confirm against Message::setHeader.
 */
constexpr std::size_t headerBytesCompressed{10};

/** Identifies the compression algorithm; None means no compression. */
enum class Algorithm : std::uint8_t
{
    None = 0,
    LZ4 = 1
};

/** Selects between the compressed (On) and uncompressed (Off) form. */
enum class Compressed : std::uint8_t
{
    On = 0,
    Off = 1
};

/** Decompress input stream.
 * Never throws: any failure raised by the underlying algorithm is logged
 * and reported to the caller as a zero return value.
 * @tparam InputStream ZeroCopyInputStream
 * @param in Input source stream
 * @param inSize Size of compressed data
 * @param decompressed Buffer to hold decompressed message
 * @param decompressedSize Size of the decompressed buffer
 * @param algorithm Compression algorithm type
 * @return Size of decompressed data or zero if failed to decompress
 */
template<typename InputStream>
std::size_t
decompress(InputStream& in, std::size_t inSize, std::uint8_t* decompressed,
    std::size_t decompressedSize, Algorithm algorithm = Algorithm::LZ4)
{
    try
    {
        if (algorithm == Algorithm::LZ4)
            return ripple::compression_algorithms::lz4Decompress(in, inSize,
                decompressed, decompressedSize);

        JLOG(debugLog().warn()) << "decompress: invalid compression algorithm "
            << static_cast<int>(algorithm);
        assert(0);
    }
    catch (std::exception const& e)
    {
        // Keep the zero-on-failure contract, but leave a trace of the cause.
        JLOG(debugLog().warn()) << "decompress: failed: " << e.what();
    }
    catch (...)
    {
        JLOG(debugLog().warn()) << "decompress: failed: unknown exception";
    }
    return 0;
}

/** Compress input data.
 * Never throws: any failure raised by the underlying algorithm is logged
 * and reported to the caller as a zero return value.
 * @tparam BufferFactory Callable object or lambda.
 *     Takes the requested buffer size and returns allocated buffer pointer.
 * @param in Data to compress
 * @param inSize Size of the data
 * @param bf Compressed buffer allocator
 * @param algorithm Compression algorithm type
 * @return Size of compressed data, or zero if failed to compress
 */
template<class BufferFactory>
std::size_t
compress(void const* in,
    std::size_t inSize, BufferFactory&& bf, Algorithm algorithm = Algorithm::LZ4)
{
    try
    {
        if (algorithm == Algorithm::LZ4)
            return ripple::compression_algorithms::lz4Compress(
                in, inSize, std::forward<BufferFactory>(bf));

        // Trailing space keeps the algorithm number separate from the text.
        JLOG(debugLog().warn()) << "compress: invalid compression algorithm "
            << static_cast<int>(algorithm);
        assert(0);
    }
    catch (std::exception const& e)
    {
        // Keep the zero-on-failure contract, but leave a trace of the cause.
        JLOG(debugLog().warn()) << "compress: failed: " << e.what();
    }
    catch (...)
    {
        JLOG(debugLog().warn()) << "compress: failed: unknown exception";
    }
    return 0;
}
} // compression

} // ripple

#endif //RIPPLED_COMPRESSION_H_INCLUDED
53 changes: 44 additions & 9 deletions src/ripple/overlay/Message.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#ifndef RIPPLE_OVERLAY_MESSAGE_H_INCLUDED
#define RIPPLE_OVERLAY_MESSAGE_H_INCLUDED

#include <ripple/overlay/Compression.h>
#include <ripple/protocol/messages.h>
#include <boost/asio/buffer.hpp>
#include <boost/asio/buffers_iterator.hpp>
Expand Down Expand Up @@ -47,27 +48,61 @@ namespace ripple {

class Message : public std::enable_shared_from_this <Message>
{
using Compressed = compression::Compressed;
using Algorithm = compression::Algorithm;
public:
/** Constructor
* @param message Protocol message to serialize
* @param type Protocol message type
*/
Message (::google::protobuf::Message const& message, int type);

public:
/** Retrieve the packed message data. */
/** Retrieve the packed message data. If a compressed message is requested but the message
* is not compressible then the uncompressed buffer is returned.
* @param tryCompressed Request compressed (Compressed::On) or
* uncompressed (Compressed::Off) payload buffer
* @return Payload buffer
*/
std::vector <uint8_t> const&
getBuffer () const
{
return mBuffer;
}
getBuffer (Compressed tryCompressed);

/** Get the traffic category */
std::size_t
getCategory () const
{
return mCategory;
return category_;
}

private:
std::vector <uint8_t> mBuffer;
std::size_t mCategory;
std::vector <uint8_t> buffer_;
std::vector <uint8_t> bufferCompressed_;
std::size_t category_;
std::once_flag once_flag_;

/** Set the payload header
* @param in Pointer to the payload
* @param payloadBytes Size of the payload excluding the header size
* @param type Protocol message type
* @param comprAlgorithm Compression algorithm used in compression,
* currently LZ4 only. If None then the message is uncompressed.
* @param uncompressedBytes Size of the uncompressed message
*/
void setHeader(std::uint8_t* in, std::uint32_t payloadBytes, int type,
Algorithm comprAlgorithm, std::uint32_t uncompressedBytes);

/** Try to compress the payload.
* Can be called concurrently by multiple peers but is compressed once.
* If the message is not compressible then the serialized buffer_ is used.
*/
void compress();

/** Get the message type from the payload header.
* First four bytes are the compression/algorithm flag and the payload size.
* Next two bytes are the message type
* @param in Payload header pointer
* @return Message type
*/
int getType(std::uint8_t const* in) const;
};

}
Expand Down
6 changes: 4 additions & 2 deletions src/ripple/overlay/impl/ConnectAttempt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ ConnectAttempt::onHandshake (error_code ec)
if (! sharedValue)
return close(); // makeSharedValue logs

req_ = makeRequest(!overlay_.peerFinder().config().peerPrivate);
req_ = makeRequest(!overlay_.peerFinder().config().peerPrivate, app_.config().COMPRESSION);

buildHandshake(req_, *sharedValue, overlay_.setup().networkID,
overlay_.setup().public_ip, remote_endpoint_.address(), app_);
Expand Down Expand Up @@ -264,7 +264,7 @@ ConnectAttempt::onShutdown (error_code ec)
//--------------------------------------------------------------------------

auto
ConnectAttempt::makeRequest (bool crawl) -> request_type
ConnectAttempt::makeRequest (bool crawl, bool compressionEnabled) -> request_type
{
request_type m;
m.method(boost::beast::http::verb::get);
Expand All @@ -275,6 +275,8 @@ ConnectAttempt::makeRequest (bool crawl) -> request_type
m.insert ("Connection", "Upgrade");
m.insert ("Connect-As", "Peer");
m.insert ("Crawl", crawl ? "public" : "private");
if (compressionEnabled)
m.insert("X-Offer-Compression", "lz4");
return m;
}

Expand Down
Loading