Skip to content

Commit

Permalink
network: implement early close detection for OS X
Browse files Browse the repository at this point in the history
Implements early close detection on OS X by inspecting the TCP
state using getsockopt.

*Risk Level*: low (conditional compilation)
*Testing*: integration tests pass (tested against envoyproxy#4232)
*Docs Changes*: n/a
*Release Notes*: n/a
*Fixes*: envoyproxy#4294

Signed-off-by: Stephan Zuercher <stephan@turbinelabs.io>
  • Loading branch information
zuercher committed Aug 30, 2018
1 parent 0057e22 commit f4fb622
Show file tree
Hide file tree
Showing 11 changed files with 154 additions and 14 deletions.
5 changes: 5 additions & 0 deletions include/envoy/api/os_sys_calls.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ class OsSysCalls {
virtual SysCallIntResult getsockopt(int sockfd, int level, int optname, void* optval,
socklen_t* optlen) PURE;

/**
 * Wrapper around the getsockname system call.
 * @param sockfd file descriptor of the socket to query.
 * @param addr output buffer that receives the socket's address.
 * @param addrlen size of the buffer pointed to by addr; passed through to the system call.
 * @return the system call's return code together with the associated errno value.
 * @see man 2 getsockname
 */
virtual SysCallIntResult getsockname(int sockfd, sockaddr* addr, socklen_t* addrlen) PURE;

/**
* @see man 2 socket
*/
Expand Down
5 changes: 5 additions & 0 deletions source/common/api/os_sys_calls_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ SysCallIntResult OsSysCallsImpl::getsockopt(int sockfd, int level, int optname,
return {rc, errno};
}

// Forwards to the global ::getsockname and pairs its return code with the
// current errno value.
SysCallIntResult OsSysCallsImpl::getsockname(int sockfd, sockaddr* addr, socklen_t* addrlen) {
  const int status = ::getsockname(sockfd, addr, addrlen);
  return SysCallIntResult{status, errno};
}

SysCallIntResult OsSysCallsImpl::socket(int domain, int type, int protocol) {
const int rc = ::socket(domain, type, protocol);
return {rc, errno};
Expand Down
1 change: 1 addition & 0 deletions source/common/api/os_sys_calls_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class OsSysCallsImpl : public OsSysCalls {
socklen_t optlen) override;
SysCallIntResult getsockopt(int sockfd, int level, int optname, void* optval,
socklen_t* optlen) override;
SysCallIntResult getsockname(int sockfd, sockaddr* addr, socklen_t* addrlen) override;
SysCallIntResult socket(int domain, int type, int protocol) override;
};

Expand Down
1 change: 1 addition & 0 deletions source/common/network/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ envoy_cc_library(
"//include/envoy/event:timer_interface",
"//include/envoy/network:connection_interface",
"//include/envoy/network:filter_interface",
"//source/common/api:os_sys_calls_lib",
"//source/common/buffer:buffer_lib",
"//source/common/buffer:watermark_buffer_lib",
"//source/common/common:assert_lib",
Expand Down
101 changes: 90 additions & 11 deletions source/common/network/connection_impl.cc
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
#include "common/network/connection_impl.h"

#include <netinet/tcp.h>

#ifdef __APPLE__
#include <netinet/tcp_fsm.h>
#endif
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
Expand All @@ -12,6 +16,7 @@
#include "envoy/event/timer.h"
#include "envoy/network/filter.h"

#include "common/api/os_sys_calls_impl.h"
#include "common/common/assert.h"
#include "common/common/empty_string.h"
#include "common/common/enum_to_int.h"
Expand Down Expand Up @@ -65,6 +70,13 @@ ConnectionImpl::ConnectionImpl(Event::Dispatcher& dispatcher, ConnectionSocketPt
Event::FileReadyType::Read | Event::FileReadyType::Write);

transport_socket_->setTransportSocketCallbacks(*this);

sockaddr addr;
socklen_t len = sizeof(addr);
auto& os_syscalls = Api::OsSysCallsSingleton::get();
const Api::SysCallIntResult result = os_syscalls.getsockname(fd(), &addr, &len);
RELEASE_ASSERT(result.rc_ == 0, "");
is_uds_ = addr.sa_family == AF_UNIX;
}

ConnectionImpl::~ConnectionImpl() {
Expand Down Expand Up @@ -159,27 +171,24 @@ void ConnectionImpl::noDelay(bool enable) {
}

// Don't set NODELAY for unix domain sockets
sockaddr addr;
socklen_t len = sizeof(addr);
int rc = getsockname(fd(), &addr, &len);
RELEASE_ASSERT(rc == 0, "");

if (addr.sa_family == AF_UNIX) {
if (is_uds_) {
return;
}

// Set NODELAY
int new_value = enable;
rc = setsockopt(fd(), IPPROTO_TCP, TCP_NODELAY, &new_value, sizeof(new_value));
auto& os_syscalls = Api::OsSysCallsSingleton::get();
const Api::SysCallIntResult result =
os_syscalls.setsockopt(fd(), IPPROTO_TCP, TCP_NODELAY, &new_value, sizeof(new_value));
#ifdef __APPLE__
if (-1 == rc && errno == EINVAL) {
if (-1 == result.rc_ && result.errno_ == EINVAL) {
// Sometimes occurs when the connection is not yet fully formed. Empirically, TCP_NODELAY is
// enabled despite this result.
return;
}
#endif

RELEASE_ASSERT(0 == rc, "");
RELEASE_ASSERT(0 == result.rc_, "");
}

uint64_t ConnectionImpl::id() const { return id_; }
Expand Down Expand Up @@ -249,7 +258,14 @@ void ConnectionImpl::readDisable(bool disable) {
// If half-close semantics are enabled, we never want early close notifications; we
// always want to read all available data, even if the other side has closed.
if (detect_early_close_ && !enable_half_close_) {
#ifdef __APPLE__
// libevent only supports detecting early close with epoll, so we leave read events enabled
// and check the connection state on read, tracking real read events in
// disabled_read_pending_.
file_event_->setEnabled(Event::FileReadyType::Write | Event::FileReadyType::Read);
#else
file_event_->setEnabled(Event::FileReadyType::Write | Event::FileReadyType::Closed);
#endif // __APPLE__
} else {
file_event_->setEnabled(Event::FileReadyType::Write);
}
Expand All @@ -263,6 +279,17 @@ void ConnectionImpl::readDisable(bool disable) {
// We never ask for both early close and read at the same time. If we are reading, we want to
// consume all available data.
file_event_->setEnabled(Event::FileReadyType::Read | Event::FileReadyType::Write);

#ifdef __APPLE__
if (disabled_read_pending_) {
// An actual read event occurred while reads were disabled (see above).
ENVOY_CONN_LOG(trace, "readDisable trigger pending read", *this);
disabled_read_pending_ = false;
file_event_->activate(Event::FileReadyType::Read);
return;
}
#endif

// If the connection has data buffered there's no guarantee there's also data in the kernel
// which will kick off the filter chain. Instead fake an event to make sure the buffered data
// gets processed regardless.
Expand Down Expand Up @@ -403,6 +430,28 @@ void ConnectionImpl::onFileEvent(uint32_t events) {
return;
}

#ifdef __APPLE__
// Because OSX doesn't support early close detection via events, we leave the read event enabled
// even when reads are disabled. Detect it here and convert it to a Closed event if we can detect
// the connection is closed. See https://github.com/envoyproxy/envoy/issues/4294.
if (detect_early_close_ && !read_enabled_ && (events & Event::FileReadyType::Read) != 0) {
if (detectEarlyClose()) {
// No longer connected. Convert to a closed event.
ENVOY_CONN_LOG(trace, "early close detection triggered", *this);
events |= Event::FileReadyType::Closed;
} else {
ENVOY_CONN_LOG(trace, "pending read in early close detection", *this);
disabled_read_pending_ = true;
}

// Reads are disabled, so never pass the read event along.
events &= ~Event::FileReadyType::Read;
if (!events) {
return;
}
}
#endif

if (events & Event::FileReadyType::Closed) {
// We never ask for both early close and read at the same time. If we are reading, we want to
// consume all available data.
Expand Down Expand Up @@ -459,8 +508,10 @@ void ConnectionImpl::onWriteReady() {
if (connecting_) {
int error;
socklen_t error_size = sizeof(error);
int rc = getsockopt(fd(), SOL_SOCKET, SO_ERROR, &error, &error_size);
ASSERT(0 == rc);
auto& os_syscalls = Api::OsSysCallsSingleton::get();
const Api::SysCallIntResult result =
os_syscalls.getsockopt(fd(), SOL_SOCKET, SO_ERROR, &error, &error_size);
ASSERT(0 == result.rc_);

if (error == 0) {
ENVOY_CONN_LOG(debug, "connected", *this);
Expand Down Expand Up @@ -535,6 +586,34 @@ bool ConnectionImpl::bothSidesHalfClosed() {
return read_end_stream_ && write_end_stream_ && write_buffer_->length() == 0;
}

#ifdef __APPLE__
// Inspects the socket to decide whether the connection has been closed while
// reads are disabled.
//
// - Unix domain sockets: queries SO_NREAD and reports a close when no bytes
//   are available to read.
// - TCP sockets: queries TCP_CONNECTION_INFO and reports a close when the
//   state is TCPS_CLOSED or lies in [TCPS_CLOSE_WAIT, TCPS_TIME_WAIT].
//
// @return true if the socket looks closed; false otherwise, including when the
//         socket state could not be queried.
bool ConnectionImpl::detectEarlyClose() {
  auto& os_syscalls = Api::OsSysCallsSingleton::get();

  if (is_uds_) {
    ENVOY_CONN_LOG(trace, "checking for UDS early close with read disabled", *this);

    // Zero-initialized so the value is never indeterminate, even if the
    // getsockopt call below fails.
    int bytes = 0;
    socklen_t bytes_size = sizeof(bytes);
    const Api::SysCallIntResult result =
        os_syscalls.getsockopt(fd(), SOL_SOCKET, SO_NREAD, &bytes, &bytes_size);
    ASSERT(0 == result.rc_);
    if (result.rc_ != 0) {
      // NOTE(review): ASSERT is presumably a debug-only check (this file uses
      // RELEASE_ASSERT where a hard failure is required). Without this guard a
      // failed call would leave the decision to garbage data. Report "not
      // closed" so that a close is never invented.
      return false;
    }

    return bytes == 0;
  }

  ENVOY_CONN_LOG(trace, "checking for TCP early close with read disabled", *this);
  // Value-initialized for the same reason as `bytes` above.
  tcp_connection_info info{};
  socklen_t info_size = sizeof(info);
  const Api::SysCallIntResult result =
      os_syscalls.getsockopt(fd(), IPPROTO_TCP, TCP_CONNECTION_INFO, &info, &info_size);
  ASSERT(0 == result.rc_);
  if (result.rc_ != 0) {
    // State could not be determined; do not claim the connection is closed.
    return false;
  }

  return info.tcpi_state == TCPS_CLOSED ||
         (info.tcpi_state >= TCPS_CLOSE_WAIT && info.tcpi_state <= TCPS_TIME_WAIT);
}
#endif

ClientConnectionImpl::ClientConnectionImpl(
Event::Dispatcher& dispatcher, const Address::InstanceConstSharedPtr& remote_address,
const Network::Address::InstanceConstSharedPtr& source_address,
Expand Down
11 changes: 10 additions & 1 deletion source/common/network/connection_impl.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#pragma once

#include <netinet/tcp.h>

#include <atomic>
#include <cstdint>
#include <list>
Expand Down Expand Up @@ -129,7 +131,6 @@ class ConnectionImpl : public virtual Connection,
Buffer::InstancePtr write_buffer_;
uint32_t read_buffer_limit_ = 0;

protected:
bool connecting_{false};
ConnectionEvent immediate_error_event_{ConnectionEvent::Connected};
bool bind_error_{false};
Expand All @@ -146,6 +147,10 @@ class ConnectionImpl : public virtual Connection,
// Returns true iff end of stream has been both written and read.
bool bothSidesHalfClosed();

#ifdef __APPLE__
bool detectEarlyClose();
#endif

static std::atomic<uint64_t> next_global_id_;

Event::Dispatcher& dispatcher_;
Expand All @@ -161,6 +166,10 @@ class ConnectionImpl : public virtual Connection,
bool read_end_stream_{false};
bool write_end_stream_{false};
bool current_write_end_stream_{false};
#ifdef __APPLE__
bool disabled_read_pending_{false};
#endif
bool is_uds_{false};
Buffer::Instance* current_write_buffer_{};
uint64_t last_read_buffer_size_{};
uint64_t last_write_buffer_size_{};
Expand Down
1 change: 1 addition & 0 deletions test/common/network/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ envoy_cc_test(
"//test/test_common:environment_lib",
"//test/test_common:network_utility_lib",
"//test/test_common:test_time_lib",
"//test/test_common:threadsafe_singleton_injector_lib",
],
)

Expand Down
11 changes: 11 additions & 0 deletions test/common/network/connection_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "test/test_common/network_utility.h"
#include "test/test_common/printers.h"
#include "test/test_common/test_time.h"
#include "test/test_common/threadsafe_singleton_injector.h"
#include "test/test_common/utility.h"

#include "gmock/gmock.h"
Expand Down Expand Up @@ -887,6 +888,13 @@ class MockTransportConnectionImplTest : public testing::Test {
.WillOnce(Invoke([this](TransportSocketCallbacks& callbacks) {
transport_socket_callbacks_ = &callbacks;
}));
EXPECT_CALL(os_sys_calls_, getsockname(_, _, _))
.WillOnce(Invoke([](int, sockaddr* addr, socklen_t* addrlen) -> Api::SysCallIntResult {
EXPECT_NE(nullptr, addr);
EXPECT_NE(nullptr, addrlen);
addr->sa_family = AF_INET;
return {0, 0};
}));
connection_.reset(
new ConnectionImpl(dispatcher_, std::make_unique<ConnectionSocketImpl>(0, nullptr, nullptr),
TransportSocketPtr(transport_socket_), true));
Expand All @@ -903,6 +911,9 @@ class MockTransportConnectionImplTest : public testing::Test {
return {PostIoAction::KeepOpen, size, false};
}

NiceMock<Api::MockOsSysCalls> os_sys_calls_;
TestThreadsafeSingletonInjector<Api::OsSysCallsImpl> os_calls_{&os_sys_calls_};

std::unique_ptr<ConnectionImpl> connection_;
Event::MockDispatcher dispatcher_;
NiceMock<MockConnectionCallbacks> callbacks_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,12 @@ TEST_P(ProxyProtocolTest, errorRecv_2) {
'r', 'e', ' ', 'd', 'a', 't', 'a'};
Api::MockOsSysCalls os_sys_calls;
TestThreadsafeSingletonInjector<Api::OsSysCallsImpl> os_calls(&os_sys_calls);

// Pass through sys calls made by ConnectionImpl.
EXPECT_CALL(os_sys_calls, getsockopt_(_, _, _, _, _))
.Times(AnyNumber())
.WillRepeatedly(Invoke(::getsockopt));

EXPECT_CALL(os_sys_calls, recv(_, _, _, _))
.Times(AnyNumber())
.WillOnce(Return(Api::SysCallSizeResult{-1, 0}));
Expand Down Expand Up @@ -297,6 +303,12 @@ TEST_P(ProxyProtocolTest, errorFIONREAD_1) {
'r', 'e', ' ', 'd', 'a', 't', 'a'};
Api::MockOsSysCalls os_sys_calls;
TestThreadsafeSingletonInjector<Api::OsSysCallsImpl> os_calls(&os_sys_calls);

// Pass through sys calls made by ConnectionImpl.
EXPECT_CALL(os_sys_calls, getsockopt_(_, _, _, _, _))
.Times(AnyNumber())
.WillRepeatedly(Invoke(::getsockopt));

EXPECT_CALL(os_sys_calls, ioctl(_, FIONREAD, _)).WillOnce(Return(Api::SysCallIntResult{-1, 0}));
EXPECT_CALL(os_sys_calls, writev(_, _, _))
.Times(AnyNumber())
Expand Down Expand Up @@ -490,6 +502,11 @@ TEST_P(ProxyProtocolTest, v2ParseExtensionsIoctlError) {
Api::MockOsSysCalls os_sys_calls;
TestThreadsafeSingletonInjector<Api::OsSysCallsImpl> os_calls(&os_sys_calls);

// Pass through sys calls made by ConnectionImpl.
EXPECT_CALL(os_sys_calls, getsockopt_(_, _, _, _, _))
.Times(AnyNumber())
.WillRepeatedly(Invoke(::getsockopt));

EXPECT_CALL(os_sys_calls, ioctl(_, FIONREAD, _))
.Times(AnyNumber())
.WillRepeatedly(Invoke([](int fd, unsigned long int request, void* argp) {
Expand Down Expand Up @@ -621,6 +638,11 @@ TEST_P(ProxyProtocolTest, v2Fragmented3Error) {
Api::MockOsSysCalls os_sys_calls;
TestThreadsafeSingletonInjector<Api::OsSysCallsImpl> os_calls(&os_sys_calls);

// Pass through sys calls made by ConnectionImpl.
EXPECT_CALL(os_sys_calls, getsockopt_(_, _, _, _, _))
.Times(AnyNumber())
.WillRepeatedly(Invoke(::getsockopt));

EXPECT_CALL(os_sys_calls, recv(_, _, _, _))
.Times(AnyNumber())
.WillRepeatedly(Invoke([](int fd, void* buf, size_t len, int flags) {
Expand Down Expand Up @@ -667,6 +689,11 @@ TEST_P(ProxyProtocolTest, v2Fragmented4Error) {
Api::MockOsSysCalls os_sys_calls;
TestThreadsafeSingletonInjector<Api::OsSysCallsImpl> os_calls(&os_sys_calls);

// Pass through sys calls made by ConnectionImpl.
EXPECT_CALL(os_sys_calls, getsockopt_(_, _, _, _, _))
.Times(AnyNumber())
.WillRepeatedly(Invoke(::getsockopt));

EXPECT_CALL(os_sys_calls, recv(_, _, _, _))
.Times(AnyNumber())
.WillRepeatedly(Invoke([](int fd, void* buf, size_t len, int flags) {
Expand Down
4 changes: 2 additions & 2 deletions test/mocks/api/mocks.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ SysCallIntResult MockOsSysCalls::setsockopt(int sockfd, int level, int optname,

// Allow mocking system call failure.
if (setsockopt_(sockfd, level, optname, optval, optlen) != 0) {
return SysCallIntResult{-1, 0};
return SysCallIntResult{-1, errno};
}

boolsockopts_[SockOptKey(sockfd, level, optname)] = !!*reinterpret_cast<const int*>(optval);
Expand All @@ -63,7 +63,7 @@ SysCallIntResult MockOsSysCalls::getsockopt(int sockfd, int level, int optname,
}
// Allow mocking system call failure.
if (getsockopt_(sockfd, level, optname, optval, optlen) != 0) {
return {-1, 0};
return {-1, errno};
}
*reinterpret_cast<int*>(optval) = val;
return {0, 0};
Expand Down
1 change: 1 addition & 0 deletions test/mocks/api/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class MockOsSysCalls : public OsSysCallsImpl {
int(int sockfd, int level, int optname, const void* optval, socklen_t optlen));
MOCK_METHOD5(getsockopt_,
int(int sockfd, int level, int optname, void* optval, socklen_t* optlen));
MOCK_METHOD3(getsockname, SysCallIntResult(int sockfd, sockaddr* addr, socklen_t* addrlen));
MOCK_METHOD3(socket, SysCallIntResult(int domain, int type, int protocol));

size_t num_writes_;
Expand Down

0 comments on commit f4fb622

Please sign in to comment.