diff --git a/include/envoy/buffer/buffer.h b/include/envoy/buffer/buffer.h index 1dc584b637e1..719d63a060d9 100644 --- a/include/envoy/buffer/buffer.h +++ b/include/envoy/buffer/buffer.h @@ -12,13 +12,6 @@ struct RawSlice { uint64_t len_; }; -/** - * Buffer change callback. - * @param old_size supplies the size of the buffer prior to the change. - * @param data supplies how much data was added or removed. - */ -typedef std::function Callback; - /** * A basic buffer abstraction. */ @@ -118,13 +111,6 @@ class Instance { */ virtual ssize_t search(const void* data, uint64_t size, size_t start) const PURE; - /** - * Set a buffer change callback. Only a single callback can be set at a time. The callback - * is invoked inline with buffer changes. - * @param callback supplies the callback to set. Pass nullptr to clear the callback. - */ - virtual void setCallback(Callback callback) PURE; - /** * Write the buffer out to a file descriptor. * @param fd supplies the descriptor to write to. diff --git a/include/envoy/network/connection.h b/include/envoy/network/connection.h index 3c3aa3dc736d..50bdc0b97f6a 100644 --- a/include/envoy/network/connection.h +++ b/include/envoy/network/connection.h @@ -34,14 +34,6 @@ class ConnectionCallbacks { public: virtual ~ConnectionCallbacks() {} - /** - * Callback for connection buffer changes. - * @param type supplies which buffer has changed. - * @param old_size supplies the original size of the buffer. - * @param delta supplies how much data was added or removed from the buffer. - */ - virtual void onBufferChange(ConnectionBufferType type, uint64_t old_size, int64_t delta) PURE; - /** * Callback for connection events. * @param events supplies the ConnectionEvent events that occurred as a bitmask. @@ -64,6 +56,13 @@ class Connection : public Event::DeferredDeletable, public FilterManager { public: enum class State { Open, Closing, Closed }; + struct BufferStats { + Stats::Counter& read_total_; + Stats::Gauge& read_current_; + Stats::Counter& write_total_; + Stats::Gauge& write_current_; + }; + virtual ~Connection() {} /** @@ -116,6 +115,13 @@ class Connection : public Event::DeferredDeletable, public FilterManager { */ virtual const std::string& remoteAddress() PURE; + /** + * Set the buffer stats to update when the connection's read/write buffers change. Note that + * for performance reasons these stats are eventually consistent and may not always accurately + * represent the buffer contents at any given point in time. + */ + virtual void setBufferStats(const BufferStats& stats) PURE; + /** * @return the SSL connection data if this is an SSL connection, or nullptr if it is not. */ diff --git a/source/common/buffer/buffer_impl.cc b/source/common/buffer/buffer_impl.cc index b1bc57192077..d3a09d2e932a 100644 --- a/source/common/buffer/buffer_impl.cc +++ b/source/common/buffer/buffer_impl.cc @@ -6,10 +6,6 @@ namespace Buffer { -const evbuffer_cb_func OwnedImpl::buffer_cb_ = - [](evbuffer*, const evbuffer_cb_info* info, void* arg) - -> void { static_cast(arg)->onBufferChange(*info); }; - void OwnedImpl::add(const void* data, uint64_t size) { evbuffer_add(buffer_.get(), data, size); } void OwnedImpl::add(const std::string& data) { @@ -78,10 +74,6 @@ void OwnedImpl::move(Instance& rhs, uint64_t length) { UNREFERENCED_PARAMETER(rc); } -void OwnedImpl::onBufferChange(const evbuffer_cb_info& info) { - cb_(info.orig_size, info.n_added - info.n_deleted); -} - int OwnedImpl::read(int fd, uint64_t max_length) { return evbuffer_read(buffer_.get(), fd, max_length); } @@ -108,17 +100,6 @@ ssize_t OwnedImpl::search(const void* data, uint64_t size, size_t start) const { return result_ptr.pos; } -void OwnedImpl::setCallback(Callback callback) { - ASSERT(!callback || !cb_); - if (callback) { - evbuffer_add_cb(buffer_.get(), buffer_cb_, this); - cb_ = callback; - } else { - evbuffer_remove_cb(buffer_.get(), buffer_cb_, this); - cb_ = nullptr; - } -} - int OwnedImpl::write(int fd) { return evbuffer_write(buffer_.get(), fd); } OwnedImpl::OwnedImpl() : buffer_(evbuffer_new()) {} diff --git a/source/common/buffer/buffer_impl.h b/source/common/buffer/buffer_impl.h index 7bba7d3c54db..63c7d02ddee1 100644 --- a/source/common/buffer/buffer_impl.h +++ b/source/common/buffer/buffer_impl.h @@ -4,10 +4,6 @@ #include "common/event/libevent.h" -// Forward decls to avoid leaking libevent headers to rest of program. -struct evbuffer_cb_info; -typedef void (*evbuffer_cb_func)(evbuffer* buffer, const evbuffer_cb_info* info, void* arg); - namespace Buffer { /** @@ -34,17 +30,10 @@ class OwnedImpl : public Instance { int read(int fd, uint64_t max_length) override; uint64_t reserve(uint64_t length, RawSlice* iovecs, uint64_t num_iovecs) override; ssize_t search(const void* data, uint64_t size, size_t start) const override; - void setCallback(Callback callback) override; int write(int fd) override; private: - void onBufferChange(const evbuffer_cb_info& info); - - static const evbuffer_cb_func buffer_cb_; // Static callback used for all evbuffer callbacks. - // This allows us to add/remove by value. - Event::Libevent::BufferPtr buffer_; - Callback cb_; // The per buffer callback. Invoked via the buffer_cb_ static thunk. }; } // Buffer diff --git a/source/common/filter/auth/client_ssl.h b/source/common/filter/auth/client_ssl.h index faae983ab0a0..087479cbf119 100644 --- a/source/common/filter/auth/client_ssl.h +++ b/source/common/filter/auth/client_ssl.h @@ -110,7 +110,6 @@ class Instance : public Network::ReadFilter, public Network::ConnectionCallbacks } // Network::ConnectionCallbacks - void onBufferChange(Network::ConnectionBufferType, uint64_t, int64_t) override {} void onEvent(uint32_t events) override; private: diff --git a/source/common/filter/ratelimit.h b/source/common/filter/ratelimit.h index 0f586d1848ac..f632c46072e3 100644 --- a/source/common/filter/ratelimit.h +++ b/source/common/filter/ratelimit.h @@ -74,7 +74,6 @@ class Instance : public Network::ReadFilter, } // Network::ConnectionCallbacks - void onBufferChange(Network::ConnectionBufferType, uint64_t, int64_t) override {} void onEvent(uint32_t events) override; // RateLimit::RequestCallbacks diff --git a/source/common/filter/tcp_proxy.cc b/source/common/filter/tcp_proxy.cc index a5fdf280963d..a1beabb0d3b5 100644 --- a/source/common/filter/tcp_proxy.cc +++ b/source/common/filter/tcp_proxy.cc @@ -45,6 +45,16 @@ TcpProxyStats TcpProxyConfig::generateStats(const std::string& name, Stats::Stor POOL_GAUGE_PREFIX(store, final_prefix))}; } +void TcpProxy::initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) { + read_callbacks_ = &callbacks; + conn_log_info("new tcp proxy session", read_callbacks_->connection()); + read_callbacks_->connection().addConnectionCallbacks(downstream_callbacks_); + read_callbacks_->connection().setBufferStats({config_->stats().downstream_cx_rx_bytes_total_, + config_->stats().downstream_cx_rx_bytes_buffered_, + config_->stats().downstream_cx_tx_bytes_total_, + config_->stats().downstream_cx_tx_bytes_buffered_}); +} + Network::FilterStatus TcpProxy::initializeUpstreamConnection() { Upstream::ResourceManager& upstream_cluster_resource_manager = cluster_manager_.get(config_->clusterName()) @@ -68,6 +78,11 @@ Network::FilterStatus TcpProxy::initializeUpstreamConnection() { upstream_connection_->addReadFilter(upstream_callbacks_); upstream_connection_->addConnectionCallbacks(*upstream_callbacks_); + upstream_connection_->setBufferStats( + {read_callbacks_->upstreamHost()->cluster().stats().upstream_cx_rx_bytes_total_, + read_callbacks_->upstreamHost()->cluster().stats().upstream_cx_rx_bytes_buffered_, + read_callbacks_->upstreamHost()->cluster().stats().upstream_cx_tx_bytes_total_, + read_callbacks_->upstreamHost()->cluster().stats().upstream_cx_tx_bytes_buffered_}); upstream_connection_->connect(); upstream_connection_->noDelay(true); @@ -103,18 +118,6 @@ Network::FilterStatus TcpProxy::onData(Buffer::Instance& data) { return Network::FilterStatus::StopIteration; } -void TcpProxy::onDownstreamBufferChange(Network::ConnectionBufferType type, uint64_t, - int64_t delta) { - if (type == Network::ConnectionBufferType::Write) { - if (delta > 0) { - config_->stats().downstream_cx_tx_bytes_total_.add(delta); - config_->stats().downstream_cx_tx_bytes_buffered_.add(delta); - } else { - config_->stats().downstream_cx_tx_bytes_buffered_.sub(std::abs(delta)); - } - } -} - void TcpProxy::onDownstreamEvent(uint32_t event) { if ((event & Network::ConnectionEvent::RemoteClose || event & Network::ConnectionEvent::LocalClose) && @@ -127,18 +130,6 @@ void TcpProxy::onDownstreamEvent(uint32_t event) { } } -void TcpProxy::onUpstreamBufferChange(Network::ConnectionBufferType type, uint64_t, int64_t delta) { - if (type == Network::ConnectionBufferType::Write) { - if (delta > 0) { - read_callbacks_->upstreamHost()->cluster().stats().upstream_cx_tx_bytes_total_.add(delta); - read_callbacks_->upstreamHost()->cluster().stats().upstream_cx_tx_bytes_buffered_.add(delta); - } else { - read_callbacks_->upstreamHost()->cluster().stats().upstream_cx_tx_bytes_buffered_.sub( - std::abs(delta)); - } - } -} - void TcpProxy::onUpstreamData(Buffer::Instance& data) { read_callbacks_->connection().write(data); ASSERT(0 == data.length()); diff --git a/source/common/filter/tcp_proxy.h b/source/common/filter/tcp_proxy.h index a31413a6eed3..898686076983 100644 --- a/source/common/filter/tcp_proxy.h +++ b/source/common/filter/tcp_proxy.h @@ -17,6 +17,8 @@ namespace Filter { */ // clang-format off #define ALL_TCP_PROXY_STATS(COUNTER, GAUGE) \ + COUNTER(downstream_cx_rx_bytes_total) \ + GAUGE (downstream_cx_rx_bytes_buffered) \ COUNTER(downstream_cx_tx_bytes_total) \ GAUGE (downstream_cx_tx_bytes_buffered) // clang-format on @@ -61,22 +63,13 @@ class TcpProxy : public Network::ReadFilter, Logger::Loggableconnection()); - read_callbacks_->connection().addConnectionCallbacks(downstream_callbacks_); - } + void initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) override; private: struct DownstreamCallbacks : public Network::ConnectionCallbacks { DownstreamCallbacks(TcpProxy& parent) : parent_(parent) {} // Network::ConnectionCallbacks - void onBufferChange(Network::ConnectionBufferType type, uint64_t old_size, - int64_t delta) override { - parent_.onDownstreamBufferChange(type, old_size, delta); - } - void onEvent(uint32_t event) override { parent_.onDownstreamEvent(event); } TcpProxy& parent_; @@ -87,11 +80,6 @@ class TcpProxy : public Network::ReadFilter, Logger::Loggable, */ StreamEncoder& newStream(StreamDecoder& response_decoder); + void setBufferStats(const Network::Connection::BufferStats& stats) { + connection_->setBufferStats(stats); + } + void setCodecClientCallbacks(CodecClientCallbacks& callbacks) { codec_client_callbacks_ = &callbacks; } @@ -180,7 +184,6 @@ class CodecClient : Logger::Loggable, void onData(Buffer::Instance& data); // Network::ConnectionCallbacks - void onBufferChange(Network::ConnectionBufferType, uint64_t, int64_t) override {} void onEvent(uint32_t events) override; std::list active_requests_; diff --git a/source/common/http/conn_manager_impl.cc b/source/common/http/conn_manager_impl.cc index 9e4a9e20dc0e..fbc8f8eb1067 100644 --- a/source/common/http/conn_manager_impl.cc +++ b/source/common/http/conn_manager_impl.cc @@ -57,6 +57,11 @@ void ConnectionManagerImpl::initializeReadFilterCallbacks(Network::ReadFilterCal [this]() -> void { onIdleTimeout(); }); idle_timer_->enableTimer(config_.idleTimeout().value()); } + + read_callbacks_->connection().setBufferStats({stats_.named_.downstream_cx_rx_bytes_total_, + stats_.named_.downstream_cx_rx_bytes_buffered_, + stats_.named_.downstream_cx_tx_bytes_total_, + stats_.named_.downstream_cx_tx_bytes_buffered_}); } ConnectionManagerImpl::~ConnectionManagerImpl() { @@ -189,14 +194,6 @@ Network::FilterStatus ConnectionManagerImpl::onData(Buffer::Instance& data) { return Network::FilterStatus::StopIteration; } -void ConnectionManagerImpl::onBufferChange(Network::ConnectionBufferType type, uint64_t, - int64_t delta) { - Network::Utility::updateBufferStats(type, delta, stats_.named_.downstream_cx_rx_bytes_total_, - stats_.named_.downstream_cx_rx_bytes_buffered_, - stats_.named_.downstream_cx_tx_bytes_total_, - stats_.named_.downstream_cx_tx_bytes_buffered_); -} - void ConnectionManagerImpl::resetAllStreams() { while (!streams_.empty()) { // Mimic a downstream reset in this case. diff --git a/source/common/http/conn_manager_impl.h b/source/common/http/conn_manager_impl.h index 7a4a6cbfa6af..e2e05268abb3 100644 --- a/source/common/http/conn_manager_impl.h +++ b/source/common/http/conn_manager_impl.h @@ -215,8 +215,6 @@ class ConnectionManagerImpl : Logger::Loggable, StreamDecoder& newStream(StreamEncoder& response_encoder) override; // Network::ConnectionCallbacks - void onBufferChange(Network::ConnectionBufferType type, uint64_t old_size, - int64_t delta) override; void onEvent(uint32_t events) override; private: diff --git a/source/common/http/http1/conn_pool.cc b/source/common/http/http1/conn_pool.cc index e65d6f4a0672..eb5e42425145 100644 --- a/source/common/http/http1/conn_pool.cc +++ b/source/common/http/http1/conn_pool.cc @@ -276,6 +276,11 @@ ConnPoolImpl::ActiveClient::ActiveClient(ConnPoolImpl& parent) conn_length_ = parent_.host_->cluster().stats().upstream_cx_length_ms_.allocateSpan(); connect_timer_->enableTimer(parent_.host_->cluster().connectTimeout()); parent_.host_->cluster().resourceManager(parent_.priority_).connections().inc(); + + codec_client_->setBufferStats({parent_.host_->cluster().stats().upstream_cx_rx_bytes_total_, + parent_.host_->cluster().stats().upstream_cx_rx_bytes_buffered_, + parent_.host_->cluster().stats().upstream_cx_tx_bytes_total_, + parent_.host_->cluster().stats().upstream_cx_tx_bytes_buffered_}); } ConnPoolImpl::ActiveClient::~ActiveClient() { @@ -285,15 +290,6 @@ ConnPoolImpl::ActiveClient::~ActiveClient() { parent_.host_->cluster().resourceManager(parent_.priority_).connections().dec(); } -void ConnPoolImpl::ActiveClient::onBufferChange(Network::ConnectionBufferType type, uint64_t, - int64_t delta) { - Network::Utility::updateBufferStats( - type, delta, parent_.host_->cluster().stats().upstream_cx_rx_bytes_total_, - parent_.host_->cluster().stats().upstream_cx_rx_bytes_buffered_, - parent_.host_->cluster().stats().upstream_cx_tx_bytes_total_, - parent_.host_->cluster().stats().upstream_cx_tx_bytes_buffered_); -} - void ConnPoolImpl::ActiveClient::onConnectTimeout() { // We just close the client at this point. This will result in both a timeout and a connect // failure and will fold into all the normal connect failure logic. diff --git a/source/common/http/http1/conn_pool.h b/source/common/http/http1/conn_pool.h index dc9fde1feace..ec036f868fa8 100644 --- a/source/common/http/http1/conn_pool.h +++ b/source/common/http/http1/conn_pool.h @@ -70,8 +70,6 @@ class ConnPoolImpl : Logger::Loggable, public ConnectionPool:: void onConnectTimeout(); // Network::ConnectionCallbacks - void onBufferChange(Network::ConnectionBufferType type, uint64_t old_size, - int64_t delta) override; void onEvent(uint32_t events) override { parent_.onConnectionEvent(*this, events); } ConnPoolImpl& parent_; diff --git a/source/common/http/http2/conn_pool.cc b/source/common/http/http2/conn_pool.cc index 0b3ebdb95d03..d60b7d6a8a3a 100644 --- a/source/common/http/http2/conn_pool.cc +++ b/source/common/http/http2/conn_pool.cc @@ -226,6 +226,11 @@ ConnPoolImpl::ActiveClient::ActiveClient(ConnPoolImpl& parent) parent_.host_->cluster().stats().upstream_cx_active_.inc(); parent_.host_->cluster().stats().upstream_cx_http2_total_.inc(); conn_length_ = parent_.host_->cluster().stats().upstream_cx_length_ms_.allocateSpan(); + + client_->setBufferStats({parent_.host_->cluster().stats().upstream_cx_rx_bytes_total_, + parent_.host_->cluster().stats().upstream_cx_rx_bytes_buffered_, + parent_.host_->cluster().stats().upstream_cx_tx_bytes_total_, + parent_.host_->cluster().stats().upstream_cx_tx_bytes_buffered_}); } ConnPoolImpl::ActiveClient::~ActiveClient() { @@ -234,15 +239,6 @@ ConnPoolImpl::ActiveClient::~ActiveClient() { conn_length_->complete(); } -void ConnPoolImpl::ActiveClient::onBufferChange(Network::ConnectionBufferType type, uint64_t, - int64_t delta) { - Network::Utility::updateBufferStats( - type, delta, parent_.host_->cluster().stats().upstream_cx_rx_bytes_total_, - parent_.host_->cluster().stats().upstream_cx_rx_bytes_buffered_, - parent_.host_->cluster().stats().upstream_cx_tx_bytes_total_, - parent_.host_->cluster().stats().upstream_cx_tx_bytes_buffered_); -} - CodecClientPtr ProdConnPoolImpl::createCodecClient(Upstream::Host::CreateConnectionData& data) { CodecClientStats stats{host_->cluster().stats().upstream_cx_protocol_error_}; CodecClientPtr codec{new CodecClientProd(CodecClient::Type::HTTP2, std::move(data.connection_), diff --git a/source/common/http/http2/conn_pool.h b/source/common/http/http2/conn_pool.h index a59d30a38cf1..be450e194c75 100644 --- a/source/common/http/http2/conn_pool.h +++ b/source/common/http/http2/conn_pool.h @@ -37,7 +37,6 @@ class ConnPoolImpl : Logger::Loggable, public ConnectionPool:: void onConnectTimeout() { parent_.onConnectTimeout(*this); } // Network::ConnectionCallbacks - void onBufferChange(Network::ConnectionBufferType type, uint64_t, int64_t delta) override; void onEvent(uint32_t events) override { parent_.onConnectionEvent(*this, events); } // CodecClientCallbacks diff --git a/source/common/mongo/proxy.h b/source/common/mongo/proxy.h index 6b68ba3922ba..3985b944d9e7 100644 --- a/source/common/mongo/proxy.h +++ b/source/common/mongo/proxy.h @@ -104,7 +104,6 @@ class ProxyFilter : public Network::Filter, // Network::ConnectionCallbacks void onEvent(uint32_t event) override; - void onBufferChange(Network::ConnectionBufferType, uint64_t, int64_t) override {} private: struct ActiveQuery { diff --git a/source/common/network/connection_impl.cc b/source/common/network/connection_impl.cc index 9155fac9aaf5..1e35f7fa98f0 100644 --- a/source/common/network/connection_impl.cc +++ b/source/common/network/connection_impl.cc @@ -12,6 +12,24 @@ namespace Network { +void ConnectionImplUtility::updateBufferStats(uint64_t delta, uint64_t new_total, + uint64_t& previous_total, Stats::Counter& stat_total, + Stats::Gauge& stat_current) { + if (delta) { + stat_total.add(delta); + } + + if (new_total != previous_total) { + if (new_total > previous_total) { + stat_current.add(new_total - previous_total); + } else { + stat_current.sub(previous_total - new_total); + } + + previous_total = new_total; + } +} + std::atomic ConnectionImpl::next_global_id_; ConnectionImpl::ConnectionImpl(Event::DispatcherImpl& dispatcher, int fd, @@ -25,13 +43,6 @@ ConnectionImpl::ConnectionImpl(Event::DispatcherImpl& dispatcher, int fd, file_event_ = dispatcher_.createFileEvent(fd_, [this](uint32_t events) -> void { onFileEvent(events); }); - - read_buffer_.setCallback([this](uint64_t old_size, int64_t delta) -> void { - onBufferChange(ConnectionBufferType::Read, old_size, delta); - }); - write_buffer_.setCallback([this](uint64_t old_size, int64_t delta) -> void { - onBufferChange(ConnectionBufferType::Write, old_size, delta); - }); } ConnectionImpl::~ConnectionImpl() { @@ -93,20 +104,10 @@ void ConnectionImpl::closeSocket(uint32_t close_type) { conn_log_debug("closing socket: {}", *this, close_type); - // Drain input and output buffers so that callbacks get fired. This does not happen automatically - // as part of destruction. - uint64_t current_read_buffer_length = read_buffer_.length(); - read_buffer_.setCallback(nullptr); - if (current_read_buffer_length > 0) { - onBufferChange(ConnectionBufferType::Read, current_read_buffer_length, - -current_read_buffer_length); - } - uint64_t current_write_buffer_length = write_buffer_.length(); - write_buffer_.setCallback(nullptr); - if (current_write_buffer_length > 0) { - onBufferChange(ConnectionBufferType::Write, current_write_buffer_length, - -current_write_buffer_length); - } + // Drain input and output buffers. + updateReadBufferStats(0, 0); + updateWriteBufferStats(0, 0); + buffer_stats_.reset(); file_event_.reset(); ::close(fd_); @@ -148,18 +149,12 @@ void ConnectionImpl::noDelay(bool enable) { uint64_t ConnectionImpl::id() { return id_; } -void ConnectionImpl::onBufferChange(ConnectionBufferType type, uint64_t old_size, int64_t delta) { - for (ConnectionCallbacks* callbacks : callbacks_) { - callbacks->onBufferChange(type, old_size, delta); - } -} - -void ConnectionImpl::onRead() { +void ConnectionImpl::onRead(uint64_t read_buffer_size) { if (!(state_ & InternalState::ReadEnabled)) { return; } - if (read_buffer_.length() == 0) { + if (read_buffer_size == 0) { return; } @@ -243,7 +238,9 @@ void ConnectionImpl::onFileEvent(uint32_t events) { } } -ConnectionImpl::PostIoAction ConnectionImpl::doReadFromSocket() { +ConnectionImpl::IoResult ConnectionImpl::doReadFromSocket() { + PostIoAction action; + uint64_t bytes_read = 0; do { // 16K read is arbitrary. IIRC, libevent will currently clamp this to 4K. libevent will also // use an ioctl() before every read to figure out how much data there is to read. @@ -254,38 +251,48 @@ ConnectionImpl::PostIoAction ConnectionImpl::doReadFromSocket() { // Remote close. Might need to raise data before raising close. if (rc == 0) { - return PostIoAction::Close; - } - - // Remote error (might be no data). - if (rc == -1) { + action = PostIoAction::Close; + break; + } else if (rc == -1) { + // Remote error (might be no data). conn_log_trace("read error: {}", *this, errno); if (errno == EAGAIN) { - return PostIoAction::KeepOpen; + action = PostIoAction::KeepOpen; } else { - return PostIoAction::Close; + action = PostIoAction::Close; } + + break; + } else { + bytes_read += rc; } } while (true); + + return {action, bytes_read}; } void ConnectionImpl::onReadReady() { ASSERT(!(state_ & InternalState::Connecting)); - PostIoAction action = doReadFromSocket(); - onRead(); + IoResult result = doReadFromSocket(); + uint64_t new_buffer_size = read_buffer_.length(); + updateReadBufferStats(result.bytes_processed_, new_buffer_size); + onRead(new_buffer_size); // The read callback may have already closed the connection. - if (action == PostIoAction::Close) { + if (result.action_ == PostIoAction::Close) { conn_log_debug("remote close", *this); closeSocket(ConnectionEvent::RemoteClose); } } -ConnectionImpl::PostIoAction ConnectionImpl::doWriteToSocket() { +ConnectionImpl::IoResult ConnectionImpl::doWriteToSocket() { + PostIoAction action; + uint64_t bytes_written = 0; do { if (write_buffer_.length() == 0) { - return PostIoAction::KeepOpen; + action = PostIoAction::KeepOpen; + break; } int rc = write_buffer_.write(fd_); @@ -293,12 +300,18 @@ ConnectionImpl::PostIoAction ConnectionImpl::doWriteToSocket() { if (rc == -1) { conn_log_trace("write error: {}", *this, errno); if (errno == EAGAIN) { - return PostIoAction::KeepOpen; + action = PostIoAction::KeepOpen; } else { - return PostIoAction::Close; + action = PostIoAction::Close; } + + break; + } else { + bytes_written += rc; } } while (true); + + return {action, bytes_written}; } void ConnectionImpl::onConnected() { raiseEvents(ConnectionEvent::Connected); } @@ -324,12 +337,16 @@ void ConnectionImpl::onWriteReady() { } } - if (doWriteToSocket() == PostIoAction::Close) { + IoResult result = doWriteToSocket(); + uint64_t new_buffer_size = write_buffer_.length(); + updateWriteBufferStats(result.bytes_processed_, new_buffer_size); + + if (result.action_ == PostIoAction::Close) { // It is possible (though unlikely) for the connection to have already been closed during the // write callback. This can happen if we manage to complete the SSL handshake in the write // callback, raise a connected event, and close the connection. closeSocket(ConnectionEvent::RemoteClose); - } else if ((state_ & InternalState::CloseWithFlush) && write_buffer_.length() == 0) { + } else if ((state_ & InternalState::CloseWithFlush) && new_buffer_size == 0) { conn_log_debug("write flush complete", *this); closeSocket(ConnectionEvent::LocalClose); } @@ -353,6 +370,31 @@ void ConnectionImpl::doConnect(const sockaddr* addr, socklen_t addrlen) { } } +void ConnectionImpl::setBufferStats(const BufferStats& stats) { + ASSERT(!buffer_stats_); + buffer_stats_.reset(new BufferStats(stats)); +} + +void ConnectionImpl::updateReadBufferStats(uint64_t num_read, uint64_t new_size) { + if (!buffer_stats_) { + return; + } + + ConnectionImplUtility::updateBufferStats(num_read, new_size, last_read_buffer_size_, + buffer_stats_->read_total_, + buffer_stats_->read_current_); +} + +void ConnectionImpl::updateWriteBufferStats(uint64_t num_written, uint64_t new_size) { + if (!buffer_stats_) { + return; + } + + ConnectionImplUtility::updateBufferStats(num_written, new_size, last_write_buffer_size_, + buffer_stats_->write_total_, + buffer_stats_->write_current_); +} + ClientConnectionImpl::ClientConnectionImpl(Event::DispatcherImpl& dispatcher, int fd, const std::string& url) : ConnectionImpl(dispatcher, fd, url) {} diff --git a/source/common/network/connection_impl.h b/source/common/network/connection_impl.h index 7656cbb23c29..f0e3151728af 100644 --- a/source/common/network/connection_impl.h +++ b/source/common/network/connection_impl.h @@ -11,6 +11,25 @@ namespace Network { +/** + * Utility functions for the connection implementation. + */ +class ConnectionImplUtility { +public: + /** + * Update the buffer stats for a connection. + * @param delta supplies the data read/written. + * @param new_total supplies the final total buffer size. + * @param previous_total supplies the previous final total buffer size. previous_total will be + * updated to new_total when the call is complete. + * @param stat_total supplies the counter to increment with the delta. + * @param stat_current supplies the guage that should be updated with the delta of previous_total + * and new_total. + */ + static void updateBufferStats(uint64_t delta, uint64_t new_total, uint64_t& previous_total, + Stats::Counter& stat_total, Stats::Gauge& stat_current); +}; + /** * Implementation of Network::Connection. */ @@ -37,6 +56,7 @@ class ConnectionImpl : public virtual Connection, void readDisable(bool disable) override; bool readEnabled() override; const std::string& remoteAddress() override { return remote_address_; } + void setBufferStats(const BufferStats& stats) override; Ssl::Connection* ssl() override { return nullptr; } State state() override; void write(Buffer::Instance& data) override; @@ -48,6 +68,11 @@ class ConnectionImpl : public virtual Connection, protected: enum class PostIoAction { Close, KeepOpen }; + struct IoResult { + PostIoAction action_; + uint64_t bytes_processed_; + }; + virtual void closeSocket(uint32_t close_type); void doConnect(const sockaddr* addr, socklen_t addrlen); void raiseEvents(uint32_t events); @@ -67,14 +92,16 @@ class ConnectionImpl : public virtual Connection, }; // clang-format on - virtual PostIoAction doReadFromSocket(); - virtual PostIoAction doWriteToSocket(); + virtual IoResult doReadFromSocket(); + virtual IoResult doWriteToSocket(); void onBufferChange(ConnectionBufferType type, uint64_t old_size, int64_t delta); virtual void onConnected(); void onFileEvent(uint32_t events); - void onRead(); + void onRead(uint64_t read_buffer_size); void onReadReady(); void onWriteReady(); + void updateReadBufferStats(uint64_t num_read, uint64_t new_size); + void updateWriteBufferStats(uint64_t num_written, uint64_t new_size); static std::atomic next_global_id_; @@ -85,6 +112,9 @@ class ConnectionImpl : public virtual Connection, std::list callbacks_; uint32_t state_{InternalState::ReadEnabled}; Buffer::Instance* current_write_buffer_{}; + uint64_t last_read_buffer_size_{}; + uint64_t last_write_buffer_size_{}; + std::unique_ptr buffer_stats_; }; /** diff --git a/source/common/network/utility.cc b/source/common/network/utility.cc index 6fbc182e7e92..a5b7636f8496 100644 --- a/source/common/network/utility.cc +++ b/source/common/network/utility.cc @@ -70,27 +70,6 @@ bool IpWhiteList::contains(const std::string& remote_address) const { const std::string Utility::TCP_SCHEME = "tcp://"; const std::string Utility::UNIX_SCHEME = "unix://"; -void Utility::updateBufferStats(ConnectionBufferType type, int64_t delta, Stats::Counter& rx_total, - Stats::Gauge& rx_buffered, Stats::Counter& tx_total, - Stats::Gauge& tx_buffered) { - if (type == ConnectionBufferType::Read) { - if (delta > 0) { - rx_total.add(delta); - rx_buffered.add(delta); - } else { - rx_buffered.sub(std::abs(delta)); - } - } else { - ASSERT(type == ConnectionBufferType::Write); - if (delta > 0) { - tx_total.add(delta); - tx_buffered.add(delta); - } else { - tx_buffered.sub(std::abs(delta)); - } - } -} - AddrInfoPtr Utility::resolveTCP(const std::string& host, uint32_t port) { addrinfo addrinfo_hints; memset(&addrinfo_hints, 0, sizeof(addrinfo_hints)); diff --git a/source/common/network/utility.h b/source/common/network/utility.h index 3b0f61c03371..4d243e719076 100644 --- a/source/common/network/utility.h +++ b/source/common/network/utility.h @@ -37,14 +37,6 @@ class Utility { static const std::string TCP_SCHEME; static const std::string UNIX_SCHEME; - /** - * Update buffering stats for a connection. Meant to be paired with - * ConnectionCallbacks::onBufferChange(). - */ - static void updateBufferStats(ConnectionBufferType type, int64_t delta, Stats::Counter& rx_total, - Stats::Gauge& rx_buffered, Stats::Counter& tx_total, - Stats::Gauge& tx_buffered); - /** * Resolve a TCP address. * @param host supplies the host name. diff --git a/source/common/ssl/connection_impl.cc b/source/common/ssl/connection_impl.cc index 799bc98e8381..901e49e4268a 100644 --- a/source/common/ssl/connection_impl.cc +++ b/source/common/ssl/connection_impl.cc @@ -32,16 +32,17 @@ ConnectionImpl::~ConnectionImpl() { filter_manager_.destroyFilters(); } -Network::ConnectionImpl::PostIoAction ConnectionImpl::doReadFromSocket() { +Network::ConnectionImpl::IoResult ConnectionImpl::doReadFromSocket() { if (!handshake_complete_) { PostIoAction action = doHandshake(); if (action == PostIoAction::Close || !handshake_complete_) { - return action; + return {action, 0}; } } bool keep_reading = true; PostIoAction action = PostIoAction::KeepOpen; + uint64_t bytes_read = 0; while (keep_reading) { // We use 2 slices here so that we can use the remainder of an existing buffer chain element // if there is extra space. 16K read is arbitrary and can be tuned later. @@ -54,6 +55,7 @@ Network::ConnectionImpl::PostIoAction ConnectionImpl::doReadFromSocket() { if (rc > 0) { slices[i].len_ = rc; slices_to_commit++; + bytes_read += rc; } else { keep_reading = false; int err = SSL_get_error(ssl_.get(), rc); @@ -77,7 +79,7 @@ Network::ConnectionImpl::PostIoAction ConnectionImpl::doReadFromSocket() { } } - return action; + return {action, bytes_read}; } Network::ConnectionImpl::PostIoAction ConnectionImpl::doHandshake() { @@ -117,16 +119,16 @@ void ConnectionImpl::drainErrorQueue() { } } -Network::ConnectionImpl::PostIoAction ConnectionImpl::doWriteToSocket() { +Network::ConnectionImpl::IoResult ConnectionImpl::doWriteToSocket() { if (!handshake_complete_) { PostIoAction action = doHandshake(); if (action == PostIoAction::Close || !handshake_complete_) { - return action; + return {action, 0}; } } if (write_buffer_.length() == 0) { - return PostIoAction::KeepOpen; + return {PostIoAction::KeepOpen, 0}; } uint64_t num_slices = write_buffer_.getRawSlices(nullptr, 0); @@ -154,7 +156,7 @@ Network::ConnectionImpl::PostIoAction ConnectionImpl::doWriteToSocket() { // Renegotiation has started. We don't handle renegotiation so just fall through. default: drainErrorQueue(); - return PostIoAction::Close; + return {PostIoAction::Close, bytes_written}; } break; @@ -165,7 +167,7 @@ Network::ConnectionImpl::PostIoAction ConnectionImpl::doWriteToSocket() { write_buffer_.drain(bytes_written); } - return PostIoAction::KeepOpen; + return {PostIoAction::KeepOpen, bytes_written}; } void ConnectionImpl::onConnected() { ASSERT(!handshake_complete_); } diff --git a/source/common/ssl/connection_impl.h b/source/common/ssl/connection_impl.h index ac37ce2baf17..0f38d29275b9 100644 --- a/source/common/ssl/connection_impl.h +++ b/source/common/ssl/connection_impl.h @@ -27,8 +27,8 @@ class ConnectionImpl : public Network::ConnectionImpl, public Connection { // Network::ConnectionImpl void closeSocket(uint32_t close_type) override; - PostIoAction doReadFromSocket() override; - PostIoAction doWriteToSocket() override; + IoResult doReadFromSocket() override; + IoResult doWriteToSocket() override; void onConnected() override; ContextImpl& ctx_; diff --git a/source/common/stats/statsd.h b/source/common/stats/statsd.h index 650d1faca08f..899f86634d3f 100644 --- a/source/common/stats/statsd.h +++ b/source/common/stats/statsd.h @@ -92,7 +92,6 @@ class TcpStatsdSink : public Sink { void shutdown() override; // Network::ConnectionCallbacks - void onBufferChange(Network::ConnectionBufferType, uint64_t, int64_t) override {} void onEvent(uint32_t events) override; TcpStatsdSink& parent_; diff --git a/source/common/upstream/health_checker_impl.h b/source/common/upstream/health_checker_impl.h index a9e9984ed41a..83be6bc73891 100644 --- a/source/common/upstream/health_checker_impl.h +++ b/source/common/upstream/health_checker_impl.h @@ -136,7 +136,6 @@ class HttpHealthCheckerImpl : public HealthCheckerImplBase { void onResetStream(Http::StreamResetReason reason) override; // Network::ConnectionCallbacks - void onBufferChange(Network::ConnectionBufferType, uint64_t, int64_t) override {} void onEvent(uint32_t events) override; HttpHealthCheckerImpl& parent_; @@ -241,7 +240,6 @@ class TcpHealthCheckerImpl : public HealthCheckerImplBase { TcpSessionCallbacks(TcpActiveHealthCheckSession& parent) : parent_(parent) {} // Network::ConnectionCallbacks - void onBufferChange(Network::ConnectionBufferType, uint64_t, int64_t) override {} void onEvent(uint32_t events) override { parent_.onEvent(events); } // Network::ReadFilter diff --git a/source/server/connection_handler.h b/source/server/connection_handler.h index 20d0f62e94d8..2408af1679b9 100644 --- a/source/server/connection_handler.h +++ b/source/server/connection_handler.h @@ -120,8 +120,6 @@ class ConnectionHandler final : NonCopyable { ~ActiveConnection(); // Network::ConnectionCallbacks - void onBufferChange(Network::ConnectionBufferType, uint64_t, int64_t) override {} - void onEvent(uint32_t event) override { // Any event leads to destruction of the connection. if (event == Network::ConnectionEvent::LocalClose || diff --git a/test/common/network/connection_impl_test.cc b/test/common/network/connection_impl_test.cc index 1a10d5ad153e..12b746bc02e1 100644 --- a/test/common/network/connection_impl_test.cc +++ b/test/common/network/connection_impl_test.cc @@ -5,21 +5,58 @@ #include "common/stats/stats_impl.h" #include "test/mocks/network/mocks.h" +#include "test/mocks/stats/mocks.h" using testing::_; using testing::Sequence; +using testing::InSequence; using testing::Invoke; using testing::Return; +using testing::StrictMock; using testing::Test; namespace Network { +TEST(ConnectionImplUtility, updateBufferStats) { + StrictMock counter; + StrictMock gauge; + uint64_t previous_total = 0; + + InSequence s; + EXPECT_CALL(counter, add(5)); + EXPECT_CALL(gauge, add(5)); + ConnectionImplUtility::updateBufferStats(5, 5, previous_total, counter, gauge); + EXPECT_EQ(5UL, previous_total); + + EXPECT_CALL(counter, add(1)); + EXPECT_CALL(gauge, sub(1)); + ConnectionImplUtility::updateBufferStats(1, 4, previous_total, counter, gauge); + + EXPECT_CALL(gauge, sub(4)); + ConnectionImplUtility::updateBufferStats(0, 0, previous_total, counter, gauge); + + EXPECT_CALL(counter, add(3)); + EXPECT_CALL(gauge, add(3)); + ConnectionImplUtility::updateBufferStats(3, 3, previous_total, counter, gauge); +} + TEST(ConnectionImplDeathTest, BadFd) { Event::DispatcherImpl dispatcher; EXPECT_DEATH(ConnectionImpl(dispatcher, -1, "127.0.0.1"), ".*assert failure: fd_ != -1.*"); } -TEST(ConnectionImplTest, BufferCallbacks) { +struct MockBufferStats { + Connection::BufferStats toBufferStats() { + return {rx_total_, rx_current_, tx_total_, tx_current_}; + } + + StrictMock rx_total_; + StrictMock rx_current_; + StrictMock tx_total_; + StrictMock tx_current_; +}; + +TEST(ConnectionImplTest, BufferStats) { Stats::IsolatedStoreImpl stats_store; Event::DispatcherImpl dispatcher; Network::TcpListenSocket socket(10000); @@ -31,6 +68,8 @@ TEST(ConnectionImplTest, BufferCallbacks) { dispatcher.createClientConnection("tcp://127.0.0.1:10000"); MockConnectionCallbacks client_callbacks; client_connection->addConnectionCallbacks(client_callbacks); + MockBufferStats client_buffer_stats; + client_connection->setBufferStats(client_buffer_stats.toBufferStats()); client_connection->connect(); std::shared_ptr write_filter(new MockWriteFilter()); @@ -44,24 +83,26 @@ TEST(ConnectionImplTest, BufferCallbacks) { .WillOnce(Return(FilterStatus::StopIteration)); EXPECT_CALL(*write_filter, onWrite(_)).InSequence(s1).WillOnce(Return(FilterStatus::Continue)); EXPECT_CALL(*filter, onWrite(_)).InSequence(s1).WillOnce(Return(FilterStatus::Continue)); - EXPECT_CALL(client_callbacks, onBufferChange(ConnectionBufferType::Write, 0, 4)).InSequence(s1); EXPECT_CALL(client_callbacks, onEvent(ConnectionEvent::Connected)).InSequence(s1); - EXPECT_CALL(client_callbacks, onBufferChange(ConnectionBufferType::Write, 4, -4)).InSequence(s1); + EXPECT_CALL(client_buffer_stats.tx_total_, add(4)).InSequence(s1); Network::ConnectionPtr server_connection; Network::MockConnectionCallbacks server_callbacks; + MockBufferStats server_buffer_stats; std::shared_ptr read_filter(new MockReadFilter()); EXPECT_CALL(listener_callbacks, onNewConnection_(_)) .WillOnce(Invoke([&](Network::ConnectionPtr& conn) -> void { server_connection = std::move(conn); server_connection->addConnectionCallbacks(server_callbacks); + server_connection->setBufferStats(server_buffer_stats.toBufferStats()); server_connection->addReadFilter(read_filter); EXPECT_EQ("", server_connection->nextProtocol()); })); Sequence s2; - EXPECT_CALL(server_callbacks, onBufferChange(ConnectionBufferType::Read, 0, 4)).InSequence(s2); - EXPECT_CALL(server_callbacks, onBufferChange(ConnectionBufferType::Read, 4, -4)).InSequence(s2); + EXPECT_CALL(server_buffer_stats.rx_total_, add(4)).InSequence(s2); + EXPECT_CALL(server_buffer_stats.rx_current_, add(4)).InSequence(s2); + EXPECT_CALL(server_buffer_stats.rx_current_, sub(4)).InSequence(s2); EXPECT_CALL(server_callbacks, onEvent(ConnectionEvent::LocalClose)).InSequence(s2); EXPECT_CALL(*read_filter, onNewConnection()); diff --git a/test/integration/fake_upstream.h b/test/integration/fake_upstream.h index b270c1101858..f82186906e9c 100644 --- a/test/integration/fake_upstream.h +++ b/test/integration/fake_upstream.h @@ -66,7 +66,6 @@ class FakeConnectionBase : public Network::ConnectionCallbacks { void waitForDisconnect(); // Network::ConnectionCallbacks - void onBufferChange(Network::ConnectionBufferType, uint64_t, int64_t) override {} void onEvent(uint32_t events) override; protected: diff --git a/test/integration/integration.h b/test/integration/integration.h index dbf7421cf2ee..57841c32e92d 100644 --- a/test/integration/integration.h +++ b/test/integration/integration.h @@ -68,7 +68,6 @@ class IntegrationCodecClient : public Http::CodecClientProd { ConnectionCallbacks(IntegrationCodecClient& parent) : parent_(parent) {} // Network::ConnectionCallbacks - void onBufferChange(Network::ConnectionBufferType, uint64_t, int64_t) override {} void onEvent(uint32_t events) override; IntegrationCodecClient& parent_; @@ -113,7 +112,6 @@ class IntegrationTcpClient { ConnectionCallbacks(IntegrationTcpClient& parent) : parent_(parent) {} // Network::ConnectionCallbacks - void onBufferChange(Network::ConnectionBufferType, uint64_t, int64_t) override {} void onEvent(uint32_t events) override; // Network::ReadFilter diff --git a/test/mocks/network/mocks.h b/test/mocks/network/mocks.h index 901063d351dd..6de69c5930a1 100644 --- a/test/mocks/network/mocks.h +++ b/test/mocks/network/mocks.h @@ -16,7 +16,6 @@ class MockConnectionCallbacks : public ConnectionCallbacks { ~MockConnectionCallbacks(); // Network::ConnectionCallbacks - MOCK_METHOD3(onBufferChange, void(ConnectionBufferType type, uint64_t old_size, int64_t delta)); MOCK_METHOD1(onEvent, void(uint32_t events)); }; @@ -54,6 +53,7 @@ class MockConnection : public Connection, public MockConnectionBase { MOCK_METHOD1(readDisable, void(bool disable)); MOCK_METHOD0(readEnabled, bool()); MOCK_METHOD0(remoteAddress, const std::string&()); + MOCK_METHOD1(setBufferStats, void(const BufferStats& stats)); MOCK_METHOD0(ssl, Ssl::Connection*()); MOCK_METHOD0(state, State()); MOCK_METHOD1(write, void(Buffer::Instance& data)); @@ -82,6 +82,7 @@ class MockClientConnection : public ClientConnection, public MockConnectionBase MOCK_METHOD1(readDisable, void(bool disable)); MOCK_METHOD0(readEnabled, bool()); MOCK_METHOD0(remoteAddress, const std::string&()); + MOCK_METHOD1(setBufferStats, void(const BufferStats& stats)); MOCK_METHOD0(ssl, Ssl::Connection*()); MOCK_METHOD0(state, State()); MOCK_METHOD1(write, void(Buffer::Instance& data)); diff --git a/test/mocks/stats/mocks.cc b/test/mocks/stats/mocks.cc index ccabe57954af..bff49e913f03 100644 --- a/test/mocks/stats/mocks.cc +++ b/test/mocks/stats/mocks.cc @@ -7,6 +7,9 @@ namespace Stats { MockCounter::MockCounter() {} MockCounter::~MockCounter() {} +MockGauge::MockGauge() {} +MockGauge::~MockGauge() {} + MockTimespan::MockTimespan() {} MockTimespan::~MockTimespan() {} diff --git a/test/mocks/stats/mocks.h b/test/mocks/stats/mocks.h index d31c647ab97c..80261c307d5b 100644 --- a/test/mocks/stats/mocks.h +++ b/test/mocks/stats/mocks.h @@ -20,6 +20,21 @@ class MockCounter : public Counter { MOCK_METHOD0(value, uint64_t()); }; +class MockGauge : public Gauge { +public: + MockGauge(); + ~MockGauge(); + + MOCK_METHOD1(add, void(uint64_t amount)); + MOCK_METHOD0(dec, void()); + MOCK_METHOD0(inc, void()); + MOCK_METHOD0(name, std::string()); + MOCK_METHOD1(set, void(uint64_t value)); + MOCK_METHOD1(sub, void(uint64_t amount)); + MOCK_METHOD0(used, bool()); + MOCK_METHOD0(value, uint64_t()); +}; + class MockTimespan : public Timespan { public: MockTimespan();