Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#4221
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
fuzhe1989 authored and ti-chi-bot committed Mar 11, 2022
1 parent 2b887f0 commit 2009e83
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 8 deletions.
7 changes: 7 additions & 0 deletions dbms/src/Common/MPMCQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,13 @@ class MPMCQueue
{
}

~MPMCQueue()
{
std::unique_lock lock(mu);
for (; read_pos < write_pos; ++read_pos)
destruct(getObj(read_pos));
}

/// Block util:
/// 1. Pop succeeds with a valid T: return true.
/// 2. The queue is cancelled or finished: return false.
Expand Down
90 changes: 82 additions & 8 deletions dbms/src/Common/tests/gtest_mpmc_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@
#include <thread>
#include <vector>

namespace DB
namespace DB::tests
{
namespace tests
namespace
{
class TestMPMCQueue : public ::testing::Test
class MPMCQueueTest : public ::testing::Test
{
protected:
std::random_device rd;
Expand Down Expand Up @@ -489,28 +489,28 @@ class TestMPMCQueue : public ::testing::Test
};

template <>
struct TestMPMCQueue::ValueHelper<int>
struct MPMCQueueTest::ValueHelper<int>
{
static int make(int v) { return v; }
static int extract(int v) { return v; }
};

template <>
struct TestMPMCQueue::ValueHelper<std::unique_ptr<int>>
struct MPMCQueueTest::ValueHelper<std::unique_ptr<int>>
{
static std::unique_ptr<int> make(int v) { return std::make_unique<int>(v); }
static int extract(std::unique_ptr<int> & v) { return *v; }
};

template <>
struct TestMPMCQueue::ValueHelper<std::shared_ptr<int>>
struct MPMCQueueTest::ValueHelper<std::shared_ptr<int>>
{
static std::shared_ptr<int> make(int v) { return std::make_shared<int>(v); }
static int extract(std::shared_ptr<int> & v) { return *v; }
};

#define ADD_TEST_FOR(type_name, type, test_name, ...) \
TEST_F(TestMPMCQueue, type_name##_##test_name) \
TEST_F(MPMCQueueTest, type_name##_##test_name) \
try \
{ \
test##test_name<type>(__VA_ARGS__); \
Expand All @@ -533,7 +533,7 @@ ADD_TEST(CancelEmpty, 4, 4);
ADD_TEST(CancelConcurrentPop, 4);
ADD_TEST(CancelConcurrentPush, 4);

TEST_F(TestMPMCQueue, ExceptionSafe)
TEST_F(MPMCQueueTest, ExceptionSafe)
try
{
MPMCQueue<ThrowInjectable> queue(10);
Expand Down Expand Up @@ -590,5 +590,79 @@ try
}
CATCH

<<<<<<< HEAD
} // namespace tests
} // namespace DB
=======
TEST_F(MPMCQueueTest, isNextOpNonBlocking)
try
{
MPMCQueue<int> q(2);
ASSERT_TRUE(q.isNextPushNonBlocking());
ASSERT_FALSE(q.isNextPopNonBlocking());
ASSERT_TRUE(q.push(1));
ASSERT_TRUE(q.isNextPushNonBlocking());
ASSERT_TRUE(q.isNextPopNonBlocking());
int val;
ASSERT_TRUE(q.pop(val));
ASSERT_TRUE(q.isNextPushNonBlocking());
ASSERT_FALSE(q.isNextPopNonBlocking());
ASSERT_TRUE(q.push(1));
ASSERT_TRUE(q.isNextPushNonBlocking());
ASSERT_TRUE(q.isNextPopNonBlocking());
ASSERT_TRUE(q.push(1));
ASSERT_FALSE(q.isNextPushNonBlocking());
ASSERT_TRUE(q.isNextPopNonBlocking());

ASSERT_TRUE(q.finish());
ASSERT_FALSE(q.finish());

//check isNextPush/PopNonBlocking after finish
ASSERT_TRUE(q.pop(val));
ASSERT_TRUE(q.isNextPushNonBlocking());
ASSERT_TRUE(q.isNextPopNonBlocking());
ASSERT_TRUE(q.pop(val));
ASSERT_TRUE(q.isNextPushNonBlocking());
ASSERT_TRUE(q.isNextPopNonBlocking());
}
CATCH

struct Counter
{
static int count;
Counter()
{
++count;
}

~Counter()
{
--count;
}
};
int Counter::count = 0;

TEST_F(MPMCQueueTest, objectsDestructed)
try
{
{
MPMCQueue<Counter> queue(100);
queue.emplace();
ASSERT_EQ(Counter::count, 1);

{
Counter cnt;
queue.pop(cnt);
}
ASSERT_EQ(Counter::count, 0);

queue.emplace();
ASSERT_EQ(Counter::count, 1);
}
ASSERT_EQ(Counter::count, 0);
}
CATCH

} // namespace
} // namespace DB::tests
>>>>>>> f50e1a9114 (*: Fix memory leak when MPMCQueue destruct with non-poped elements (#4221))

0 comments on commit 2009e83

Please sign in to comment.