-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
test: FakeUpstream threading fixes #14526
Changes from 14 commits
512610f
19a63f7
5f767cf
1bfa958
45b8864
3b653f2
ee052d6
e9f4597
3c55b38
495b19e
6580555
f90610f
22211c9
0d4fc3d
d0565e5
197362f
3aee6d7
8fccaf3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,6 +20,7 @@ | |
#include "test/test_common/utility.h" | ||
|
||
#include "absl/strings/str_cat.h" | ||
#include "absl/synchronization/notification.h" | ||
|
||
using namespace std::chrono_literals; | ||
|
||
|
@@ -354,14 +355,28 @@ AssertionResult FakeConnectionBase::close(std::chrono::milliseconds timeout) { | |
} | ||
|
||
AssertionResult FakeConnectionBase::readDisable(bool disable, std::chrono::milliseconds timeout) { | ||
return shared_connection_.executeOnDispatcher( | ||
[disable](Network::Connection& connection) { connection.readDisable(disable); }, timeout); | ||
// Do the work inline if called from the dispatcher thread, executeOnDispatcher can only be called | ||
// from outside the dispatcher thread. | ||
if (shared_connection_.connection().dispatcher().isThreadSafe()) { | ||
shared_connection_.connection().readDisable(disable); | ||
return AssertionSuccess(); | ||
} else { | ||
return shared_connection_.executeOnDispatcher( | ||
[disable](Network::Connection& connection) { connection.readDisable(disable); }, timeout); | ||
} | ||
} | ||
|
||
AssertionResult FakeConnectionBase::enableHalfClose(bool enable, | ||
std::chrono::milliseconds timeout) { | ||
return shared_connection_.executeOnDispatcher( | ||
[enable](Network::Connection& connection) { connection.enableHalfClose(enable); }, timeout); | ||
// Do the work inline if called from the dispatcher thread, executeOnDispatcher can only be called | ||
// from outside the dispatcher thread. | ||
if (shared_connection_.connection().dispatcher().isThreadSafe()) { | ||
shared_connection_.connection().enableHalfClose(enable); | ||
return AssertionSuccess(); | ||
} else { | ||
return shared_connection_.executeOnDispatcher( | ||
[enable](Network::Connection& connection) { connection.enableHalfClose(enable); }, timeout); | ||
} | ||
} | ||
|
||
Http::RequestDecoder& FakeHttpConnection::newStream(Http::ResponseEncoder& encoder, bool) { | ||
|
@@ -513,6 +528,9 @@ bool FakeUpstream::createNetworkFilterChain(Network::Connection& connection, | |
const std::vector<Network::FilterFactoryCb>&) { | ||
absl::MutexLock lock(&lock_); | ||
if (read_disable_on_new_connection_) { | ||
// Disable early close detection to avoid closing the network connection before full | ||
// initialization is complete. | ||
connection.detectEarlyCloseWhenReadDisabled(false); | ||
connection.readDisable(true); | ||
} | ||
auto connection_wrapper = std::make_unique<SharedConnectionWrapper>(connection); | ||
|
@@ -552,16 +570,16 @@ AssertionResult FakeUpstream::waitForHttpConnection( | |
client_dispatcher, timeout)) { | ||
return AssertionFailure() << "Timed out waiting for new connection."; | ||
} | ||
} | ||
|
||
return runOnDispatcherThreadAndWait([&]() { | ||
absl::MutexLock lock(&lock_); | ||
connection = std::make_unique<FakeHttpConnection>( | ||
*this, consumeConnection(), http_type_, time_system_, max_request_headers_kb, | ||
max_request_headers_count, headers_with_underscores_action); | ||
} | ||
VERIFY_ASSERTION(connection->initialize()); | ||
if (read_disable_on_new_connection_) { | ||
VERIFY_ASSERTION(connection->readDisable(false)); | ||
} | ||
return AssertionSuccess(); | ||
VERIFY_ASSERTION(connection->initialize()); | ||
return AssertionSuccess(); | ||
}); | ||
} | ||
|
||
AssertionResult | ||
|
@@ -585,14 +603,17 @@ FakeUpstream::waitForHttpConnection(Event::Dispatcher& client_dispatcher, | |
client_dispatcher, 5ms)) { | ||
continue; | ||
} | ||
} | ||
|
||
return upstream.runOnDispatcherThreadAndWait([&]() { | ||
absl::MutexLock lock(&upstream.lock_); | ||
connection = std::make_unique<FakeHttpConnection>( | ||
upstream, upstream.consumeConnection(), upstream.http_type_, upstream.timeSystem(), | ||
Http::DEFAULT_MAX_REQUEST_HEADERS_KB, Http::DEFAULT_MAX_HEADERS_COUNT, | ||
envoy::config::core::v3::HttpProtocolOptions::ALLOW); | ||
} | ||
VERIFY_ASSERTION(connection->initialize()); | ||
VERIFY_ASSERTION(connection->readDisable(false)); | ||
return AssertionSuccess(); | ||
VERIFY_ASSERTION(connection->initialize()); | ||
return AssertionSuccess(); | ||
}); | ||
} | ||
} | ||
return AssertionFailure() << "Timed out waiting for HTTP connection."; | ||
|
@@ -610,19 +631,57 @@ AssertionResult FakeUpstream::waitForRawConnection(FakeRawConnectionPtr& connect | |
if (!time_system_.waitFor(lock_, absl::Condition(&reached), timeout)) { | ||
return AssertionFailure() << "Timed out waiting for raw connection"; | ||
} | ||
} | ||
|
||
return runOnDispatcherThreadAndWait([&]() { | ||
absl::MutexLock lock(&lock_); | ||
connection = std::make_unique<FakeRawConnection>(consumeConnection(), timeSystem()); | ||
VERIFY_ASSERTION(connection->initialize()); | ||
VERIFY_ASSERTION(connection->enableHalfClose(enable_half_close_)); | ||
return AssertionSuccess(); | ||
}); | ||
} | ||
|
||
testing::AssertionResult | ||
FakeUpstream::waitForAndConsumeDisconnectedConnection(std::chrono::milliseconds timeout) { | ||
ASSERT(!read_disable_on_new_connection_); | ||
SharedConnectionWrapper* connection; | ||
{ | ||
absl::MutexLock lock(&lock_); | ||
ENVOY_LOG_MISC(critical, "waitForAndConsumeDisconnectedConnection"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ? delete or change log level? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry, I thought I had removed this debug comment from the time I was trying to track down macos issues via CI. |
||
const auto reached = [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) { | ||
return !new_connections_.empty(); | ||
}; | ||
|
||
if (!time_system_.waitFor(lock_, absl::Condition(&reached), timeout)) { | ||
return AssertionFailure() << "Timed out waiting for raw connection"; | ||
} | ||
} | ||
VERIFY_ASSERTION(connection->initialize()); | ||
VERIFY_ASSERTION(connection->readDisable(false)); | ||
VERIFY_ASSERTION(connection->enableHalfClose(enable_half_close_)); | ||
return AssertionSuccess(); | ||
|
||
VERIFY_ASSERTION(runOnDispatcherThreadAndWait([&]() { | ||
absl::MutexLock lock(&lock_); | ||
connection = &consumeConnection(); | ||
return AssertionSuccess(); | ||
})); | ||
|
||
return connection->waitForDisconnect(time_system_, timeout); | ||
} | ||
|
||
SharedConnectionWrapper& FakeUpstream::consumeConnection() { | ||
ASSERT(!new_connections_.empty()); | ||
auto* const connection_wrapper = new_connections_.front().get(); | ||
// Skip the thread safety check if the network connection has already been freed since there's no | ||
// alternate way to get access to the dispatcher. | ||
ASSERT(!connection_wrapper->connected() || | ||
connection_wrapper->connection().dispatcher().isThreadSafe()); | ||
connection_wrapper->setParented(); | ||
connection_wrapper->moveBetweenLists(new_connections_, consumed_connections_); | ||
if (read_disable_on_new_connection_) { | ||
// Re-enable read and early close detection. | ||
auto& connection = connection_wrapper->connection(); | ||
connection.detectEarlyCloseWhenReadDisabled(true); | ||
connection.readDisable(false); | ||
} | ||
return *connection_wrapper; | ||
} | ||
|
||
|
@@ -647,6 +706,18 @@ void FakeUpstream::onRecvDatagram(Network::UdpRecvData& data) { | |
received_datagrams_.emplace_back(std::move(data)); | ||
} | ||
|
||
AssertionResult FakeUpstream::runOnDispatcherThreadAndWait(std::function<AssertionResult()> cb) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this have a timeout to avoid test hard timeout? It could be hard coded to 15s if that is easier? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added a timeout which results in a RELEASE_ASSERT if hit. The callbacks run on the dispatcher should be really fast, so it taking an extended period of time indicates that something has gone really wrong. Cancelling the work is not an option since it is hard to tell how far the callback got before the timeout happens. Returning AssertionResult is not a good option because there are FakeUpstream functions that wait for an HTTP upstream connection with short timeout in a loop and interpret the returned AssertionResult as a retryable failure. |
||
AssertionResult result = AssertionSuccess(); | ||
absl::Notification done; | ||
ASSERT(!dispatcher_->isThreadSafe()); | ||
dispatcher_->post([&]() { | ||
result = cb(); | ||
done.Notify(); | ||
}); | ||
done.WaitForNotification(); | ||
return result; | ||
} | ||
|
||
void FakeUpstream::sendUdpDatagram(const std::string& buffer, | ||
const Network::Address::InstanceConstSharedPtr& peer) { | ||
dispatcher_->post([this, buffer, peer] { | ||
|
@@ -683,14 +754,23 @@ FakeRawConnection::~FakeRawConnection() { | |
} | ||
|
||
testing::AssertionResult FakeRawConnection::initialize() { | ||
auto filter = Network::ReadFilterSharedPtr{new ReadFilter(*this)}; | ||
Network::ReadFilterSharedPtr filter{new ReadFilter(*this)}; | ||
read_filter_ = filter; | ||
testing::AssertionResult result = shared_connection_.executeOnDispatcher( | ||
[filter = std::move(filter)](Network::Connection& connection) { | ||
connection.addReadFilter(filter); | ||
}); | ||
if (!result) { | ||
return result; | ||
if (!shared_connection_.connected()) { | ||
VERIFY_ASSERTION(FakeConnectionBase::initialize()); | ||
return AssertionFailure() << "initialize failed, connection is disconnected."; | ||
} | ||
ASSERT(shared_connection_.connection().dispatcher().isThreadSafe()); | ||
if (shared_connection_.connection().dispatcher().isThreadSafe()) { | ||
shared_connection_.connection().addReadFilter(filter); | ||
} else { | ||
testing::AssertionResult result = shared_connection_.executeOnDispatcher( | ||
[filter = std::move(filter)](Network::Connection& connection) { | ||
connection.addReadFilter(filter); | ||
}); | ||
if (!result) { | ||
return result; | ||
} | ||
} | ||
return FakeConnectionBase::initialize(); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we need to special-case same-thread calls? Is that because when calling from the same thread, we are assuming the operation is blocking? (add code comments and we are good to go)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
executeOnDispatcher blocks on until the posted callback finishes. Calling post and waiting from the dispatcher thread results in a deadlock.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
makes sense; adding a code comment may help the reader.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Idea: Instead of dealing this in all call sites, could executeOnDispatcher() figure this out and just run the lambda versus posting?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I undid this branching and reverted this function.
FakeUpstream now does readDisable and enableHalfClose directly on the network connection instead of going through this wrapper in the cases where the operation happens from the dispatcher thread. Some tests still call this method and that requires going down the executeOnDispatcher version.