Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Close buffered channel should unblock the blocked senders and receivers #8109

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 106 additions & 7 deletions paddle/framework/channel_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,12 @@ TEST(Channel, SufficientBufferSizeDoesntBlock) {
const size_t buffer_size = 10;
auto ch = MakeChannel<size_t>(buffer_size);
for (size_t i = 0; i < buffer_size; ++i) {
ch->Send(&i); // should not block
EXPECT_EQ(ch->Send(&i), true); // should not block
}

size_t out;
for (size_t i = 0; i < buffer_size; ++i) {
ch->Receive(&out); // should not block
EXPECT_EQ(ch->Receive(&out), true); // should not block
EXPECT_EQ(out, i);
}
CloseChannel(ch);
Expand All @@ -67,7 +67,10 @@ TEST(Channel, ConcurrentSendNonConcurrentReceiveWithSufficientBufferSize) {
std::thread t([&]() {
// Try to write more than buffer size.
for (size_t i = 0; i < 2 * buffer_size; ++i) {
ch->Send(&i); // should block after 10 iterations
if (i < buffer_size)
EXPECT_EQ(ch->Send(&i), true); // should block after 10 iterations
else
EXPECT_EQ(ch->Send(&i), false);
sum += i;
}
});
Expand All @@ -84,13 +87,13 @@ TEST(Channel, SimpleUnbufferedChannelTest) {
unsigned sum_send = 0;
std::thread t([&]() {
for (int i = 0; i < 5; i++) {
ch->Send(&i);
EXPECT_EQ(ch->Send(&i), true);
sum_send += i;
}
});
for (int i = 0; i < 5; i++) {
int recv;
ch->Receive(&recv);
EXPECT_EQ(ch->Receive(&recv), true);
EXPECT_EQ(recv, i);
}

Expand All @@ -100,6 +103,102 @@ TEST(Channel, SimpleUnbufferedChannelTest) {
delete ch;
}

// This tests that closing a buffered channel also unblocks
// any receivers waiting on the channel
TEST(Channel, BufferedChannelCloseUnblocksReceiversTest) {
auto ch = MakeChannel<int>(1);
size_t num_threads = 5;
std::thread t[num_threads];
bool thread_ended[num_threads];

// Launches threads that try to read and are blocked because of no writers
for (size_t i = 0; i < num_threads; i++) {
thread_ended[i] = false;
t[i] = std::thread(
[&](bool *p) {
int data;
// All reads should return false
EXPECT_EQ(ch->Receive(&data), false);
*p = true;
},
&thread_ended[i]);
}
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // wait

// Verify that all threads are blocked
for (size_t i = 0; i < num_threads; i++) {
EXPECT_EQ(thread_ended[i], false);
}

// Explicitly close the channel
// This should unblock all receivers
CloseChannel(ch);

std::this_thread::sleep_for(std::chrono::milliseconds(200)); // wait

// Verify that all threads got unblocked
for (size_t i = 0; i < num_threads; i++) {
EXPECT_EQ(thread_ended[i], true);
}

for (size_t i = 0; i < num_threads; i++) t[i].join();
delete ch;
}

// This tests that closing a buffered channel also unblocks
// any senders waiting for channel to have write space
TEST(Channel, BufferedChannelCloseUnblocksSendersTest) {
auto ch = MakeChannel<int>(1);
size_t num_threads = 5;
std::thread t[num_threads];
bool thread_ended[num_threads];
bool send_success[num_threads];

// Launches threads that try to write and are blocked because of no readers
for (size_t i = 0; i < num_threads; i++) {
thread_ended[i] = false;
send_success[i] = false;
t[i] = std::thread(
[&](bool *ended, bool *success) {
int data = 10;
*success = ch->Send(&data);
*ended = true;
},
&thread_ended[i], &send_success[i]);
}
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // wait

// Verify that atleast 4 threads are blocked
int ct = 0;
for (size_t i = 0; i < num_threads; i++) {
if (thread_ended[i] == false) ct++;
}
// Atleast 4 threads must be blocked
EXPECT_GE(ct, 4);

// Explicitly close the thread
// This should unblock all senders
CloseChannel(ch);

std::this_thread::sleep_for(std::chrono::milliseconds(200)); // wait

// Verify that all threads got unblocked
for (size_t i = 0; i < num_threads; i++) {
EXPECT_EQ(thread_ended[i], true);
}

// Verify that only 1 send was successful
ct = 0;
for (size_t i = 0; i < num_threads; i++) {
if (send_success[i]) ct++;
}
// Only 1 send must be successful
EXPECT_EQ(ct, 1);

for (size_t i = 0; i < num_threads; i++) t[i].join();
delete ch;
}

// This tests that closing an unbuffered channel also unblocks
// unblocks any receivers waiting for senders
TEST(Channel, UnbufferedChannelCloseUnblocksReceiversTest) {
Expand All @@ -114,7 +213,7 @@ TEST(Channel, UnbufferedChannelCloseUnblocksReceiversTest) {
t[i] = std::thread(
[&](bool *p) {
int data;
ch->Receive(&data);
EXPECT_EQ(ch->Receive(&data), false);
*p = true;
},
&thread_ended[i]);
Expand Down Expand Up @@ -155,7 +254,7 @@ TEST(Channel, UnbufferedChannelCloseUnblocksSendersTest) {
t[i] = std::thread(
[&](bool *p) {
int data = 10;
ch->Send(&data);
EXPECT_EQ(ch->Send(&data), false);
*p = true;
},
&thread_ended[i]);
Expand Down