buffer: improve read reservations to efficiently handle multiple slices #14054

Merged
merged 59 commits into from
Feb 1, 2021
Changes from 5 commits
9c5199f
buffer: improve read reservations to efficiently handle multiple slices
ggreenway Oct 28, 2020
246f167
Merge remote-tracking branch 'upstream/master' into multi-slice-read
ggreenway Nov 30, 2020
ae815f1
honor high watermark
ggreenway Dec 2, 2020
6adeeab
Merge remote-tracking branch 'upstream/master' into multi-slice-read
ggreenway Dec 4, 2020
6af8023
fix test expectation
ggreenway Dec 4, 2020
c8c9cce
fix incorrect merge conflict resolution
ggreenway Dec 4, 2020
a0b68b7
remove commented out code
ggreenway Dec 4, 2020
ff13426
Remove old reserve/commit API; migrate all uses to new API
ggreenway Dec 7, 2020
c0661eb
don't increase size of preferred length in watermark code trying to
ggreenway Dec 8, 2020
0416701
Merge remote-tracking branch 'upstream/master' into multi-slice-read
ggreenway Dec 10, 2020
6e9f1aa
api comments; delete old code
ggreenway Dec 10, 2020
eb3fe0e
readability cleanup
ggreenway Dec 11, 2020
aff7e53
change interface to have a different Reservation type for single-slic…
ggreenway Dec 14, 2020
508d4dc
Merge remote-tracking branch 'upstream/master' into multi-slice-read
ggreenway Dec 15, 2020
abcbe9e
build fixes
ggreenway Dec 16, 2020
be6a9a3
fix watermark buffer reservation logic
ggreenway Dec 16, 2020
11a4362
Merge remote-tracking branch 'upstream/master' into multi-slice-read
ggreenway Dec 21, 2020
532edbb
remove accidental addition to setup_clang.sh
ggreenway Dec 21, 2020
53fbc33
fixes
ggreenway Dec 21, 2020
4bef236
Merge remote-tracking branch 'upstream/master' into multi-slice-read
ggreenway Jan 14, 2021
efd8057
release note
ggreenway Jan 14, 2021
75278b6
fix merge conflict
ggreenway Jan 14, 2021
e56585c
clang_tidy
ggreenway Jan 14, 2021
fc7b592
fix mac build
ggreenway Jan 14, 2021
45e7353
Merge remote-tracking branch 'upstream/master' into multi-slice-read
ggreenway Jan 14, 2021
5885132
one more ASSERT
ggreenway Jan 19, 2021
675cd59
Merge remote-tracking branch 'upstream/main' into multi-slice-read
ggreenway Jan 19, 2021
a1e6c3e
remove cast to fix compilation; the types matched exactly without the…
ggreenway Jan 21, 2021
0e80aca
fix tls throughput benchmark correctness
ggreenway Jan 21, 2021
a5deffa
undo change to connection impl test buffer size
ggreenway Jan 21, 2021
dc4d618
ensure reservation is the desired size
ggreenway Jan 21, 2021
7bd544a
comment in test regarding when slices are added, and additional
ggreenway Jan 21, 2021
4e0331d
add expectation on reservation length
ggreenway Jan 21, 2021
b4c8e46
rework API to not take a param for reservation size for reads
ggreenway Jan 21, 2021
112e3bd
reset length_ on commit()
ggreenway Jan 21, 2021
6195334
check for double-commit()
ggreenway Jan 21, 2021
3475f1d
remove assert that isn't true in fuzz corpus
ggreenway Jan 21, 2021
f8d2a72
ensure we don't overflow the inline-vector in Reservation
ggreenway Jan 21, 2021
01e6ebd
Merge remote-tracking branch 'upstream/main' into multi-slice-read
ggreenway Jan 21, 2021
1e20eeb
fix buffer_fuzz
ggreenway Jan 22, 2021
d3559f8
reserveWithMaxLength
ggreenway Jan 22, 2021
54d0162
add test for ENVOY_BUG
ggreenway Jan 22, 2021
c837704
make IoHandle::read() max_length optional
ggreenway Jan 22, 2021
530e045
Add test for freelist; fixes to make it work like a stack
ggreenway Jan 22, 2021
873f6be
Merge remote-tracking branch 'upstream/main' into multi-slice-read
ggreenway Jan 25, 2021
0c92529
fix ordering of release notes (auto-merge got wrong order)
ggreenway Jan 25, 2021
2cfd939
fix spelling
ggreenway Jan 25, 2021
0c26ceb
slightly improve speed_test
ggreenway Jan 26, 2021
07a9314
only allocate one block for impl to track ownership in reservation,
ggreenway Jan 26, 2021
6bc1fc3
fix build
ggreenway Jan 26, 2021
9f3a717
optimize: only lookup thread_local once per reservation
ggreenway Jan 26, 2021
ef66dc4
fix test
ggreenway Jan 26, 2021
47d7cfd
clang-tidy
ggreenway Jan 26, 2021
ea5e975
minor cleanup of `len_` handling
ggreenway Jan 26, 2021
4a92b7d
more expectations in ReserveZeroCommit
ggreenway Jan 27, 2021
97bd65b
Merge remote-tracking branch 'upstream/main' into multi-slice-read
ggreenway Jan 27, 2021
7b94c6f
Remove a test that no longer makes sense.
ggreenway Jan 27, 2021
180ab4b
Merge remote-tracking branch 'upstream/main' into multi-slice-read
ggreenway Feb 1, 2021
875e8d3
Merge remote-tracking branch 'upstream/main' into multi-slice-read
ggreenway Feb 1, 2021
36 changes: 36 additions & 0 deletions include/envoy/buffer/buffer.h
@@ -72,6 +72,8 @@ class SliceData {

using SliceDataPtr = std::unique_ptr<SliceData>;

class Reservation;

/**
* A basic buffer abstraction.
*/
@@ -210,6 +212,9 @@ class Instance {
*/
virtual uint64_t reserve(uint64_t length, RawSlice* iovecs, uint64_t num_iovecs) PURE;

virtual Reservation reserve(uint64_t preferred_length) PURE;
virtual void commit(Reservation& reservation, uint64_t length) PURE;

/**
* Search for an occurrence of data within the buffer.
* @param data supplies the data to search for.
@@ -420,5 +425,36 @@ class WatermarkFactory {

using WatermarkFactoryPtr = std::unique_ptr<WatermarkFactory>;

/**
* Holds an in-progress addition to a buffer.
*
* @note For performance reasons, this class is passed by value to
* avoid an extra allocation, so it cannot have any virtual methods.
*/
class Reservation final {
public:
Review comment (Contributor):

Does the default move constructor work for this class?

I looked at the implementation of absl::InlinedVector's move constructor and I see it sets the size of the original to 0 in cases where inlined storage is used (good).
The buffer_ and length_ are still set to their original values after the move. I guess that's probably fine.

Reservation(Reservation&&) = default;
~Reservation() {
if (!slices_.empty()) {
buffer_.commit(*this, 0);
}
}
RawSlice* slices() { return slices_.data(); }
uint64_t numSlices() const { return slices_.size(); }
void commit(uint64_t length) {
buffer_.commit(*this, length);
slices_.clear();
owned_slices_.clear();
}

// private:
friend class Instance;
Reservation(Instance& buffer) : buffer_(buffer) {}
Instance& buffer_;
static constexpr uint32_t num_elements_ = 9;
absl::InlinedVector<RawSlice, num_elements_> slices_;
absl::InlinedVector<SliceDataPtr, num_elements_> owned_slices_;
};

} // namespace Buffer
} // namespace Envoy
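
The ownership rule in `Reservation` (commit once, and let the destructor commit zero bytes if the caller never did) can be illustrated with a small standalone model. This is only a sketch: `ToyBuffer` and `ToyReservation` are hypothetical names that do not exist in Envoy, and the real class tracks slices rather than a single byte count. It also shows why a moved-from reservation must be left empty, which is the concern raised in the review comment about the default move constructor.

```cpp
#include <cassert>
#include <cstdint>
#include <utility>

// Hypothetical stand-in for a buffer: it only counts committed bytes and
// how many times commit() was called.
struct ToyBuffer {
  uint64_t length = 0;
  int commits = 0;
  void commit(uint64_t n) {
    length += n;
    ++commits;
  }
};

// Hypothetical stand-in for Reservation. reserved_ == 0 plays the role of
// "slices_ is empty" in the diff.
class ToyReservation {
public:
  ToyReservation(ToyBuffer& buffer, uint64_t reserved)
      : buffer_(buffer), reserved_(reserved) {}

  // Moving must empty the source, otherwise both objects would commit.
  ToyReservation(ToyReservation&& other)
      : buffer_(other.buffer_), reserved_(other.reserved_) {
    other.reserved_ = 0;
  }

  // An abandoned reservation releases its space by committing zero bytes.
  ~ToyReservation() {
    if (reserved_ != 0) {
      buffer_.commit(0);
    }
  }

  void commit(uint64_t length) {
    assert(length <= reserved_);
    buffer_.commit(length);
    reserved_ = 0; // Prevents a second commit from the destructor.
  }

private:
  ToyBuffer& buffer_;
  uint64_t reserved_;
};
```

In this model, a reservation that is committed explicitly triggers exactly one `commit()` on the buffer, one that goes out of scope untouched triggers a zero-length commit, and a moved-from reservation triggers none.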
66 changes: 66 additions & 0 deletions source/common/buffer/buffer_impl.cc
@@ -18,6 +18,9 @@ namespace {
constexpr uint64_t CopyThreshold = 512;
} // namespace

thread_local absl::InlinedVector<std::unique_ptr<OwnedSlice>, OwnedSlice::free_list_max_>
OwnedSlice::free_list_;

void OwnedImpl::addImpl(const void* data, uint64_t size) {
const char* src = static_cast<const char*>(data);
bool new_slice_needed = slices_.empty();
@@ -389,6 +392,69 @@ uint64_t OwnedImpl::reserve(uint64_t length, RawSlice* iovecs, uint64_t num_iove
return num_slices_used;
}

Reservation OwnedImpl::reserve(uint64_t length) {
Reservation reservation(*this);
if (length == 0) {
return reservation;
}

// Remove any empty slices at the end.
while (!slices_.empty() && slices_.back()->dataSize() == 0) {
slices_.pop_back();
}

uint64_t bytes_remaining = length;

// Check whether there are any empty slices with reservable space at the end of the buffer.
uint64_t reservable_size = slices_.empty() ? 0 : slices_.back()->reservableSize();
if (reservable_size >= length || reservable_size >= (OwnedSlice::default_slice_size_ / 8)) {
auto& last_slice = slices_.back();
const uint64_t reservation_size = std::min(last_slice->reservableSize(), bytes_remaining);
auto slice = last_slice->reserve(reservation_size);
reservation.slices_.push_back(slice);
reservation.owned_slices_.push_back(nullptr);
bytes_remaining -= slice.len_;
}

while (bytes_remaining != 0) {
const uint64_t size = OwnedSlice::default_slice_size_;
auto slice = OwnedSlice::create(size);
reservation.slices_.push_back(slice->reserve(size));
reservation.owned_slices_.emplace_back(std::move(slice));
bytes_remaining -= std::min(reservation.slices_.back().len_, bytes_remaining);
}

ASSERT(reservation.slices_.size() <= reservation.num_elements_,
"The absl::InlinedVector should be sized so that out-of-line storage isn't needed.");
ASSERT(reservation.slices_.size() == reservation.owned_slices_.size());
return reservation;
}

void OwnedImpl::commit(Reservation& reservation, uint64_t length) {
ASSERT(reservation.slices_.size() == reservation.owned_slices_.size());
uint64_t bytes_remaining = length;
for (uint32_t i = 0; i < reservation.slices_.size(); i++) {
ASSERT((reservation.owned_slices_[i] != nullptr) ==
(dynamic_cast<OwnedSlice*>(reservation.owned_slices_[i].get()) != nullptr));
std::unique_ptr<OwnedSlice> owned_slice(
static_cast<OwnedSlice*>(reservation.owned_slices_[i].release()));

if (bytes_remaining > 0) {
if (owned_slice != nullptr) {
slices_.emplace_back(std::move(owned_slice));
}
reservation.slices_[i].len_ =
std::min<uint64_t>(reservation.slices_[i].len_, bytes_remaining);
bool success = slices_.back()->commit(reservation.slices_[i]);
ASSERT(success);
length_ += reservation.slices_[i].len_;
bytes_remaining -= reservation.slices_[i].len_;
} else {
OwnedSlice::free(std::move(owned_slice));
}
}
}

ssize_t OwnedImpl::search(const void* data, uint64_t size, size_t start, size_t length) const {
// This implementation uses the same search algorithm as evbuffer_search(), a naive
// scan that requires O(M*N) comparisons in the worst case.
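
The slice arithmetic in `OwnedImpl::reserve()` and `OwnedImpl::commit()` above can be followed more easily as two pure functions over slice lengths. The sketch below is an illustration only: `planReservation`, `distributeCommit`, and `kSliceSize` are hypothetical names, and the real code works on `Slice` objects and the free list rather than on plain integers.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

// Mirrors OwnedSlice::default_slice_size_ from the diff.
constexpr uint64_t kSliceSize = 16384;

// Hypothetical helper: lengths of the slices a reservation of `length` bytes
// would hold, given `tail_reservable` free bytes in the buffer's last slice.
std::vector<uint64_t> planReservation(uint64_t tail_reservable, uint64_t length) {
  std::vector<uint64_t> lens;
  if (length == 0) {
    return lens;
  }
  uint64_t bytes_remaining = length;
  // Reuse the tail only if it covers the whole request or is at least
  // one eighth of a default slice.
  if (tail_reservable >= length || tail_reservable >= kSliceSize / 8) {
    lens.push_back(std::min(tail_reservable, bytes_remaining));
    bytes_remaining -= lens.back();
  }
  // Cover the rest with whole default-size slices; the last may overshoot.
  while (bytes_remaining != 0) {
    lens.push_back(kSliceSize);
    bytes_remaining -= std::min(kSliceSize, bytes_remaining);
  }
  return lens;
}

// Hypothetical helper: bytes committed into each reserved slice, in order.
// Slices beyond the end of the result received no data; in the diff those
// are handed back to OwnedSlice::free().
std::vector<uint64_t> distributeCommit(const std::vector<uint64_t>& reserved,
                                       uint64_t length) {
  std::vector<uint64_t> committed;
  uint64_t bytes_remaining = length;
  for (uint64_t slice_len : reserved) {
    if (bytes_remaining == 0) {
      break;
    }
    const uint64_t used = std::min(slice_len, bytes_remaining);
    committed.push_back(used);
    bytes_remaining -= used;
  }
  assert(bytes_remaining == 0 && "cannot commit more than was reserved");
  return committed;
}
```

Under these assumptions, a 131072-byte request against a buffer whose last slice has 4096 reservable bytes yields nine slices (the tail plus eight new ones), which matches `num_elements_ = 9` in `Reservation`; the diff itself does not state that this is how the constant was chosen.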
38 changes: 34 additions & 4 deletions source/common/buffer/buffer_impl.h
@@ -260,9 +260,18 @@ class OwnedSlice final : public Slice {
* @param capacity number of bytes of space the slice should have.
* @return an OwnedSlice with at least the specified capacity.
*/
static SlicePtr create(uint64_t capacity) {
static std::unique_ptr<OwnedSlice> create(uint64_t capacity) {
ASSERT(sliceSize(default_slice_size_) == default_slice_size_);

uint64_t slice_capacity = sliceSize(capacity);
return SlicePtr{new OwnedSlice(slice_capacity)};
std::unique_ptr<OwnedSlice> slice;
if (slice_capacity == default_slice_size_ && !free_list_.empty()) {
slice = std::move(free_list_.back());
free_list_.pop_back();
} else {
slice.reset(new OwnedSlice(slice_capacity));
}
return slice;
}

/**
@@ -278,6 +287,22 @@ class OwnedSlice final : public Slice {
return slice;
}

static constexpr uint32_t default_slice_size_ = 16384;

static void free(std::unique_ptr<OwnedSlice> slice) {
if (slice == nullptr) {
return;
}

if (slice->capacity_ == default_slice_size_ && slice->reservable_ == 0 &&
free_list_.size() < free_list_max_) {
free_list_.emplace_back(std::move(slice));
ASSERT(slice == nullptr);
} else {
slice.reset();
}
}

private:
OwnedSlice(uint64_t size) : Slice(0, 0, size), storage_(new uint8_t[size]) {
base_ = storage_.get();
@@ -290,12 +315,15 @@ class OwnedSlice final : public Slice {
* @param data_size the minimum amount of data the slice must be able to store, in bytes.
* @return a recommended slice size, in bytes.
*/
static uint64_t sliceSize(uint64_t data_size) {
static constexpr uint64_t PageSize = 4096;
constexpr static uint64_t sliceSize(uint64_t data_size) {
constexpr uint64_t PageSize = 4096;
const uint64_t num_pages = (data_size + PageSize - 1) / PageSize;
return num_pages * PageSize;
}

static constexpr uint32_t free_list_max_ = 8;
static thread_local absl::InlinedVector<std::unique_ptr<OwnedSlice>, free_list_max_> free_list_;

std::unique_ptr<uint8_t[]> storage_;
};

@@ -560,6 +588,7 @@ class OwnedImpl : public LibEventInstance {
void prepend(absl::string_view data) override;
void prepend(Instance& data) override;
void commit(RawSlice* iovecs, uint64_t num_iovecs) override;
void commit(Reservation& reservation, uint64_t length) override;
void copyOut(size_t start, uint64_t size, void* data) const override;
void drain(uint64_t size) override;
RawSliceVector getRawSlices(absl::optional<uint64_t> max_slices = absl::nullopt) const override;
@@ -570,6 +599,7 @@ class OwnedImpl : public LibEventInstance {
void move(Instance& rhs) override;
void move(Instance& rhs, uint64_t length) override;
uint64_t reserve(uint64_t length, RawSlice* iovecs, uint64_t num_iovecs) override;
Reservation reserve(uint64_t preferred_length) override;
ssize_t search(const void* data, uint64_t size, size_t start, size_t length) const override;
bool startsWith(absl::string_view data) const override;
std::string toString() const override;
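
The free list added to `OwnedSlice` behaves like a small per-thread stack of default-size slices: `free()` pushes, `create()` pops, and anything that does not fit is simply destroyed. A minimal standalone sketch of that idea follows. `ToySlice` and `ToySlicePool` are hypothetical names, `std::vector` stands in for `absl::InlinedVector`, and the `reservable_ == 0` check from the diff is omitted for brevity.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <vector>

// Hypothetical stand-in for OwnedSlice: just a capacity and its storage.
struct ToySlice {
  explicit ToySlice(uint64_t capacity)
      : capacity_(capacity), storage_(new uint8_t[capacity]) {}
  uint64_t capacity_;
  std::unique_ptr<uint8_t[]> storage_;
};

class ToySlicePool {
public:
  static constexpr uint64_t kDefaultSliceSize = 16384; // as in the diff
  static constexpr size_t kFreeListMax = 8;            // as in the diff

  // Pop a recycled slice if one of the right size is available,
  // otherwise allocate a new one.
  static std::unique_ptr<ToySlice> create(uint64_t capacity) {
    auto& list = freeList();
    if (capacity == kDefaultSliceSize && !list.empty()) {
      std::unique_ptr<ToySlice> slice = std::move(list.back());
      list.pop_back();
      return slice;
    }
    return std::unique_ptr<ToySlice>(new ToySlice(capacity));
  }

  // Push default-size slices back while there is room; everything else is
  // destroyed when `slice` goes out of scope.
  static void release(std::unique_ptr<ToySlice> slice) {
    if (slice == nullptr) {
      return;
    }
    auto& list = freeList();
    if (slice->capacity_ == kDefaultSliceSize && list.size() < kFreeListMax) {
      list.push_back(std::move(slice));
    }
  }

  static size_t freeListSize() { return freeList().size(); }

private:
  // One list per thread, so no locking is needed.
  static std::vector<std::unique_ptr<ToySlice>>& freeList() {
    thread_local std::vector<std::unique_ptr<ToySlice>> list;
    return list;
  }
};
```

Because the list is popped from the back, the most recently released slice is the first to be reused, which presumably keeps the hottest memory in play.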
28 changes: 28 additions & 0 deletions source/common/buffer/watermark_buffer.cc
@@ -36,6 +36,11 @@ void WatermarkBuffer::commit(RawSlice* iovecs, uint64_t num_iovecs) {
checkHighAndOverflowWatermarks();
}

void WatermarkBuffer::commit(Reservation& reservation, uint64_t length) {
OwnedImpl::commit(reservation, length);
checkHighAndOverflowWatermarks();
}

void WatermarkBuffer::drain(uint64_t size) {
OwnedImpl::drain(size);
checkLowWatermark();
@@ -57,6 +62,29 @@ SliceDataPtr WatermarkBuffer::extractMutableFrontSlice() {
return result;
}

// Adjust the reservation size based on space available before hitting
// the high watermark to avoid overshooting by a lot and thus violating the limits
// the watermark is imposing.
Reservation WatermarkBuffer::reserve(uint64_t preferred_length) {
uint64_t adjusted_length = preferred_length;

if (high_watermark_ > 0 && preferred_length > 0) {
const uint64_t current_length = OwnedImpl::length();
if (current_length >= high_watermark_) {
// Always allow a read of at least some data. The API doesn't allow returning
// a zero-length reservation.
adjusted_length = OwnedSlice::default_slice_size_;
} else {
const uint64_t available_length = high_watermark_ - current_length;
adjusted_length = std::min(available_length, preferred_length);
adjusted_length =
IntUtil::roundUpToMultiple(adjusted_length, OwnedSlice::default_slice_size_);
}
}

return OwnedImpl::reserve(adjusted_length);
}

uint64_t WatermarkBuffer::reserve(uint64_t length, RawSlice* iovecs, uint64_t num_iovecs) {
uint64_t bytes_reserved = OwnedImpl::reserve(length, iovecs, num_iovecs);
checkHighAndOverflowWatermarks();
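
The length adjustment in `WatermarkBuffer::reserve(uint64_t)` above depends only on three numbers, so it can be checked in isolation. The sketch below restates it as a free function under that assumption; `adjustedReservationLength`, `roundUpToSliceMultiple`, and `kDefaultSliceSize` are hypothetical names introduced here for illustration.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Mirrors OwnedSlice::default_slice_size_ from the diff.
constexpr uint64_t kDefaultSliceSize = 16384;

// Same formula as IntUtil::roundUpToMultiple in the diff.
uint64_t roundUpToSliceMultiple(uint64_t val, uint64_t multiple) {
  assert(multiple > 0);
  return ((val + multiple - 1) / multiple) * multiple;
}

// Hypothetical restatement of the watermark logic as a pure function.
// A high_watermark of 0 means "no watermark configured".
uint64_t adjustedReservationLength(uint64_t high_watermark, uint64_t current_length,
                                   uint64_t preferred_length) {
  if (high_watermark == 0 || preferred_length == 0) {
    return preferred_length;
  }
  if (current_length >= high_watermark) {
    // Already at or over the limit: still allow one slice so a read can
    // make progress.
    return kDefaultSliceSize;
  }
  const uint64_t available = high_watermark - current_length;
  return roundUpToSliceMultiple(std::min(available, preferred_length),
                                kDefaultSliceSize);
}
```

For example, with a 32768-byte high watermark and an empty buffer, a 131072-byte request is trimmed to 32768 bytes; once the buffer is full, the same request is trimmed to a single 16384-byte slice.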
2 changes: 2 additions & 0 deletions source/common/buffer/watermark_buffer.h
@@ -36,6 +36,8 @@ class WatermarkBuffer : public OwnedImpl {
void move(Instance& rhs, uint64_t length) override;
SliceDataPtr extractMutableFrontSlice() override;
uint64_t reserve(uint64_t length, RawSlice* iovecs, uint64_t num_iovecs) override;
Reservation reserve(uint64_t preferred_length) override;
void commit(Reservation& reservation, uint64_t length) override;
void postProcess() override { checkLowWatermark(); }
void appendSliceForTest(const void* data, uint64_t size) override;
void appendSliceForTest(absl::string_view data) override;
22 changes: 20 additions & 2 deletions source/common/common/utility.h
@@ -179,6 +179,24 @@ class DateUtil {
static uint64_t nowToMilliseconds(TimeSource& time_source);
};

/**
* Utility routines for working with integers.
*/
class IntUtil {
public:
/**
* Round `val` up to the next multiple. Examples:
* roundUpToMultiple(3, 8) -> 8
* roundUpToMultiple(9, 8) -> 16
* roundUpToMultiple(8, 8) -> 8
*/
static uint64_t roundUpToMultiple(uint64_t val, uint32_t multiple) {
ASSERT(multiple > 0);
ASSERT((val + multiple) >= val, "Unsigned overflow");
return ((val + multiple - 1) / multiple) * multiple;
}
};

/**
* Utility routines for working with strings.
*/
@@ -619,8 +637,8 @@ template <class Value> struct TrieLookupTable {
}

/**
* Finds the entry associated with the longest prefix. Complexity is O(min(longest key prefix, key
* length))
* Finds the entry associated with the longest prefix. Complexity is O(min(longest key prefix,
* key length)).
* @param key the key used to find.
* @return the value matching the longest prefix based on the key.
*/
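
The examples in the `roundUpToMultiple` doc comment can be verified with a standalone copy of the formula. The sketch below uses plain `assert` in place of Envoy's `ASSERT` macro and a free function instead of the `IntUtil` class; otherwise the arithmetic is the same as in the diff.

```cpp
#include <cassert>
#include <cstdint>

// Standalone copy of the rounding formula from the diff. Adding
// (multiple - 1) before the integer division rounds any remainder up.
uint64_t roundUpToMultiple(uint64_t val, uint32_t multiple) {
  assert(multiple > 0);
  assert((val + multiple) >= val); // guards against unsigned overflow
  return ((val + multiple - 1) / multiple) * multiple;
}
```

Values that are already a multiple are returned unchanged, and 0 stays 0, since `(0 + multiple - 1) / multiple` is 0 in integer division.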
12 changes: 3 additions & 9 deletions source/common/network/io_socket_handle_impl.cc
@@ -112,17 +112,11 @@ Api::IoCallUint64Result IoSocketHandleImpl::read(Buffer::Instance& buffer, uint6
if (max_length == 0) {
return Api::ioCallUint64ResultNoError();
}
constexpr uint64_t MaxSlices = 2;
Buffer::RawSlice slices[MaxSlices];
const uint64_t num_slices = buffer.reserve(max_length, slices, MaxSlices);
Api::IoCallUint64Result result = readv(max_length, slices, num_slices);
Buffer::Reservation reservation = buffer.reserve(max_length);
Api::IoCallUint64Result result = readv(max_length, reservation.slices(), reservation.numSlices());
uint64_t bytes_to_commit = result.ok() ? result.rc_ : 0;
ASSERT(bytes_to_commit <= max_length);
for (uint64_t i = 0; i < num_slices; i++) {
slices[i].len_ = std::min(slices[i].len_, static_cast<size_t>(bytes_to_commit));
bytes_to_commit -= slices[i].len_;
}
buffer.commit(slices, num_slices);
reservation.commit(bytes_to_commit);

// Emulated edge events need to be registered if the socket operation did not complete
// because the socket would block.
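
The point of handing several reserved slices to `readv` is that one system call can fill all of them. A minimal POSIX sketch of that scatter read follows, assuming a Linux or similar environment. `scatterRead` is a hypothetical helper and `std::string` stands in for the reserved slices; it is not how `IoSocketHandleImpl` is implemented.

```cpp
#include <sys/uio.h>
#include <unistd.h>

#include <cassert>
#include <string>
#include <vector>

// Hypothetical helper: fill several caller-owned slices with one readv()
// call. Returns the number of bytes read, or -1 on error (errno is set by
// readv). Slices are filled in order, so the result is the total length
// that a caller would then commit.
ssize_t scatterRead(int fd, std::vector<std::string>& slices) {
  std::vector<iovec> iov(slices.size());
  for (size_t i = 0; i < slices.size(); i++) {
    iov[i].iov_base = &slices[i][0];
    iov[i].iov_len = slices[i].size();
  }
  return ::readv(fd, iov.data(), static_cast<int>(iov.size()));
}
```

Because the kernel fills the slices front to back, a single byte count is enough to describe the result, which is what lets the new `reservation.commit(bytes_to_commit)` call replace the per-slice length fix-up loop.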
2 changes: 1 addition & 1 deletion source/common/network/raw_buffer_socket.cc
@@ -19,7 +19,7 @@ IoResult RawBufferSocket::doRead(Buffer::Instance& buffer) {
bool end_stream = false;
do {
// 16K read is arbitrary. TODO(mattklein123) PERF: Tune the read size.
Api::IoCallUint64Result result = callbacks_->ioHandle().read(buffer, 16384);
Api::IoCallUint64Result result = callbacks_->ioHandle().read(buffer, 131072);

if (result.ok()) {
ENVOY_CONN_LOG(trace, "read returns: {}", callbacks_->connection(), result.rc_);
34 changes: 15 additions & 19 deletions source/extensions/transport_sockets/tls/ssl_socket.cc
@@ -93,16 +93,13 @@ SslSocket::ReadResult SslSocket::sslReadIntoSlice(Buffer::RawSlice& slice) {
ASSERT(static_cast<size_t>(rc) <= remaining);
mem += rc;
remaining -= rc;
result.commit_slice_ = true;
result.bytes_read_ += rc;
} else {
result.error_ = absl::make_optional<int>(rc);
break;
}
}

if (result.commit_slice_) {
slice.len_ -= remaining;
}
return result;
}

@@ -122,17 +119,16 @@ Network::IoResult SslSocket::doRead(Buffer::Instance& read_buffer) {
PostIoAction action = PostIoAction::KeepOpen;
uint64_t bytes_read = 0;
while (keep_reading) {
uint64_t bytes_read_this_iteration = 0;
// We use 2 slices here so that we can use the remainder of an existing buffer chain element
// if there is extra space. 16K read is arbitrary and can be tuned later.
Buffer::RawSlice slices[2];
uint64_t slices_to_commit = 0;
uint64_t num_slices = read_buffer.reserve(16384, slices, 2);
for (uint64_t i = 0; i < num_slices; i++) {
auto result = sslReadIntoSlice(slices[i]);
if (result.commit_slice_) {
slices_to_commit++;
bytes_read += slices[i].len_;
}
Buffer::Reservation reservation = read_buffer.reserve(16384);
// Buffer::RawSlice slices[2];
// uint64_t slices_to_commit = 0;
// uint64_t num_slices = read_buffer.reserve(16384, slices, 2);
for (uint64_t i = 0; i < reservation.numSlices(); i++) {
auto result = sslReadIntoSlice(reservation.slices()[i]);
bytes_read_this_iteration += result.bytes_read_;
if (result.error_.has_value()) {
keep_reading = false;
int err = SSL_get_error(rawSsl(), result.error_.value());
@@ -162,13 +158,13 @@ Network::IoResult SslSocket::doRead(Buffer::Instance& read_buffer) {
}
}

if (slices_to_commit > 0) {
read_buffer.commit(slices, slices_to_commit);
if (callbacks_->shouldDrainReadBuffer()) {
callbacks_->setTransportSocketIsReadable();
keep_reading = false;
}
reservation.commit(bytes_read_this_iteration);
if (bytes_read_this_iteration > 0 && callbacks_->shouldDrainReadBuffer()) {
callbacks_->setReadBufferReady();
keep_reading = false;
}

bytes_read += bytes_read_this_iteration;
}

ENVOY_CONN_LOG(trace, "ssl read {} bytes", callbacks_->connection(), bytes_read);