diff --git a/src/bthread/bthread.h b/src/bthread/bthread.h index f91bc9afaf..a4c05867b9 100644 --- a/src/bthread/bthread.h +++ b/src/bthread/bthread.h @@ -336,6 +336,15 @@ extern void* bthread_getspecific(bthread_key_t key); // Return current bthread tag extern bthread_tag_t bthread_self_tag(void); +// The first call to bthread_once() by any thread in a process, with a given +// once_control, will call the init_routine() with no arguments. Subsequent +// calls of bthread_once() with the same once_control will not call the +// init_routine(). On return from bthread_once(), it is guaranteed that +// init_routine() has completed. The once_control parameter is used to +// determine whether the associated initialisation routine has been called. +// Returns 0 on success, error code otherwise. +extern int bthread_once(bthread_once_t* once_control, void (*init_routine)()); + __END_DECLS #endif // BTHREAD_BTHREAD_H diff --git a/src/bthread/bthread_once.cpp b/src/bthread/bthread_once.cpp new file mode 100644 index 0000000000..a5751bc7ee --- /dev/null +++ b/src/bthread/bthread_once.cpp @@ -0,0 +1,81 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "bthread/types.h" +#include "bthread/butex.h" + +bthread_once_t::bthread_once_t() + : _butex(bthread::butex_create_checked>()) { + _butex->store(UNINITIALIZED, butil::memory_order_relaxed); +} + +bthread_once_t::~bthread_once_t() { + bthread::butex_destroy(_butex); +} + +namespace bthread { + +int bthread_once_impl(bthread_once_t* once_control, void (*init_routine)()) { + butil::atomic* butex = once_control->_butex; + // We need acquire memory order for this load because if the value + // signals that initialization has finished, we need to see any + // data modifications done during initialization. + int val = butex->load(butil::memory_order_acquire); + if (BAIDU_LIKELY(val == bthread_once_t::INITIALIZED)) { + // The initialization has already been done. + return 0; + } + val = bthread_once_t::UNINITIALIZED; + if (butex->compare_exchange_strong(val, bthread_once_t::INPROGRESS, + butil::memory_order_relaxed, + butil::memory_order_relaxed)) { + // This (b)thread is the first and the Only one here. Do the initialization. + init_routine(); + // Mark *once_control as having finished the initialization. We need + // release memory order here because we need to synchronize with other + // (b)threads that want to use the initialized data. + butex->store(bthread_once_t::INITIALIZED, butil::memory_order_release); + // Wake up all other (b)threads. + bthread::butex_wake_all(butex); + return 0; + } + + while (true) { + // Same as above, we need acquire memory order. + val = butex->load(butil::memory_order_acquire); + if (BAIDU_LIKELY(val == bthread_once_t::INITIALIZED)) { + // The initialization has already been done. + return 0; + } + // Unless your constructor can be very time consuming, it is very unlikely o hit + // this race. When it does, we just wait the thread until the object has been created. + if (bthread::butex_wait(butex, val, NULL) < 0 && + errno != EWOULDBLOCK && errno != EINTR/*note*/) { + return errno; + } + } +} + +} // namespace bthread + +__BEGIN_DECLS + +int bthread_once(bthread_once_t* once_control, void (*init_routine)()) { + return bthread::bthread_once_impl(once_control, init_routine); +} + +__END_DECLS \ No newline at end of file diff --git a/src/bthread/singleton_on_bthread_once.h b/src/bthread/singleton_on_bthread_once.h new file mode 100644 index 0000000000..9ea507d788 --- /dev/null +++ b/src/bthread/singleton_on_bthread_once.h @@ -0,0 +1,61 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef BRPC_SINGLETON_ON_BTHREAD_ONCE_H +#define BRPC_SINGLETON_ON_BTHREAD_ONCE_H + +#include "bthread/bthread.h" + +namespace bthread { + +template +class GetLeakySingleton { +public: + static T* _instance; + static bthread_once_t* g_create_leaky_singleton_once; + static void create_leaky_singleton(); +}; + +template +T* GetLeakySingleton::_instance = NULL; + +template +bthread_once_t* GetLeakySingleton::g_create_leaky_singleton_once + = new bthread_once_t; + +template +void GetLeakySingleton::create_leaky_singleton() { + _instance = new T; +} + +// To get a never-deleted singleton of a type T, just call +// bthread::get_leaky_singleton(). Most daemon (b)threads +// or objects that need to be always-on can be created by +// this function. This function can be called safely not only +// before main() w/o initialization issues of global variables, +// but also on bthread with hanging operation. +template +inline T* get_leaky_singleton() { + using LeakySingleton = GetLeakySingleton; + bthread_once(LeakySingleton::g_create_leaky_singleton_once, + LeakySingleton::create_leaky_singleton); + return LeakySingleton::_instance; +} + +} // namespace bthread + +#endif // BRPC_SINGLETON_ON_BTHREAD_ONCE_H diff --git a/src/bthread/types.h b/src/bthread/types.h index cb39ae3c9d..d91b85aab3 100644 --- a/src/bthread/types.h +++ b/src/bthread/types.h @@ -192,6 +192,29 @@ typedef struct { typedef struct { } bthread_barrierattr_t; +#if defined(__cplusplus) +class bthread_once_t; +namespace bthread { +extern int bthread_once_impl(bthread_once_t* once_control, void (*init_routine)()); +} + +class bthread_once_t { +public: +friend int bthread::bthread_once_impl(bthread_once_t* once_control, void (*init_routine)()); + enum State { + UNINITIALIZED = 0, + INPROGRESS, + INITIALIZED, + }; + + bthread_once_t(); + ~bthread_once_t(); + +private: + butil::atomic* _butex; +}; +#endif + typedef struct { uint64_t value; } bthread_id_t; diff --git a/test/bthread_once_unittest.cpp b/test/bthread_once_unittest.cpp new file mode 100644 index 0000000000..618798e8c4 --- /dev/null +++ b/test/bthread_once_unittest.cpp @@ -0,0 +1,136 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include "bthread/bthread.h" +#include "bthread/singleton_on_bthread_once.h" +#include "bthread/task_control.h" + +namespace bthread { +extern TaskControl* g_task_control; +} + +namespace { + +bthread_once_t g_bthread_once_control; +bool g_bthread_once_started = false; +butil::atomic g_bthread_once_count(0); + +void init_routine() { + bthread_usleep(2000 * 1000); + g_bthread_once_count.fetch_add(1, butil::memory_order_relaxed); +} + +void bthread_once_task() { + bthread_once(&g_bthread_once_control, init_routine); + // `init_routine' only be called once. + ASSERT_EQ(1, g_bthread_once_count.load(butil::memory_order_relaxed)); +} + +void* first_bthread_once_task(void*) { + g_bthread_once_started = true; + bthread_once_task(); + return NULL; +} + + +void* other_bthread_once_task(void*) { + bthread_once_task(); + return NULL; +} + +TEST(BthreadOnceTest, once) { + bthread_t bid; + ASSERT_EQ(0, bthread_start_background( + &bid, NULL, first_bthread_once_task, NULL)); + while (!g_bthread_once_started) { + bthread_usleep(1000); + } + ASSERT_NE(nullptr, bthread::g_task_control); + int concurrency = bthread::g_task_control->concurrency(); + LOG(INFO) << "concurrency: " << concurrency; + ASSERT_GT(concurrency, 0); + std::vector bids(concurrency * 100); + for (auto& id : bids) { + ASSERT_EQ(0, bthread_start_background( + &id, NULL, other_bthread_once_task, NULL)); + } + bthread_once_task(); + + for (auto& id : bids) { + bthread_join(id, NULL); + } + bthread_join(bid, NULL); +} + +bool g_bthread_started = false; +butil::atomic g_bthread_singleton_count(0); + +class BthreadSingleton { +public: + BthreadSingleton() { + bthread_usleep(2000 * 1000); + g_bthread_singleton_count.fetch_add(1, butil::memory_order_relaxed); + } +}; + +void get_bthread_singleton() { + auto instance = bthread::get_leaky_singleton(); + ASSERT_NE(nullptr, instance); + // Only one BthreadSingleton instance has been created. + ASSERT_EQ(1, g_bthread_singleton_count.load(butil::memory_order_relaxed)); +} + +void* first_get_bthread_singleton(void*) { + g_bthread_started = true; + get_bthread_singleton(); + return NULL; +} + + +void* get_bthread_singleton(void*) { + get_bthread_singleton(); + return NULL; +} + +// Singleton will definitely not cause deadlock, +// even if constructor of T will hang the bthread. +TEST(BthreadOnceTest, singleton) { + bthread_t bid; + ASSERT_EQ(0, bthread_start_background( + &bid, NULL, first_get_bthread_singleton, NULL)); + while (!g_bthread_started) { + bthread_usleep(1000); + } + ASSERT_NE(nullptr, bthread::g_task_control); + int concurrency = bthread::g_task_control->concurrency(); + LOG(INFO) << "concurrency: " << concurrency; + ASSERT_GT(concurrency, 0); + std::vector bids(concurrency * 100); + for (auto& id : bids) { + ASSERT_EQ(0, bthread_start_background( + &id, NULL, get_bthread_singleton, NULL)); + } + get_bthread_singleton(); + + for (auto& id : bids) { + bthread_join(id, NULL); + } + bthread_join(bid, NULL); +} + +} \ No newline at end of file