Skip to content

Commit

Permalink
Support bthread_once and bthread singleton (#2520)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenBright authored Feb 26, 2024
1 parent bcb4286 commit eac1901
Show file tree
Hide file tree
Showing 5 changed files with 310 additions and 0 deletions.
9 changes: 9 additions & 0 deletions src/bthread/bthread.h
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,15 @@ extern void* bthread_getspecific(bthread_key_t key);
// Return current bthread tag
extern bthread_tag_t bthread_self_tag(void);

// The first call to bthread_once() by any thread in a process, with a given
// once_control, will call the init_routine() with no arguments. Subsequent
// calls of bthread_once() with the same once_control will not call the
// init_routine(). On return from bthread_once(), it is guaranteed that
// init_routine() has completed. The once_control parameter is used to
// determine whether the associated initialisation routine has been called.
// Returns 0 on success, error code otherwise.
extern int bthread_once(bthread_once_t* once_control, void (*init_routine)());

__END_DECLS

#endif // BTHREAD_BTHREAD_H
81 changes: 81 additions & 0 deletions src/bthread/bthread_once.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "bthread/types.h"
#include "bthread/butex.h"

bthread_once_t::bthread_once_t()
: _butex(bthread::butex_create_checked<butil::atomic<int>>()) {
_butex->store(UNINITIALIZED, butil::memory_order_relaxed);
}

bthread_once_t::~bthread_once_t() {
bthread::butex_destroy(_butex);
}

namespace bthread {

int bthread_once_impl(bthread_once_t* once_control, void (*init_routine)()) {
butil::atomic<int>* butex = once_control->_butex;
// We need acquire memory order for this load because if the value
// signals that initialization has finished, we need to see any
// data modifications done during initialization.
int val = butex->load(butil::memory_order_acquire);
if (BAIDU_LIKELY(val == bthread_once_t::INITIALIZED)) {
// The initialization has already been done.
return 0;
}
val = bthread_once_t::UNINITIALIZED;
if (butex->compare_exchange_strong(val, bthread_once_t::INPROGRESS,
butil::memory_order_relaxed,
butil::memory_order_relaxed)) {
// This (b)thread is the first and the Only one here. Do the initialization.
init_routine();
// Mark *once_control as having finished the initialization. We need
// release memory order here because we need to synchronize with other
// (b)threads that want to use the initialized data.
butex->store(bthread_once_t::INITIALIZED, butil::memory_order_release);
// Wake up all other (b)threads.
bthread::butex_wake_all(butex);
return 0;
}

while (true) {
// Same as above, we need acquire memory order.
val = butex->load(butil::memory_order_acquire);
if (BAIDU_LIKELY(val == bthread_once_t::INITIALIZED)) {
// The initialization has already been done.
return 0;
}
// Unless your constructor can be very time consuming, it is very unlikely o hit
// this race. When it does, we just wait the thread until the object has been created.
if (bthread::butex_wait(butex, val, NULL) < 0 &&
errno != EWOULDBLOCK && errno != EINTR/*note*/) {
return errno;
}
}
}

} // namespace bthread

__BEGIN_DECLS

int bthread_once(bthread_once_t* once_control, void (*init_routine)()) {
return bthread::bthread_once_impl(once_control, init_routine);
}

__END_DECLS
61 changes: 61 additions & 0 deletions src/bthread/singleton_on_bthread_once.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#ifndef BRPC_SINGLETON_ON_BTHREAD_ONCE_H
#define BRPC_SINGLETON_ON_BTHREAD_ONCE_H

#include "bthread/bthread.h"

namespace bthread {

template <typename T>
class GetLeakySingleton {
public:
static T* _instance;
static bthread_once_t* g_create_leaky_singleton_once;
static void create_leaky_singleton();
};

template <typename T>
T* GetLeakySingleton<T>::_instance = NULL;

template <typename T>
bthread_once_t* GetLeakySingleton<T>::g_create_leaky_singleton_once
= new bthread_once_t;

template <typename T>
void GetLeakySingleton<T>::create_leaky_singleton() {
_instance = new T;
}

// To get a never-deleted singleton of a type T, just call
// bthread::get_leaky_singleton<T>(). Most daemon (b)threads
// or objects that need to be always-on can be created by
// this function. This function can be called safely not only
// before main() w/o initialization issues of global variables,
// but also on bthread with hanging operation.
template <typename T>
inline T* get_leaky_singleton() {
using LeakySingleton = GetLeakySingleton<T>;
bthread_once(LeakySingleton::g_create_leaky_singleton_once,
LeakySingleton::create_leaky_singleton);
return LeakySingleton::_instance;
}

} // namespace bthread

#endif // BRPC_SINGLETON_ON_BTHREAD_ONCE_H
23 changes: 23 additions & 0 deletions src/bthread/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,29 @@ typedef struct {
typedef struct {
} bthread_barrierattr_t;

#if defined(__cplusplus)
class bthread_once_t;
namespace bthread {
extern int bthread_once_impl(bthread_once_t* once_control, void (*init_routine)());
}

class bthread_once_t {
public:
friend int bthread::bthread_once_impl(bthread_once_t* once_control, void (*init_routine)());
enum State {
UNINITIALIZED = 0,
INPROGRESS,
INITIALIZED,
};

bthread_once_t();
~bthread_once_t();

private:
butil::atomic<int>* _butex;
};
#endif

typedef struct {
uint64_t value;
} bthread_id_t;
Expand Down
136 changes: 136 additions & 0 deletions test/bthread_once_unittest.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <gtest/gtest.h>
#include "bthread/bthread.h"
#include "bthread/singleton_on_bthread_once.h"
#include "bthread/task_control.h"

namespace bthread {
extern TaskControl* g_task_control;
}

namespace {

bthread_once_t g_bthread_once_control;
bool g_bthread_once_started = false;
butil::atomic<int> g_bthread_once_count(0);

void init_routine() {
bthread_usleep(2000 * 1000);
g_bthread_once_count.fetch_add(1, butil::memory_order_relaxed);
}

void bthread_once_task() {
bthread_once(&g_bthread_once_control, init_routine);
// `init_routine' only be called once.
ASSERT_EQ(1, g_bthread_once_count.load(butil::memory_order_relaxed));
}

void* first_bthread_once_task(void*) {
g_bthread_once_started = true;
bthread_once_task();
return NULL;
}


void* other_bthread_once_task(void*) {
bthread_once_task();
return NULL;
}

TEST(BthreadOnceTest, once) {
bthread_t bid;
ASSERT_EQ(0, bthread_start_background(
&bid, NULL, first_bthread_once_task, NULL));
while (!g_bthread_once_started) {
bthread_usleep(1000);
}
ASSERT_NE(nullptr, bthread::g_task_control);
int concurrency = bthread::g_task_control->concurrency();
LOG(INFO) << "concurrency: " << concurrency;
ASSERT_GT(concurrency, 0);
std::vector<bthread_t> bids(concurrency * 100);
for (auto& id : bids) {
ASSERT_EQ(0, bthread_start_background(
&id, NULL, other_bthread_once_task, NULL));
}
bthread_once_task();

for (auto& id : bids) {
bthread_join(id, NULL);
}
bthread_join(bid, NULL);
}

bool g_bthread_started = false;
butil::atomic<int> g_bthread_singleton_count(0);

class BthreadSingleton {
public:
BthreadSingleton() {
bthread_usleep(2000 * 1000);
g_bthread_singleton_count.fetch_add(1, butil::memory_order_relaxed);
}
};

void get_bthread_singleton() {
auto instance = bthread::get_leaky_singleton<BthreadSingleton>();
ASSERT_NE(nullptr, instance);
// Only one BthreadSingleton instance has been created.
ASSERT_EQ(1, g_bthread_singleton_count.load(butil::memory_order_relaxed));
}

void* first_get_bthread_singleton(void*) {
g_bthread_started = true;
get_bthread_singleton();
return NULL;
}


void* get_bthread_singleton(void*) {
get_bthread_singleton();
return NULL;
}

// Singleton will definitely not cause deadlock,
// even if constructor of T will hang the bthread.
TEST(BthreadOnceTest, singleton) {
bthread_t bid;
ASSERT_EQ(0, bthread_start_background(
&bid, NULL, first_get_bthread_singleton, NULL));
while (!g_bthread_started) {
bthread_usleep(1000);
}
ASSERT_NE(nullptr, bthread::g_task_control);
int concurrency = bthread::g_task_control->concurrency();
LOG(INFO) << "concurrency: " << concurrency;
ASSERT_GT(concurrency, 0);
std::vector<bthread_t> bids(concurrency * 100);
for (auto& id : bids) {
ASSERT_EQ(0, bthread_start_background(
&id, NULL, get_bthread_singleton, NULL));
}
get_bthread_singleton();

for (auto& id : bids) {
bthread_join(id, NULL);
}
bthread_join(bid, NULL);
}

}

0 comments on commit eac1901

Please sign in to comment.