-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
test: FakeUpstream threading fixes #14526
Changes from 2 commits
512610f
19a63f7
5f767cf
1bfa958
45b8864
3b653f2
ee052d6
e9f4597
3c55b38
495b19e
6580555
f90610f
22211c9
0d4fc3d
d0565e5
197362f
3aee6d7
8fccaf3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,6 +20,7 @@ | |
#include "test/test_common/utility.h" | ||
|
||
#include "absl/strings/str_cat.h" | ||
#include "absl/synchronization/notification.h" | ||
|
||
using namespace std::chrono_literals; | ||
|
||
|
@@ -354,14 +355,28 @@ AssertionResult FakeConnectionBase::close(std::chrono::milliseconds timeout) { | |
} | ||
|
||
AssertionResult FakeConnectionBase::readDisable(bool disable, std::chrono::milliseconds timeout) { | ||
return shared_connection_.executeOnDispatcher( | ||
[disable](Network::Connection& connection) { connection.readDisable(disable); }, timeout); | ||
// Do the work inline if called from the dispatcher thread, executeOnDispatcher can only be called | ||
// from outside the dispatcher thread. | ||
if (shared_connection_.connection().dispatcher().isThreadSafe()) { | ||
shared_connection_.connection().readDisable(disable); | ||
return AssertionSuccess(); | ||
} else { | ||
return shared_connection_.executeOnDispatcher( | ||
[disable](Network::Connection& connection) { connection.readDisable(disable); }, timeout); | ||
} | ||
} | ||
|
||
AssertionResult FakeConnectionBase::enableHalfClose(bool enable, | ||
std::chrono::milliseconds timeout) { | ||
return shared_connection_.executeOnDispatcher( | ||
[enable](Network::Connection& connection) { connection.enableHalfClose(enable); }, timeout); | ||
// Do the work inline if called from the dispatcher thread, executeOnDispatcher can only be called | ||
// from outside the dispatcher thread. | ||
if (shared_connection_.connection().dispatcher().isThreadSafe()) { | ||
shared_connection_.connection().enableHalfClose(enable); | ||
return AssertionSuccess(); | ||
} else { | ||
return shared_connection_.executeOnDispatcher( | ||
[enable](Network::Connection& connection) { connection.enableHalfClose(enable); }, timeout); | ||
} | ||
} | ||
|
||
Http::RequestDecoder& FakeHttpConnection::newStream(Http::ResponseEncoder& encoder, bool) { | ||
|
@@ -552,16 +567,19 @@ AssertionResult FakeUpstream::waitForHttpConnection( | |
client_dispatcher, timeout)) { | ||
return AssertionFailure() << "Timed out waiting for new connection."; | ||
} | ||
} | ||
|
||
return runOnDispatcherThreadAndWait([&]() { | ||
absl::MutexLock lock(&lock_); | ||
connection = std::make_unique<FakeHttpConnection>( | ||
*this, consumeConnection(), http_type_, time_system_, max_request_headers_kb, | ||
max_request_headers_count, headers_with_underscores_action); | ||
} | ||
VERIFY_ASSERTION(connection->initialize()); | ||
if (read_disable_on_new_connection_) { | ||
VERIFY_ASSERTION(connection->readDisable(false)); | ||
} | ||
return AssertionSuccess(); | ||
VERIFY_ASSERTION(connection->initialize()); | ||
if (read_disable_on_new_connection_) { | ||
VERIFY_ASSERTION(connection->readDisable(false)); | ||
} | ||
return AssertionSuccess(); | ||
}); | ||
} | ||
|
||
AssertionResult | ||
|
@@ -585,14 +603,18 @@ FakeUpstream::waitForHttpConnection(Event::Dispatcher& client_dispatcher, | |
client_dispatcher, 5ms)) { | ||
continue; | ||
} | ||
} | ||
|
||
return upstream.runOnDispatcherThreadAndWait([&]() { | ||
absl::MutexLock lock(&upstream.lock_); | ||
connection = std::make_unique<FakeHttpConnection>( | ||
upstream, upstream.consumeConnection(), upstream.http_type_, upstream.timeSystem(), | ||
Http::DEFAULT_MAX_REQUEST_HEADERS_KB, Http::DEFAULT_MAX_HEADERS_COUNT, | ||
envoy::config::core::v3::HttpProtocolOptions::ALLOW); | ||
} | ||
VERIFY_ASSERTION(connection->initialize()); | ||
VERIFY_ASSERTION(connection->readDisable(false)); | ||
return AssertionSuccess(); | ||
VERIFY_ASSERTION(connection->initialize()); | ||
VERIFY_ASSERTION(connection->readDisable(false)); | ||
return AssertionSuccess(); | ||
}); | ||
} | ||
} | ||
return AssertionFailure() << "Timed out waiting for HTTP connection."; | ||
|
@@ -610,11 +632,29 @@ AssertionResult FakeUpstream::waitForRawConnection(FakeRawConnectionPtr& connect | |
if (!time_system_.waitFor(lock_, absl::Condition(&reached), timeout)) { | ||
return AssertionFailure() << "Timed out waiting for raw connection"; | ||
} | ||
} | ||
|
||
return runOnDispatcherThreadAndWait([&]() { | ||
absl::MutexLock lock(&lock_); | ||
connection = std::make_unique<FakeRawConnection>(consumeConnection(), timeSystem()); | ||
VERIFY_ASSERTION(connection->initialize()); | ||
VERIFY_ASSERTION(connection->readDisable(false)); | ||
VERIFY_ASSERTION(connection->enableHalfClose(enable_half_close_)); | ||
return AssertionSuccess(); | ||
}); | ||
} | ||
|
||
testing::AssertionResult | ||
FakeUpstream::waitForAndConsumeDisconnectedConnection(std::chrono::milliseconds timeout) { | ||
absl::MutexLock lock(&lock_); | ||
const auto reached = [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) { | ||
return !new_connections_.empty() && !new_connections_.front()->connected(); | ||
}; | ||
|
||
if (!time_system_.waitFor(lock_, absl::Condition(&reached), timeout)) { | ||
return AssertionFailure() << "Timed out waiting for raw connection"; | ||
} | ||
VERIFY_ASSERTION(connection->initialize()); | ||
VERIFY_ASSERTION(connection->readDisable(false)); | ||
VERIFY_ASSERTION(connection->enableHalfClose(enable_half_close_)); | ||
consumeConnection(); | ||
return AssertionSuccess(); | ||
} | ||
|
||
|
@@ -647,6 +687,18 @@ void FakeUpstream::onRecvDatagram(Network::UdpRecvData& data) { | |
received_datagrams_.emplace_back(std::move(data)); | ||
} | ||
|
||
AssertionResult FakeUpstream::runOnDispatcherThreadAndWait(std::function<AssertionResult()> cb) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this have a timeout to avoid test hard timeout? It could be hard coded to 15s if that is easier? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added a timeout which results in a RELEASE_ASSERT if hit. The callbacks run on the dispatcher should be really fast, so it taking an extended period of time indicates that something has gone really wrong. Cancelling the work is not an option since it is hard to tell how far the callback got before the timeout happens. Returning AssertionResult is not a good option because there are FakeUpstream functions that wait for an HTTP upstream connection with short timeout in a loop and interpret the returned AssertionResult as a retryable failure. |
||
AssertionResult result = AssertionSuccess(); | ||
absl::Notification done; | ||
ASSERT(!dispatcher_->isThreadSafe()); | ||
dispatcher_->post([&]() { | ||
result = cb(); | ||
done.Notify(); | ||
}); | ||
done.WaitForNotification(); | ||
return result; | ||
} | ||
|
||
void FakeUpstream::sendUdpDatagram(const std::string& buffer, | ||
const Network::Address::InstanceConstSharedPtr& peer) { | ||
dispatcher_->post([this, buffer, peer] { | ||
|
@@ -683,14 +735,19 @@ FakeRawConnection::~FakeRawConnection() { | |
} | ||
|
||
testing::AssertionResult FakeRawConnection::initialize() { | ||
auto filter = Network::ReadFilterSharedPtr{new ReadFilter(*this)}; | ||
ASSERT(shared_connection_.connection().dispatcher().isThreadSafe()); | ||
Network::ReadFilterSharedPtr filter{new ReadFilter(*this)}; | ||
read_filter_ = filter; | ||
testing::AssertionResult result = shared_connection_.executeOnDispatcher( | ||
[filter = std::move(filter)](Network::Connection& connection) { | ||
connection.addReadFilter(filter); | ||
}); | ||
if (!result) { | ||
return result; | ||
if (shared_connection_.connection().dispatcher().isThreadSafe()) { | ||
shared_connection_.connection().addReadFilter(filter); | ||
} else { | ||
testing::AssertionResult result = shared_connection_.executeOnDispatcher( | ||
[filter = std::move(filter)](Network::Connection& connection) { | ||
connection.addReadFilter(filter); | ||
}); | ||
if (!result) { | ||
return result; | ||
} | ||
} | ||
return FakeConnectionBase::initialize(); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we need to special-case same-thread calls? Is that because when calling from the same thread, we are assuming the operation is blocking? (add code comments and we are good to go)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
executeOnDispatcher blocks on until the posted callback finishes. Calling post and waiting from the dispatcher thread results in a deadlock.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
makes sense; adding a code comment may help the reader.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Idea: Instead of dealing this in all call sites, could executeOnDispatcher() figure this out and just run the lambda versus posting?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I undid this branching and reverted this function.
FakeUpstream now does readDisable and enableHalfClose directly on the network connection instead of going through this wrapper in the cases where the operation happens from the dispatcher thread. Some tests still call this method and that requires going down the executeOnDispatcher version.