diff --git a/src/groups/bmq/bmqu/bmqu_managedcallback.cpp b/src/groups/bmq/bmqu/bmqu_managedcallback.cpp new file mode 100644 index 0000000000..7da548d949 --- /dev/null +++ b/src/groups/bmq/bmqu/bmqu_managedcallback.cpp @@ -0,0 +1,86 @@ +// Copyright 2025 Bloomberg Finance L.P. +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// bmqu_managedcallback.cpp -*-C++-*- +#include + +#include +namespace BloombergLP { +namespace bmqu { + +namespace { + +class VoidCallback : public bmqu::ManagedCallback::CallbackFunctor { + private: + // PRIVATE DATA + bmqu::ManagedCallback::VoidFunctor d_callback; + + public: + // CREATORS + explicit VoidCallback(const bmqu::ManagedCallback::VoidFunctor& callback) + : d_callback(callback) + { + // NOTHING + } + + explicit VoidCallback( + bslmf::MovableRef callback) + : d_callback(bslmf::MovableRefUtil::move(callback)) + { + // NOTHING + } + + ~VoidCallback() BSLS_KEYWORD_OVERRIDE + { + // NOTHING + } + + // ACCESSORS + void operator()() const BSLS_KEYWORD_OVERRIDE + { + if (d_callback) { + d_callback(); + } + } +}; + +} // close unnamed namespace + +// --------------------------------------- +// struct ManagedCallback::CallbackFunctor +// --------------------------------------- + +ManagedCallback::CallbackFunctor::~CallbackFunctor() +{ + // NOTHING +} + +void ManagedCallback::set(const VoidFunctor& callback) +{ + // Preconditions for placement are checked in `place`. + // Destructor is called by `reset` of the holding DispatcherEvent. + new (place()) VoidCallback(callback); +} + +void ManagedCallback::set(bslmf::MovableRef callback) +{ + // Preconditions for placement are checked in `place`. + // Destructor is called by `reset` of the holding DispatcherEvent. + new (place()) + VoidCallback(bslmf::MovableRefUtil::move(callback)); +} + +} // close package namespace +} // close enterprise namespace diff --git a/src/groups/bmq/bmqu/bmqu_managedcallback.h b/src/groups/bmq/bmqu/bmqu_managedcallback.h new file mode 100644 index 0000000000..f9301f45e9 --- /dev/null +++ b/src/groups/bmq/bmqu/bmqu_managedcallback.h @@ -0,0 +1,201 @@ +// Copyright 2025 Bloomberg Finance L.P. +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// bmqu_managedcallback.h -*-C++-*- +#ifndef INCLUDED_BMQU_JSONPRINTER +#define INCLUDED_BMQU_JSONPRINTER + +//@PURPOSE: Provide a mechanism to print key-value pairs in JSON format. +// +//@CLASSES: +// bmqu::JsonPrinter: Mechanism to print key-value pairs in JSON format. +// +//@DESCRIPTION: 'bmqu::JsonPrinter' provides a mechanism to print key-value +// pairs in JSON format. +// +/// Usage +///----- +// First, specify field names for printer: +//.. +// bsl::vector fields; +// fields.push_back("Queue URI"); +// fields.push_back("QueueKey"); +// fields.push_back("Number of AppIds"); +//.. +// +// Next, create an instance of bmqu::AlignedPrinter: +//.. +// bsl::stringstream output; +// bmqu::JsonPrinter printer(output, &fields); +//.. +// +// Last, print field values accordingly: +//.. +// bsl::string uri = "bmq://bmq.tutorial.workqueue/sample-queue"; +// bsl::string queueKey = "sample"; +// const int num = 1; +// printer << uri << queueKey << num; +//.. +// + +// BDE +#include +#include + +#include +#include + +namespace BloombergLP { +namespace bmqu { + +// =============== +// ManagedCallback +// =============== + +class ManagedCallback BSLS_KEYWORD_FINAL { + /// The class useful for in-place construction and passing of functors + /// between different actors. + private: + // DATA + /// Reusable buffer holding the stored callback. + bsl::vector d_callbackBuffer; + + /// The flag indicating if `d_callbackBuffer` is empty now. + bool d_empty; + + public: + // TRAITS + BSLMF_NESTED_TRAIT_DECLARATION(ManagedCallback, bslma::UsesBslmaAllocator) + + // PUBLIC TYPES + /// Signature of a `void` functor method. + typedef bsl::function VoidFunctor; + + // =============== + // CallbackFunctor + // =============== + /// The interface for all callback functors passed to ManagedCallback. + struct CallbackFunctor { + // CREATORS + virtual ~CallbackFunctor(); + + // ACCESSORS + virtual void operator()() const = 0; + }; + + // CREATORS + /// Create a ManagedCallback object using the optionally specified + /// `allocator`. + explicit ManagedCallback(bslma::Allocator* allocator = 0); + + /// Destroy this object. + ~ManagedCallback(); + + // MANIPULATORS + /// Reset the state of this object. If this object stores a callback, + /// call a destructor for it and set this object empty. + void reset(); + + /// Book and return the allocated memory to store the specified + /// `CALLBACK_TYPE` that has to be inherited from `CallbackFunctor`. + /// Note that it's the user's responsibility to construct a functor object + /// at the provided address. + /// The object must be `empty()` before calling `place()`. + template + char* place(); + + /// Store the specified `callback` in this object. + /// Note: these setters are slow because they performs function copy, going + /// against the basic idea of ManagedCallback. They exist solely for + /// the compatibility with other code where this copy is acceptable. + /// In performance-critical paths, `place` should always be used + /// together with reusable ManagedCallback object(s). + void set(const VoidFunctor& callback); + void set(bslmf::MovableRef callback); + + // ACCESSORS + /// Is this object empty or not. + bool empty() const; + + /// Call the stored callback. + /// The object must be `!empty()` before calling `operator()`. + void operator()() const; +}; + +// ============================================================================ +// INLINE DEFINITIONS +// ============================================================================ + +// --------------- +// ManagedCallback +// --------------- + +inline ManagedCallback::ManagedCallback(bslma::Allocator* allocator) +: d_callbackBuffer(allocator) +, d_empty(true) +{ + // NOTHING +} + +inline ManagedCallback::~ManagedCallback() +{ + reset(); +} + +inline void ManagedCallback::reset() +{ + if (!d_empty) { + // Not necessary to resize the vector or memset its elements to 0, + // we just call the virtual destructor, and `d_empty` flag + // prevents us from calling outdated callback. + reinterpret_cast(d_callbackBuffer.data()) + ->~CallbackFunctor(); + d_empty = true; + } +} + +template +inline char* ManagedCallback::place() +{ + // PRECONDITIONS + BSLS_ASSERT_SAFE(d_empty); + /// The compilation will fail here on the outer `static_cast` if we + /// don't provide a type that is inherited from the base + /// `CallbackFunctor` type. + /// TODO: replace by static_assert on C++ standard update + BSLS_ASSERT_SAFE(0 == static_cast( + reinterpret_cast(0))); + d_callbackBuffer.resize(sizeof(CALLBACK_TYPE)); + d_empty = false; + return d_callbackBuffer.data(); +} + +inline bool ManagedCallback::empty() const +{ + return d_empty; +} + +inline void ManagedCallback::operator()() const +{ + // PRECONDITIONS + BSLS_ASSERT_SAFE(!d_empty); + + (*reinterpret_cast(d_callbackBuffer.data()))(); +} + +} // close package namespace +} // close enterprise namespace + +#endif diff --git a/src/groups/bmq/bmqu/bmqu_managedcallback.t.cpp b/src/groups/bmq/bmqu/bmqu_managedcallback.t.cpp new file mode 100644 index 0000000000..f4160539b7 --- /dev/null +++ b/src/groups/bmq/bmqu/bmqu_managedcallback.t.cpp @@ -0,0 +1,392 @@ +// Copyright 2025 Bloomberg Finance L.P. +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// bmqu_managedcallback.t.cpp -*-C++-*- +#include + +// BDE +#include +#include +#include +#include +#include +#include +#include + +// BMQ +#include + +// TEST DRIVER +#include + +// CONVENIENCE +using namespace BloombergLP; +using namespace bsl; + +// ============================================================================ +// TEST HELPERS UTILITY +// ---------------------------------------------------------------------------- +namespace { + +struct Test { + int ptr; + int a; + bool b; +}; + +void increment(size_t* calls) +{ + (*calls)++; +} + +struct Dummy : bmqu::ManagedCallback::CallbackFunctor { + char data[144]; + + void operator()() const BSLS_KEYWORD_OVERRIDE {} +}; + +static void +printSummary(const bsl::string& desc, bsls::Types::Int64 dt, size_t iters) +{ + bsl::cout << desc << ":" << bsl::endl; + bsl::cout << " total: " << bmqu::PrintUtil::prettyTimeInterval(dt) + << " (" << iters << " iterations)" << bsl::endl; + bsl::cout << " per call: " + << bmqu::PrintUtil::prettyTimeInterval((dt) / iters) + << bsl::endl; + bsl::cout << bsl::endl; +} + +struct StatsCallback : bmqu::ManagedCallback::CallbackFunctor { + size_t& d_constructors; + size_t& d_destructors; + size_t& d_calls; + + StatsCallback(size_t& constructors, size_t& destructors, size_t& calls) + : d_constructors(constructors) + , d_destructors(destructors) + , d_calls(calls) + { + d_constructors++; + } + + ~StatsCallback() BSLS_KEYWORD_OVERRIDE { d_destructors++; } + + void operator()() const BSLS_KEYWORD_OVERRIDE { d_calls++; } +}; + +class ManagedCallbackBuffer BSLS_KEYWORD_FINAL { + private: + // DATA + /// Reusable buffer holding the stored callback. + char d_callbackBuffer[256]; + + /// The flag indicating if `d_callbackBuffer` is empty now. + bool d_empty; + + public: + // TRAITS + BSLMF_NESTED_TRAIT_DECLARATION(ManagedCallbackBuffer, + bslma::UsesBslmaAllocator) + + // CREATORS + inline explicit ManagedCallbackBuffer(bslma::Allocator* allocator = 0) + : d_callbackBuffer() + , d_empty(true) + { + // NOTHING + } + + inline ~ManagedCallbackBuffer() { reset(); } + + // MANIPULATORS + inline void reset() + { + if (!d_empty) { + // Not necessary to resize the vector or memset its elements to 0, + // we just call the virtual destructor, and `d_empty` flag + // prevents us from calling outdated callback. + reinterpret_cast( + d_callbackBuffer) + ->~CallbackFunctor(); + d_empty = true; + } + } + + template + inline char* place() + { + // PRECONDITIONS + BSLS_ASSERT_SAFE(d_empty); + /// The compilation will fail here on the outer `static_cast` if we + /// don't provide a type that is inherited from the base + /// `CallbackFunctor` type. + /// TODO: replace by static_assert on C++ standard update + BSLS_ASSERT_SAFE( + 0 == + static_cast( + reinterpret_cast(0))); + d_empty = false; + return d_callbackBuffer; + } + + // ACCESSORS + + inline bool empty() const { return d_empty; } + + inline void operator()() const + { + // PRECONDITIONS + BSLS_ASSERT_SAFE(!d_empty); + + (*reinterpret_cast( + d_callbackBuffer))(); + } +}; + +} // close unnamed namespace + +// ============================================================================ +// TESTS +// ---------------------------------------------------------------------------- +static void test1_ManagedCallback() +{ + bmqtst::TestHelper::printTestName("MANAGED CALLBACK"); + + { + bmqu::ManagedCallback callback(bmqtst::TestHelperUtil::allocator()); + BMQTST_ASSERT(callback.empty()); + + callback.set(bmqu::ManagedCallback::VoidFunctor()); + BMQTST_ASSERT(!callback.empty()); + callback(); + } +} + +static void testN1_managedCallbackPerformance() +{ + const size_t k_ITERS_NUM = 10000000; + + size_t constructors = 0; + size_t destructors = 0; + size_t calls = 0; + + // Warmup + for (size_t i = 0; i < k_ITERS_NUM; i++) { + bmqu::ManagedCallback callback(bmqtst::TestHelperUtil::allocator()); + new (callback.place()) + StatsCallback(constructors, destructors, calls); + callback.reset(); + } + + bsl::cout << "========= Benchmark 1: function call ==========" << bsl::endl + << "Build a functor once and call it multiple times" << bsl::endl + << "===============================================" << bsl::endl + << bsl::endl; + { + bsl::function callback; + callback = bdlf::BindUtil::bind(increment, &calls); + const bsls::Types::Int64 begin = bsls::TimeUtil::getTimer(); + for (size_t i = 0; i < k_ITERS_NUM; i++) { + callback(); + } + const bsls::Types::Int64 end = bsls::TimeUtil::getTimer(); + + printSummary("bsl::function<...>()", end - begin, k_ITERS_NUM); + } + { + bmqu::ManagedCallback callback(bmqtst::TestHelperUtil::allocator()); + new (callback.place()) + StatsCallback(constructors, destructors, calls); + + const bsls::Types::Int64 begin = bsls::TimeUtil::getTimer(); + for (size_t i = 0; i < k_ITERS_NUM; i++) { + callback(); + } + const bsls::Types::Int64 end = bsls::TimeUtil::getTimer(); + + printSummary("bmqu::ManagedCallback(vector)", + end - begin, + k_ITERS_NUM); + } + { + ManagedCallbackBuffer callback(bmqtst::TestHelperUtil::allocator()); + new (callback.place()) + StatsCallback(constructors, destructors, calls); + + const bsls::Types::Int64 begin = bsls::TimeUtil::getTimer(); + for (size_t i = 0; i < k_ITERS_NUM; i++) { + callback(); + } + const bsls::Types::Int64 end = bsls::TimeUtil::getTimer(); + + printSummary("bmqu::ManagedCallback(char[])", + end - begin, + k_ITERS_NUM); + } + + bsl::cout << "========= Benchmark 2: new construct ==========" << bsl::endl + << "Build a functor multiple times without calling" << bsl::endl + << "===============================================" << bsl::endl + << bsl::endl; + { + bsl::vector > cbs; + cbs.resize(k_ITERS_NUM); + + const bsls::Types::Int64 begin = bsls::TimeUtil::getTimer(); + for (size_t i = 0; i < k_ITERS_NUM; i++) { + cbs[i] = bdlf::BindUtil::bindS(bmqtst::TestHelperUtil::allocator(), + increment, + &calls); + } + const bsls::Types::Int64 end = bsls::TimeUtil::getTimer(); + + cbs.clear(); + cbs.shrink_to_fit(); + + printSummary("bsl::function<...>()", end - begin, k_ITERS_NUM); + } + { + bsl::vector cbs; + cbs.resize(k_ITERS_NUM); + + const bsls::Types::Int64 begin = bsls::TimeUtil::getTimer(); + for (size_t i = 0; i < k_ITERS_NUM; i++) { + cbs[i] = new bmqu::ManagedCallback( + bmqtst::TestHelperUtil::allocator()); + new (cbs[i]->place()) + StatsCallback(constructors, destructors, calls); + } + const bsls::Types::Int64 end = bsls::TimeUtil::getTimer(); + + for (size_t i = 0; i < k_ITERS_NUM; i++) { + cbs[i]->reset(); + delete cbs[i]; + } + + cbs.clear(); + cbs.shrink_to_fit(); + + printSummary("bmqu::ManagedCallback(vector)", + end - begin, + k_ITERS_NUM); + } + { + bsl::vector cbs; + cbs.resize(k_ITERS_NUM); + + const bsls::Types::Int64 begin = bsls::TimeUtil::getTimer(); + for (size_t i = 0; i < k_ITERS_NUM; i++) { + cbs[i] = new ManagedCallbackBuffer( + bmqtst::TestHelperUtil::allocator()); + new (cbs[i]->place()) + StatsCallback(constructors, destructors, calls); + } + const bsls::Types::Int64 end = bsls::TimeUtil::getTimer(); + + for (size_t i = 0; i < k_ITERS_NUM; i++) { + cbs[i]->reset(); + delete cbs[i]; + } + + cbs.clear(); + cbs.shrink_to_fit(); + + printSummary("bmqu::ManagedCallback(char[])", + end - begin, + k_ITERS_NUM); + } + + bsl::cout << "========= Benchmark 3: reuse functor ==========" << bsl::endl + << "Reset and call a functor multiple times" << bsl::endl + << "===============================================" << bsl::endl + << bsl::endl; + { + bsl::function callback; + + const bsls::Types::Int64 begin = bsls::TimeUtil::getTimer(); + for (size_t i = 0; i < k_ITERS_NUM; i++) { + callback = bdlf::BindUtil::bindS( + bmqtst::TestHelperUtil::allocator(), + increment, + &calls); + callback(); + } + const bsls::Types::Int64 end = bsls::TimeUtil::getTimer(); + + printSummary("bsl::function<...>()", end - begin, k_ITERS_NUM); + } + { + bmqu::ManagedCallback callback(bmqtst::TestHelperUtil::allocator()); + + const bsls::Types::Int64 begin = bsls::TimeUtil::getTimer(); + for (size_t i = 0; i < k_ITERS_NUM; i++) { + new (callback.place()) + StatsCallback(constructors, destructors, calls); + callback(); + callback.reset(); + } + const bsls::Types::Int64 end = bsls::TimeUtil::getTimer(); + + printSummary("bmqu::ManagedCallback(vector)", + end - begin, + k_ITERS_NUM); + } + { + ManagedCallbackBuffer callback(bmqtst::TestHelperUtil::allocator()); + + const bsls::Types::Int64 begin = bsls::TimeUtil::getTimer(); + for (size_t i = 0; i < k_ITERS_NUM; i++) { + new (callback.place()) + StatsCallback(constructors, destructors, calls); + callback(); + callback.reset(); + } + const bsls::Types::Int64 end = bsls::TimeUtil::getTimer(); + + printSummary("bmqu::ManagedCallback(char[])", + end - begin, + k_ITERS_NUM); + } + + if (calls < 100000) { + bsl::cout << calls << bsl::endl; + } +} + +// ============================================================================ +// MAIN PROGRAM +// ---------------------------------------------------------------------------- + +int main(int argc, char* argv[]) +{ + // To be called only once per process instantiation. + bsls::TimeUtil::initialize(); + + TEST_PROLOG(bmqtst::TestHelper::e_DEFAULT); + + switch (_testCase) { + case 0: + case 1: test1_ManagedCallback(); break; + case -1: testN1_managedCallbackPerformance(); break; + default: { + bsl::cerr << "WARNING: CASE '" << _testCase << "' NOT FOUND." + << bsl::endl; + bmqtst::TestHelperUtil::testStatus() = -1; + } break; + } + + TEST_EPILOG(bmqtst::TestHelper::e_CHECK_DEF_GBL_ALLOC); +} diff --git a/src/groups/bmq/bmqu/package/bmqu.mem b/src/groups/bmq/bmqu/package/bmqu.mem index f295876bb1..f7c5f8ed49 100644 --- a/src/groups/bmq/bmqu/package/bmqu.mem +++ b/src/groups/bmq/bmqu/package/bmqu.mem @@ -5,6 +5,7 @@ bmqu_blob bmqu_blobiterator bmqu_blobobjectproxy bmqu_jsonprinter +bmqu_managedcallback bmqu_memoutstream bmqu_objectplaceholder bmqu_objectplaceholder_cpp03 diff --git a/src/groups/mqb/mqba/mqba_adminsession.cpp b/src/groups/mqb/mqba/mqba_adminsession.cpp index 364f4d1930..0eb3eb8911 100644 --- a/src/groups/mqb/mqba/mqba_adminsession.cpp +++ b/src/groups/mqb/mqba/mqba_adminsession.cpp @@ -457,8 +457,8 @@ void AdminSession::onDispatcherEvent(const mqbi::DispatcherEvent& event) const mqbi::DispatcherCallbackEvent* realEvent = event.asCallbackEvent(); - BSLS_ASSERT_SAFE(realEvent->callback()); - realEvent->callback()(dispatcherClientData().processorHandle()); + BSLS_ASSERT_SAFE(!realEvent->callback().empty()); + realEvent->callback()(); } break; case mqbi::DispatcherEventType::e_ACK: diff --git a/src/groups/mqb/mqba/mqba_clientsession.cpp b/src/groups/mqb/mqba/mqba_clientsession.cpp index 86a326ca02..25c4a46dde 100644 --- a/src/groups/mqb/mqba/mqba_clientsession.cpp +++ b/src/groups/mqb/mqba/mqba_clientsession.cpp @@ -746,7 +746,7 @@ void ClientSession::tearDownImpl(bslmt::Semaphore* semaphore, // with the 'tearDownAllQueuesDone' finalize callback having the 'handle' // bound to it (so that the session is not yet destroyed). dispatcher()->execute( - mqbi::Dispatcher::ProcessorFunctor(), // empty + mqbi::Dispatcher::VoidFunctor(), // empty mqbi::DispatcherClientType::e_QUEUE, bdlf::BindUtil::bind(&ClientSession::tearDownAllQueuesDone, this, @@ -1272,7 +1272,7 @@ void ClientSession::processDisconnectAllQueues( // type, refer to top level documention for explanation (paragraph about // the bmqu::SharedResource). dispatcher()->execute( - mqbi::Dispatcher::ProcessorFunctor(), // empty + mqbi::Dispatcher::VoidFunctor(), // empty mqbi::DispatcherClientType::e_QUEUE, bdlf::BindUtil::bind( bmqu::WeakMemFnUtil::weakMemFn( @@ -3072,9 +3072,9 @@ void ClientSession::onDispatcherEvent(const mqbi::DispatcherEvent& event) const mqbi::DispatcherCallbackEvent* realEvent = event.asCallbackEvent(); - BSLS_ASSERT_SAFE(realEvent->callback()); + BSLS_ASSERT_SAFE(!realEvent->callback().empty()); flush(); // Flush any pending messages to guarantee ordering of events - realEvent->callback()(dispatcherClientData().processorHandle()); + realEvent->callback()(); } break; case mqbi::DispatcherEventType::e_CONTROL_MSG: { BSLS_ASSERT_OPT(false && diff --git a/src/groups/mqb/mqba/mqba_dispatcher.cpp b/src/groups/mqb/mqba/mqba_dispatcher.cpp index 4032047ff8..a7700fe02d 100644 --- a/src/groups/mqb/mqba/mqba_dispatcher.cpp +++ b/src/groups/mqb/mqba/mqba_dispatcher.cpp @@ -91,7 +91,8 @@ void Dispatcher_Executor::post(const bsl::function& f) const event->object() .setType(mqbi::DispatcherEventType::e_DISPATCHER) - .setCallback(mqbi::Dispatcher::voidToProcessorFunctor(f)); + .callback() + .set(f); // submit the event int rc = d_processorPool_p->enqueueEvent(event, d_processorHandle); @@ -183,8 +184,9 @@ void Dispatcher_ClientExecutor::post(const bsl::function& f) const event->object() .setType(mqbi::DispatcherEventType::e_CALLBACK) - .setCallback(mqbi::Dispatcher::voidToProcessorFunctor(f)) - .setDestination(const_cast(d_client_p)); + .setDestination(const_cast(d_client_p)) + .callback() + .set(f); // submit the event int rc = processorPool()->enqueueEvent(event, processorHandle()); @@ -232,6 +234,39 @@ Dispatcher::DispatcherContext::DispatcherContext( // NOTHING } +// ------------------------------------ +// class Dispatcher::OnNewClientFunctor +// ------------------------------------ + +Dispatcher::OnNewClientFunctor::OnNewClientFunctor( + Dispatcher* owner_p, + mqbi::DispatcherClientType::Enum type, + int processorId) +: d_owner_p(owner_p) +, d_type(type) +, d_processorId(processorId) +{ + // PRECONDITIONS + BSLS_ASSERT_SAFE(d_owner_p); +} + +// ACCESSORS +void Dispatcher::OnNewClientFunctor::operator()() const +{ + // executed by the *DISPATCHER* thread + + // Resize the 'd_flushList' vector for that specified 'processorId', if + // needed, to ensure it has enough space to hold all clients associated to + // that processorId. + DispatcherContext& context = *(d_owner_p->d_contexts[d_type]); + + int count = context.d_loadBalancer.clientsCountForProcessor(d_processorId); + if (static_cast(context.d_flushList[d_processorId].capacity()) < + count) { + context.d_flushList[d_processorId].reserve(count); + } +} + // ---------------- // class Dispatcher // ---------------- @@ -372,10 +407,10 @@ void Dispatcher::queueEventCb(mqbi::DispatcherClientType::Enum type, // whole purpose of the 'e_DISPATCHER' event type. flushClients(type, processorId); - if (realEvent->callback()) { + if (!realEvent->callback().empty()) { // A callback may not have been set if all we wanted was to // execute the 'finalizeCallback' of the event. - realEvent->callback()(processorId); + realEvent->callback()(); } } else { @@ -404,7 +439,7 @@ void Dispatcher::queueEventCb(mqbi::DispatcherClientType::Enum type, const mqbi::DispatcherDispatcherEvent* realEvent = event->object().asDispatcherEvent(); - if (realEvent->finalizeCallback()) { + if (!realEvent->finalizeCallback().empty()) { BALL_LOG_TRACE << "Calling finalizeCallback on queue " << processorId << " of " << type << " dispatcher: " << event->object(); @@ -430,23 +465,6 @@ void Dispatcher::flushClients(mqbi::DispatcherClientType::Enum type, context.d_flushList[processorId].clear(); } -void Dispatcher::onNewClient(mqbi::DispatcherClientType::Enum type, - int processorId) -{ - // executed by the *DISPATCHER* thread - - // Resize the 'd_flushList' vector for that specified 'processorId', if - // needed, to ensure it has enough space to hold all clients associated to - // that processorId. - DispatcherContext& context = *(d_contexts[type]); - - int count = context.d_loadBalancer.clientsCountForProcessor(processorId); - if (static_cast(context.d_flushList[processorId].capacity()) < - count) { - context.d_flushList[processorId].reserve(count); - } -} - Dispatcher::Dispatcher(const mqbcfg::DispatcherConfig& config, bdlmt::EventScheduler* scheduler, bslma::Allocator* allocator) @@ -595,12 +613,13 @@ Dispatcher::registerClient(mqbi::DispatcherClient* client, &context.d_processorPool_mp->getUnmanagedEvent()->object(); (*event) .setType(mqbi::DispatcherEventType::e_DISPATCHER) - .setCallback( - bdlf::BindUtil::bind(&Dispatcher::onNewClient, - this, - type, - bdlf::PlaceHolders::_1)) // processor - .setDestination(client); // not needed + .setDestination(client); // TODO: not needed? + + // Build callback functor in-place. + // The destructor for functor is called in `reset`. + new (event->callback().place()) + OnNewClientFunctor(this, type, processor); + context.d_processorPool_mp->enqueueEvent(event, processor); return processor; // RETURN } // break; @@ -650,8 +669,8 @@ void Dispatcher::unregisterClient(mqbi::DispatcherClient* client) mqbi::Dispatcher::k_INVALID_PROCESSOR_HANDLE); } -void Dispatcher::execute(const mqbi::Dispatcher::ProcessorFunctor& functor, - mqbi::DispatcherClientType::Enum type, +void Dispatcher::execute(const mqbi::Dispatcher::VoidFunctor& functor, + mqbi::DispatcherClientType::Enum type, const mqbi::Dispatcher::VoidFunctor& doneCallback) { // PRECONDITIONS @@ -691,9 +710,9 @@ void Dispatcher::execute(const mqbi::Dispatcher::ProcessorFunctor& functor, if (processorPool[i] != 0) { mqbi::DispatcherEvent* qEvent = &processorPool[i]->getUnmanagedEvent()->object(); - qEvent->setType(mqbi::DispatcherEventType::e_DISPATCHER) - .setCallback(functor) - .setFinalizeCallback(doneCallback); + qEvent->setType(mqbi::DispatcherEventType::e_DISPATCHER); + qEvent->callback().set(functor); + qEvent->finalizeCallback().set(doneCallback); processorPool[i]->enqueueEventOnAllQueues(qEvent); } } diff --git a/src/groups/mqb/mqba/mqba_dispatcher.h b/src/groups/mqb/mqba/mqba_dispatcher.h index b15f8a3076..f538d0e1a5 100644 --- a/src/groups/mqb/mqba/mqba_dispatcher.h +++ b/src/groups/mqb/mqba/mqba_dispatcher.h @@ -221,6 +221,31 @@ class Dispatcher BSLS_CPP11_FINAL : public mqbi::Dispatcher { typedef bsl::vector DispatcherClientPtrVector; + /// The purpose is to avoid memory allocation by bdlf::BindUtil::bind + /// when dispatching CONFIRM from Cluster to Queue. + class OnNewClientFunctor : public bmqu::ManagedCallback::CallbackFunctor { + private: + // PRIVATE DATA + Dispatcher* d_owner_p; + mqbi::DispatcherClientType::Enum d_type; + int d_processorId; + + public: + // CREATORS + /// This functor is invoked when a new client with the specified `type` + /// is registered to the dispatcher, from the thread associated to that + /// new client that is mapped to the specified `processorId`. The + /// specified `owner_p` holds pointer to the parent Dispatcher object. + OnNewClientFunctor(Dispatcher* owner_p, + mqbi::DispatcherClientType::Enum type, + int processorId); + + // ACCESSORS + /// Updated the data associated with the new client from the + /// appropriate thread, using fields stored in this functor. + void operator()() const BSLS_KEYWORD_OVERRIDE; + }; + /// Context for a dispatcher, with threads and pools struct DispatcherContext { private: @@ -325,11 +350,6 @@ class Dispatcher BSLS_CPP11_FINAL : public mqbi::Dispatcher { /// `processorId`. void flushClients(mqbi::DispatcherClientType::Enum type, int processorId); - /// This method is invoked when a new client of the specified `type` is - /// registered to the dispatcher, from the thread associated with that new - /// client that is mapped to the specified `processorId`. - void onNewClient(mqbi::DispatcherClientType::Enum type, int processorId); - public: // TRAITS BSLMF_NESTED_TRAIT_DECLARATION(Dispatcher, bslma::UsesBslmaAllocator) @@ -408,9 +428,9 @@ class Dispatcher BSLS_CPP11_FINAL : public mqbi::Dispatcher { /// clients of the specified `type`, and invoke the optionally specified /// `doneCallback` (if any) when all the relevant processors are done /// executing the `functor`. - void execute(const mqbi::Dispatcher::ProcessorFunctor& functor, - mqbi::DispatcherClientType::Enum type, - const mqbi::Dispatcher::VoidFunctor& doneCallback = + void execute(const mqbi::Dispatcher::VoidFunctor& functor, + mqbi::DispatcherClientType::Enum type, + const mqbi::Dispatcher::VoidFunctor& doneCallback = mqbi::Dispatcher::VoidFunctor()) BSLS_KEYWORD_OVERRIDE; /// Execute the specified `functor`, using the specified dispatcher `type`, @@ -566,8 +586,7 @@ inline void Dispatcher::execute(const mqbi::Dispatcher::VoidFunctor& functor, mqbi::DispatcherEvent* event = getEvent(client); - (*event).setType(type).setCallback( - mqbi::Dispatcher::voidToProcessorFunctor(functor)); + (*event).setType(type).callback().set(functor); dispatchEvent(event, client); } @@ -582,7 +601,8 @@ inline void Dispatcher::execute(const mqbi::Dispatcher::VoidFunctor& functor, (*event) .setType(mqbi::DispatcherEventType::e_DISPATCHER) - .setCallback(mqbi::Dispatcher::voidToProcessorFunctor(functor)); + .callback() + .set(functor); dispatchEvent(event, client.clientType(), client.processorHandle()); } diff --git a/src/groups/mqb/mqbblp/mqbblp_cluster.cpp b/src/groups/mqb/mqbblp/mqbblp_cluster.cpp index d8915ca4ae..d545f766c5 100644 --- a/src/groups/mqb/mqbblp/mqbblp_cluster.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_cluster.cpp @@ -3403,8 +3403,8 @@ void Cluster::onDispatcherEvent(const mqbi::DispatcherEvent& event) case mqbi::DispatcherEventType::e_CALLBACK: { const mqbi::DispatcherCallbackEvent& realEvent = *event.asCallbackEvent(); - BSLS_ASSERT_SAFE(realEvent.callback()); - realEvent.callback()(dispatcherClientData().processorHandle()); + BSLS_ASSERT_SAFE(!realEvent.callback().empty()); + realEvent.callback()(); } break; // BREAK case mqbi::DispatcherEventType::e_PUT: { const mqbi::DispatcherPutEvent& realEvent = *event.asPutEvent(); diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp b/src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp index 86f69287e7..b8e051705c 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp @@ -1398,8 +1398,8 @@ void ClusterProxy::onDispatcherEvent(const mqbi::DispatcherEvent& event) case mqbi::DispatcherEventType::e_CALLBACK: { const mqbi::DispatcherCallbackEvent* realEvent = event.asCallbackEvent(); - BSLS_ASSERT_SAFE(realEvent->callback()); - realEvent->callback()(dispatcherClientData().processorHandle()); + BSLS_ASSERT_SAFE(!realEvent->callback().empty()); + realEvent->callback()(); } break; case mqbi::DispatcherEventType::e_PUSH: { onPushEvent(*(event.asPushEvent())); diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp index f6d39443cc..cca0887bb2 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp @@ -5790,7 +5790,7 @@ void ClusterQueueHelper::checkUnconfirmedV2Dispatched( // Synchronize with all Queue Dispatcher threads bslmt::Latch latch(1); d_cluster_p->dispatcher()->execute( - mqbi::Dispatcher::ProcessorFunctor(), // empty + mqbi::Dispatcher::VoidFunctor(), // empty mqbi::DispatcherClientType::e_QUEUE, bdlf::BindUtil::bind(&bslmt::Latch::arrive, &latch)); diff --git a/src/groups/mqb/mqbblp/mqbblp_localqueue.cpp b/src/groups/mqb/mqbblp/mqbblp_localqueue.cpp index 3e598733ad..3e85f040be 100644 --- a/src/groups/mqb/mqbblp/mqbblp_localqueue.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_localqueue.cpp @@ -334,9 +334,8 @@ void LocalQueue::onDispatcherEvent(const mqbi::DispatcherEvent& event) case mqbi::DispatcherEventType::e_CALLBACK: { const mqbi::DispatcherCallbackEvent* realEvent = event.asCallbackEvent(); - BSLS_ASSERT_SAFE(realEvent->callback()); - realEvent->callback()( - d_state_p->queue()->dispatcherClientData().processorHandle()); + BSLS_ASSERT_SAFE(!realEvent->callback().empty()); + realEvent->callback()(); } break; // BREAK case mqbi::DispatcherEventType::e_ACK: { BALL_LOG_INFO << "Skipping dispatcher event [" << event << "] " diff --git a/src/groups/mqb/mqbblp/mqbblp_queueengineutil.cpp b/src/groups/mqb/mqbblp/mqbblp_queueengineutil.cpp index 69c6412b47..6c4f335c3d 100644 --- a/src/groups/mqb/mqbblp/mqbblp_queueengineutil.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_queueengineutil.cpp @@ -606,12 +606,12 @@ void QueueEngineUtil_ReleaseHandleProctor::invokeCallback() // represents 'mqbblp::ClusterNodeSession'). d_queueState_p->queue()->dispatcher()->execute( - mqbi::Dispatcher::ProcessorFunctor(), + mqbi::Dispatcher::VoidFunctor(), mqbi::DispatcherClientType::e_SESSION, bdlf::BindUtil::bind(&queueHandleHolderDummy, d_handleSp)); d_queueState_p->queue()->dispatcher()->execute( - mqbi::Dispatcher::ProcessorFunctor(), + mqbi::Dispatcher::VoidFunctor(), mqbi::DispatcherClientType::e_CLUSTER, bdlf::BindUtil::bind(&queueHandleHolderDummy, d_handleSp)); } diff --git a/src/groups/mqb/mqbblp/mqbblp_queuehandle.cpp b/src/groups/mqb/mqbblp/mqbblp_queuehandle.cpp index f3a240ad9b..fc6b2dc733 100644 --- a/src/groups/mqb/mqbblp/mqbblp_queuehandle.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_queuehandle.cpp @@ -766,9 +766,15 @@ void QueueHandle::confirmMessage(const bmqt::MessageGUID& msgGUID, // A more generic approach would be to maintain a queue of CONFIRMs per // queue (outside of the dispatcher) and process it separately (on idle?). - QueueHandle::ConfirmFunctor f(this, msgGUID, downstreamSubQueueId); + mqbi::DispatcherEvent* queueEvent = d_queue_sp->dispatcher()->getEvent( + mqbi::DispatcherClientType::e_QUEUE); - d_queue_sp->dispatcher()->execute(f, d_queue_sp.get()); + (*queueEvent).setType(mqbi::DispatcherEventType::e_CALLBACK); + + new (queueEvent->callback().place()) + QueueHandle::ConfirmFunctor(this, msgGUID, downstreamSubQueueId); + + d_queue_sp->dispatcher()->dispatchEvent(queueEvent, d_queue_sp.get()); } void QueueHandle::rejectMessage(const bmqt::MessageGUID& msgGUID, diff --git a/src/groups/mqb/mqbblp/mqbblp_queuehandle.h b/src/groups/mqb/mqbblp/mqbblp_queuehandle.h index 0514fe7cae..dcec9f5362 100644 --- a/src/groups/mqb/mqbblp/mqbblp_queuehandle.h +++ b/src/groups/mqb/mqbblp/mqbblp_queuehandle.h @@ -126,7 +126,7 @@ class QueueHandle : public mqbi::QueueHandle { /// The purpose is to avoid memory allocation by bdlf::BindUtil::bind /// when dispatching CONFIRM from Cluster to Queue. - class ConfirmFunctor { + class ConfirmFunctor : public bmqu::ManagedCallback::CallbackFunctor { private: // PRIVATE DATA QueueHandle* d_owner_p; @@ -135,11 +135,11 @@ class QueueHandle : public mqbi::QueueHandle { public: // CREATORS - ConfirmFunctor(QueueHandle* owner_p, - bmqt::MessageGUID guid, - unsigned int downstreamSubQueueId); + explicit ConfirmFunctor(QueueHandle* owner_p, + bmqt::MessageGUID guid, + unsigned int downstreamSubQueueId); - void operator()(); + void operator()() const BSLS_KEYWORD_OVERRIDE; }; public: @@ -670,7 +670,7 @@ inline QueueHandle::ConfirmFunctor::ConfirmFunctor( // NOTHING } -inline void QueueHandle::ConfirmFunctor::operator()() +inline void QueueHandle::ConfirmFunctor::operator()() const { d_owner_p->confirmMessageDispatched(d_guid, d_downstreamSubQueueId); } diff --git a/src/groups/mqb/mqbblp/mqbblp_remotequeue.cpp b/src/groups/mqb/mqbblp/mqbblp_remotequeue.cpp index f0f92a40d7..9dacb084f0 100644 --- a/src/groups/mqb/mqbblp/mqbblp_remotequeue.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_remotequeue.cpp @@ -784,9 +784,8 @@ void RemoteQueue::onDispatcherEvent(const mqbi::DispatcherEvent& event) case mqbi::DispatcherEventType::e_CALLBACK: { const mqbi::DispatcherCallbackEvent* realEvent = event.asCallbackEvent(); - BSLS_ASSERT_SAFE(realEvent->callback()); - realEvent->callback()( - d_state_p->queue()->dispatcherClientData().processorHandle()); + BSLS_ASSERT_SAFE(!realEvent->callback().empty()); + realEvent->callback()(); } break; case mqbi::DispatcherEventType::e_PUSH: { const mqbi::DispatcherPushEvent* realEvent = event.asPushEvent(); diff --git a/src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp b/src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp index f6174cf601..a5c59f9707 100644 --- a/src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp @@ -1059,7 +1059,6 @@ void StorageManager::registerQueue(const bmqt::Uri& uri, &d_storagesLock, d_fileStores[partitionId].get(), &d_allocators, - processorForPartition(partitionId), uri, queueKey, d_clusterData_p->identity().description(), @@ -1082,9 +1081,9 @@ void StorageManager::unregisterQueue(const bmqt::Uri& uri, int partitionId) (*queueEvent) .setType(mqbi::DispatcherEventType::e_DISPATCHER) - .setCallback( + .callback() + .set( bdlf::BindUtil::bind(&mqbc::StorageUtil::unregisterQueueDispatched, - bdlf::PlaceHolders::_1, // processor d_fileStores[partitionId].get(), &d_storages[partitionId], &d_storagesLock, @@ -1148,7 +1147,8 @@ void StorageManager::registerQueueReplica(int partitionId, (*queueEvent) .setType(mqbi::DispatcherEventType::e_DISPATCHER) - .setCallback(bdlf::BindUtil::bind( + .callback() + .set(bdlf::BindUtil::bind( &mqbc::StorageUtil::registerQueueReplicaDispatched, static_cast(0), &d_storages[partitionId], @@ -1192,7 +1192,8 @@ void StorageManager::unregisterQueueReplica(int partitionId, (*queueEvent) .setType(mqbi::DispatcherEventType::e_DISPATCHER) - .setCallback(bdlf::BindUtil::bind( + .callback() + .set(bdlf::BindUtil::bind( &mqbc::StorageUtil::unregisterQueueReplicaDispatched, static_cast(0), &d_storages[partitionId], @@ -1236,7 +1237,8 @@ void StorageManager::updateQueueReplica(int partitionId, (*queueEvent) .setType(mqbi::DispatcherEventType::e_DISPATCHER) - .setCallback(bdlf::BindUtil::bind( + .callback() + .set(bdlf::BindUtil::bind( &mqbc::StorageUtil::updateQueueReplicaDispatched, static_cast(0), &d_storages[partitionId], @@ -1274,15 +1276,14 @@ void StorageManager::setQueue(mqbi::Queue* queue, (*queueEvent) .setType(mqbi::DispatcherEventType::e_DISPATCHER) - .setCallback( - bdlf::BindUtil::bind(&mqbc::StorageUtil::setQueueDispatched, - &d_storages[partitionId], - &d_storagesLock, - bdlf::PlaceHolders::_1, // processor - d_clusterData_p->identity().description(), - partitionId, - uri, - queue)); + .callback() + .set(bdlf::BindUtil::bind(&mqbc::StorageUtil::setQueueDispatched, + &d_storages[partitionId], + &d_storagesLock, + d_clusterData_p->identity().description(), + partitionId, + uri, + queue)); d_fileStores[partitionId]->dispatchEvent(queueEvent); ; @@ -1301,7 +1302,6 @@ void StorageManager::setQueueRaw(mqbi::Queue* queue, mqbc::StorageUtil::setQueueDispatched( &d_storages[partitionId], &d_storagesLock, - processorForPartition(partitionId), d_clusterData_p->identity().description(), partitionId, uri, diff --git a/src/groups/mqb/mqbc/mqbc_storagemanager.cpp b/src/groups/mqb/mqbc/mqbc_storagemanager.cpp index 7be6c15fcd..1d79b603d8 100644 --- a/src/groups/mqb/mqbc/mqbc_storagemanager.cpp +++ b/src/groups/mqb/mqbc/mqbc_storagemanager.cpp @@ -3713,7 +3713,6 @@ void StorageManager::registerQueue(const bmqt::Uri& uri, &d_storagesLock, d_fileStores[partitionId].get(), &d_allocators, - processorForPartition(partitionId), uri, queueKey, d_clusterData_p->identity().description(), @@ -3738,7 +3737,6 @@ void StorageManager::unregisterQueue(const bmqt::Uri& uri, int partitionId) .setType(mqbi::DispatcherEventType::e_DISPATCHER) .setCallback( bdlf::BindUtil::bind(&StorageUtil::unregisterQueueDispatched, - bdlf::PlaceHolders::_1, // processor d_fileStores[partitionId].get(), &d_storages[partitionId], &d_storagesLock, @@ -3906,7 +3904,6 @@ void StorageManager::setQueue(mqbi::Queue* queue, bdlf::BindUtil::bind(&StorageUtil::setQueueDispatched, &d_storages[partitionId], &d_storagesLock, - bdlf::PlaceHolders::_1, // processor d_clusterData_p->identity().description(), partitionId, uri, @@ -3927,7 +3924,6 @@ void StorageManager::setQueueRaw(mqbi::Queue* queue, StorageUtil::setQueueDispatched(&d_storages[partitionId], &d_storagesLock, - processorForPartition(partitionId), d_clusterData_p->identity().description(), partitionId, uri, diff --git a/src/groups/mqb/mqbc/mqbc_storageutil.cpp b/src/groups/mqb/mqbc/mqbc_storageutil.cpp index ad639baf50..3594ac09d6 100644 --- a/src/groups/mqb/mqbc/mqbc_storageutil.cpp +++ b/src/groups/mqb/mqbc/mqbc_storageutil.cpp @@ -198,12 +198,11 @@ bool StorageUtil::loadUpdatedAppInfos(AppInfos* addedAppInfos, } void StorageUtil::registerQueueDispatched( - BSLS_ANNOTATION_UNUSED const mqbi::Dispatcher::ProcessorHandle& processor, - mqbs::FileStore* fs, - mqbs::ReplicatedStorage* storage, - const bsl::string& clusterDescription, - int partitionId, - const AppInfos& appIdKeyPairs) + mqbs::FileStore* fs, + mqbs::ReplicatedStorage* storage, + const bsl::string& clusterDescription, + int partitionId, + const AppInfos& appIdKeyPairs) { // executed by *QUEUE_DISPATCHER* thread with the specified 'partitionId' @@ -258,14 +257,13 @@ void StorageUtil::registerQueueDispatched( } void StorageUtil::updateQueuePrimaryDispatched( - BSLS_ANNOTATION_UNUSED const mqbi::Dispatcher::ProcessorHandle& processor, - mqbs::ReplicatedStorage* storage, - bslmt::Mutex* storagesLock, - mqbs::FileStore* fs, - const bsl::string& clusterDescription, - int partitionId, - const AppInfos& appIdKeyPairs, - bool isFanout) + mqbs::ReplicatedStorage* storage, + bslmt::Mutex* storagesLock, + mqbs::FileStore* fs, + const bsl::string& clusterDescription, + int partitionId, + const AppInfos& appIdKeyPairs, + bool isFanout) { // executed by *QUEUE_DISPATCHER* thread with the specified 'partitionId' @@ -2300,20 +2298,18 @@ StorageUtil::generateAppKey(bsl::unordered_set* appKeys, return appKey; } -void StorageUtil::registerQueue( - const mqbi::Cluster* cluster, - mqbi::Dispatcher* dispatcher, - StorageSpMap* storageMap, - bslmt::Mutex* storagesLock, - mqbs::FileStore* fs, - bmqma::CountingAllocatorStore* allocators, - const mqbi::Dispatcher::ProcessorHandle& processor, - const bmqt::Uri& uri, - const mqbu::StorageKey& queueKey, - const bsl::string& clusterDescription, - int partitionId, - const AppInfos& appIdKeyPairs, - mqbi::Domain* domain) +void StorageUtil::registerQueue(const mqbi::Cluster* cluster, + mqbi::Dispatcher* dispatcher, + StorageSpMap* storageMap, + bslmt::Mutex* storagesLock, + mqbs::FileStore* fs, + bmqma::CountingAllocatorStore* allocators, + const bmqt::Uri& uri, + const mqbu::StorageKey& queueKey, + const bsl::string& clusterDescription, + int partitionId, + const AppInfos& appIdKeyPairs, + mqbi::Domain* domain) { // executed by the *CLUSTER DISPATCHER* thread @@ -2331,6 +2327,8 @@ void StorageUtil::registerQueue( cluster->clusterConfig()->partitionConfig().numPartitions()); BSLS_ASSERT_SAFE(domain); + const int processor = fs->processorId(); + // StorageMgr is either aware of the queue (the 'uri') or it isn't. If it // is already aware, either this queue was registered earlier or it was // seen during recovery (if it's a file-backed storage), which means that @@ -2425,7 +2423,6 @@ void StorageUtil::registerQueue( .setType(mqbi::DispatcherEventType::e_DISPATCHER) .setCallback(bdlf::BindUtil::bind( updateQueuePrimaryDispatched, - bdlf::PlaceHolders::_1, // processor storageSp.get(), storagesLock, fs, @@ -2565,7 +2562,6 @@ void StorageUtil::registerQueue( (*queueEvent) .setType(mqbi::DispatcherEventType::e_DISPATCHER) .setCallback(bdlf::BindUtil::bind(®isterQueueDispatched, - bdlf::PlaceHolders::_1, // processor fs, storageSp.get(), clusterDescription, @@ -2575,15 +2571,13 @@ void StorageUtil::registerQueue( fs->dispatchEvent(queueEvent); } -void StorageUtil::unregisterQueueDispatched( - BSLS_ANNOTATION_UNUSED const mqbi::Dispatcher::ProcessorHandle& processor, - mqbs::FileStore* fs, - StorageSpMap* storageMap, - bslmt::Mutex* storagesLock, - const ClusterData* clusterData, - int partitionId, - const PartitionInfo& pinfo, - const bmqt::Uri& uri) +void StorageUtil::unregisterQueueDispatched(mqbs::FileStore* fs, + StorageSpMap* storageMap, + bslmt::Mutex* storagesLock, + const ClusterData* clusterData, + int partitionId, + const PartitionInfo& pinfo, + const bmqt::Uri& uri) { // executed by *QUEUE_DISPATCHER* thread with the specified 'partitionId' @@ -2947,7 +2941,7 @@ void StorageUtil::unregisterQueueReplicaDispatched( const mqbu::StorageKey& appKey, bool isCSLMode) { - // executed by *QUEUE_DISPATCHER* thread with the specified 'processorId' + // executed by *QUEUE_DISPATCHER* thread associated with `partitionId` // PRECONDITIONS BSLS_ASSERT_SAFE(fs); @@ -3173,14 +3167,12 @@ void StorageUtil::updateQueueReplicaDispatched( } } -void StorageUtil::setQueueDispatched( - StorageSpMap* storageMap, - bslmt::Mutex* storagesLock, - BSLS_ANNOTATION_UNUSED const mqbi::Dispatcher::ProcessorHandle& processor, - const bsl::string& clusterDescription, - int partitionId, - const bmqt::Uri& uri, - mqbi::Queue* queue) +void StorageUtil::setQueueDispatched(StorageSpMap* storageMap, + bslmt::Mutex* storagesLock, + const bsl::string& clusterDescription, + int partitionId, + const bmqt::Uri& uri, + mqbi::Queue* queue) { // executed by *QUEUE_DISPATCHER* thread with the specified 'partitionId' diff --git a/src/groups/mqb/mqbc/mqbc_storageutil.h b/src/groups/mqb/mqbc/mqbc_storageutil.h index 8ba3d09fe4..20465ab4f9 100644 --- a/src/groups/mqb/mqbc/mqbc_storageutil.h +++ b/src/groups/mqb/mqbc/mqbc_storageutil.h @@ -197,24 +197,21 @@ struct StorageUtil { const AppInfos& newAppInfos); /// THREAD: Executed by the Queue's dispatcher thread. - static void - registerQueueDispatched(const mqbi::Dispatcher::ProcessorHandle& processor, - mqbs::FileStore* fs, - mqbs::ReplicatedStorage* storage, - const bsl::string& clusterDescription, - int partitionId, - const AppInfos& appIdKeyPairs); + static void registerQueueDispatched(mqbs::FileStore* fs, + mqbs::ReplicatedStorage* storage, + const bsl::string& clusterDescription, + int partitionId, + const AppInfos& appIdKeyPairs); /// THREAD: This method is called from the Queue's dispatcher thread. - static void updateQueuePrimaryDispatched( - const mqbi::Dispatcher::ProcessorHandle& processor, - mqbs::ReplicatedStorage* storage, - bslmt::Mutex* storagesLock, - mqbs::FileStore* fs, - const bsl::string& clusterDescription, - int partitionId, - const AppInfos& appIdKeyPairs, - bool isFanout); + static void + updateQueuePrimaryDispatched(mqbs::ReplicatedStorage* storage, + bslmt::Mutex* storagesLock, + mqbs::FileStore* fs, + const bsl::string& clusterDescription, + int partitionId, + const AppInfos& appIdKeyPairs, + bool isFanout); /// StorageManager's storages lock must be locked before calling this /// method. @@ -626,31 +623,27 @@ struct StorageUtil { /// associated queue storage created. /// /// THREAD: Executed by the Client's dispatcher thread. - static void - registerQueue(const mqbi::Cluster* cluster, - mqbi::Dispatcher* dispatcher, - StorageSpMap* storageMap, - bslmt::Mutex* storagesLock, - mqbs::FileStore* fs, - bmqma::CountingAllocatorStore* allocators, - const mqbi::Dispatcher::ProcessorHandle& processor, - const bmqt::Uri& uri, - const mqbu::StorageKey& queueKey, - const bsl::string& clusterDescription, - int partitionId, - const AppInfos& appIdKeyPairs, - mqbi::Domain* domain); + static void registerQueue(const mqbi::Cluster* cluster, + mqbi::Dispatcher* dispatcher, + StorageSpMap* storageMap, + bslmt::Mutex* storagesLock, + mqbs::FileStore* fs, + bmqma::CountingAllocatorStore* allocators, + const bmqt::Uri& uri, + const mqbu::StorageKey& queueKey, + const bsl::string& clusterDescription, + int partitionId, + const AppInfos& appIdKeyPairs, + mqbi::Domain* domain); /// THREAD: Executed by the Queue's dispatcher thread. - static void unregisterQueueDispatched( - const mqbi::Dispatcher::ProcessorHandle& processor, - mqbs::FileStore* fs, - StorageSpMap* storageMap, - bslmt::Mutex* storagesLock, - const ClusterData* clusterData, - int partitionId, - const PartitionInfo& pinfo, - const bmqt::Uri& uri); + static void unregisterQueueDispatched(mqbs::FileStore* fs, + StorageSpMap* storageMap, + bslmt::Mutex* storagesLock, + const ClusterData* clusterData, + int partitionId, + const PartitionInfo& pinfo, + const bmqt::Uri& uri); /// Configure the fanout queue having specified `uri` and `queueKey`, /// assigned to the specified `partitionId` to have the specified @@ -712,14 +705,12 @@ struct StorageUtil { /// Executed by queue-dispatcher thread with the specified /// `processorId`. - static void - setQueueDispatched(StorageSpMap* storageMap, - bslmt::Mutex* storagesLock, - const mqbi::Dispatcher::ProcessorHandle& processor, - const bsl::string& clusterDescription, - int partitionId, - const bmqt::Uri& uri, - mqbi::Queue* queue); + static void setQueueDispatched(StorageSpMap* storageMap, + bslmt::Mutex* storagesLock, + const bsl::string& clusterDescription, + int partitionId, + const bmqt::Uri& uri, + mqbi::Queue* queue); static int makeStorage(bsl::ostream& errorDescription, bsl::shared_ptr* out, diff --git a/src/groups/mqb/mqbi/mqbi_dispatcher.cpp b/src/groups/mqb/mqbi/mqbi_dispatcher.cpp index 3f8ba170d8..66340c5c79 100644 --- a/src/groups/mqb/mqbi/mqbi_dispatcher.cpp +++ b/src/groups/mqb/mqbi/mqbi_dispatcher.cpp @@ -168,13 +168,6 @@ bool DispatcherEventType::fromAscii(DispatcherEventType::Enum* out, // class Dispatcher // ---------------- -// CLASS METHODS -Dispatcher::ProcessorFunctor -Dispatcher::voidToProcessorFunctor(const Dispatcher::VoidFunctor& functor) -{ - return bdlf::BindUtil::bind(functor); -} - // CREATORS Dispatcher::~Dispatcher() { @@ -327,7 +320,7 @@ bsl::ostream& DispatcherEvent::print(bsl::ostream& stream, } break; case DispatcherEventType::e_DISPATCHER: { printer.printAttribute("hasFinalizeCallback", - (finalizeCallback() ? "yes" : "no")); + (finalizeCallback().empty() ? "no" : "yes")); } break; case DispatcherEventType::e_CALLBACK: { // Nothing more to print diff --git a/src/groups/mqb/mqbi/mqbi_dispatcher.h b/src/groups/mqb/mqbi/mqbi_dispatcher.h index f09ea521c1..336d70a616 100644 --- a/src/groups/mqb/mqbi/mqbi_dispatcher.h +++ b/src/groups/mqb/mqbi/mqbi_dispatcher.h @@ -116,6 +116,7 @@ #include #include +#include // BDE #include @@ -326,23 +327,13 @@ class Dispatcher { typedef int ProcessorHandle; /// Signature of a `void` functor method. - typedef bsl::function VoidFunctor; - - /// Signature of a functor method with one parameter, the processor - /// handle on which it is being executed. - typedef bsl::function ProcessorFunctor; + typedef bmqu::ManagedCallback::VoidFunctor VoidFunctor; // PUBLIC CLASS DATA /// Value of an invalid processor handle. static const ProcessorHandle k_INVALID_PROCESSOR_HANDLE = -1; - // CLASS METHODS - - /// Convenient utility to convert the specified `functor` from a - /// `VoidFunctor` into a `ProcessorFunctor` type. - static ProcessorFunctor voidToProcessorFunctor(const VoidFunctor& functor); - public: // CREATORS @@ -425,7 +416,7 @@ class Dispatcher { /// clients of the specified `type`, and invoke the specified /// `doneCallback` (if any) when all the relevant processors are done /// executing the `functor`. - virtual void execute(const ProcessorFunctor& functor, + virtual void execute(const VoidFunctor& functor, DispatcherClientType::Enum type, const VoidFunctor& doneCallback = VoidFunctor()) = 0; @@ -496,11 +487,11 @@ class DispatcherDispatcherEvent { /// Return a reference not offering modifiable access to the callback /// associated to this event. - virtual const Dispatcher::ProcessorFunctor& callback() const = 0; + virtual const bmqu::ManagedCallback& callback() const = 0; /// Return a reference not offering modifiable access to the finalize /// callback, if any, associated to this event. - virtual const Dispatcher::VoidFunctor& finalizeCallback() const = 0; + virtual const bmqu::ManagedCallback& finalizeCallback() const = 0; }; // ============================= @@ -520,7 +511,7 @@ class DispatcherCallbackEvent { /// Return a reference not offering modifiable access to the callback /// associated to this event. - virtual const Dispatcher::ProcessorFunctor& callback() const = 0; + virtual const bmqu::ManagedCallback& callback() const = 0; }; // =================================== @@ -934,9 +925,6 @@ class DispatcherEvent : public DispatcherDispatcherEvent, // DispatcherEvent view interfaces // for more specific information. - Dispatcher::ProcessorFunctor d_callback; - // Callback embedded in this event. - mqbnet::ClusterNode* d_clusterNode_p; // 'ClusterNode' associated to this // event. @@ -950,15 +938,6 @@ class DispatcherEvent : public DispatcherDispatcherEvent, bmqp_ctrlmsg::ControlMessage d_controlMessage; // ControlMessage in this event.. - Dispatcher::VoidFunctor d_finalizeCallback; - // Callback embedded in this event. - // This callback is called when the - // 'Dispatcher::execute' method is - // used to enqueue an event to - // multiple processors, and will be - // called when the last processor - // finished processing it. - bmqt::MessageGUID d_guid; // GUID of the message in this event. @@ -1005,6 +984,15 @@ class DispatcherEvent : public DispatcherDispatcherEvent, bsl::shared_ptr d_state; + /// In-place storage for the callback in this event. + bmqu::ManagedCallback d_callback; + + /// Callback embedded in this event. This callback is called when the + /// 'Dispatcher::execute' method is used to enqueue an event to multiple + /// processors, and will be called when the last processor finished + /// processing it. + bmqu::ManagedCallback d_finalizeCallback; + public: // TRAITS BSLMF_NESTED_TRAIT_DECLARATION(DispatcherEvent, bslma::UsesBslmaAllocator) @@ -1031,14 +1019,14 @@ class DispatcherEvent : public DispatcherDispatcherEvent, const bmqp::AckMessage& ackMessage() const BSLS_KEYWORD_OVERRIDE; const bsl::shared_ptr& blob() const BSLS_KEYWORD_OVERRIDE; const bsl::shared_ptr& options() const BSLS_KEYWORD_OVERRIDE; - const Dispatcher::ProcessorFunctor& callback() const BSLS_KEYWORD_OVERRIDE; + const bmqu::ManagedCallback& callback() const BSLS_KEYWORD_OVERRIDE; + const bmqu::ManagedCallback& + finalizeCallback() const BSLS_KEYWORD_OVERRIDE; mqbnet::ClusterNode* clusterNode() const BSLS_KEYWORD_OVERRIDE; const bmqp::ConfirmMessage& confirmMessage() const BSLS_KEYWORD_OVERRIDE; const bmqp::RejectMessage& rejectMessage() const BSLS_KEYWORD_OVERRIDE; const bmqp_ctrlmsg::ControlMessage& - controlMessage() const BSLS_KEYWORD_OVERRIDE; - const Dispatcher::VoidFunctor& - finalizeCallback() const BSLS_KEYWORD_OVERRIDE; + controlMessage() const BSLS_KEYWORD_OVERRIDE; const bmqt::MessageGUID& guid() const BSLS_KEYWORD_OVERRIDE; bool isRelay() const BSLS_KEYWORD_OVERRIDE; int partitionId() const BSLS_KEYWORD_OVERRIDE; @@ -1062,13 +1050,20 @@ class DispatcherEvent : public DispatcherDispatcherEvent, public: // MANIPULATORS + bmqu::ManagedCallback& callback(); + bmqu::ManagedCallback& finalizeCallback(); + + DispatcherEvent& + setCallback(bslmf::MovableRef value); + DispatcherEvent& + setFinalizeCallback(bslmf::MovableRef value); + DispatcherEvent& setType(DispatcherEventType::Enum value); DispatcherEvent& setSource(DispatcherClient* value); DispatcherEvent& setDestination(DispatcherClient* value); DispatcherEvent& setAckMessage(const bmqp::AckMessage& value); DispatcherEvent& setBlob(const bsl::shared_ptr& value); DispatcherEvent& setOptions(const bsl::shared_ptr& value); - DispatcherEvent& setCallback(const Dispatcher::ProcessorFunctor& value); DispatcherEvent& setClusterNode(mqbnet::ClusterNode* value); DispatcherEvent& setConfirmMessage(const bmqp::ConfirmMessage& value); DispatcherEvent& setRejectMessage(const bmqp::RejectMessage& value); @@ -1292,12 +1287,10 @@ inline DispatcherEvent::DispatcherEvent(bslma::Allocator* allocator) , d_ackMessage() , d_blob_sp(0, allocator) , d_options_sp(0, allocator) -, d_callback(bsl::allocator_arg, allocator) , d_clusterNode_p(0) , d_confirmMessage() , d_rejectMessage() , d_controlMessage(allocator) -, d_finalizeCallback(bsl::allocator_arg, allocator) , d_guid(bmqt::MessageGUID()) , d_isRelay(false) , d_partitionId(-1) @@ -1310,6 +1303,8 @@ inline DispatcherEvent::DispatcherEvent(bslma::Allocator* allocator) , d_compressionAlgorithmType(bmqt::CompressionAlgorithmType::e_NONE) , d_isOutOfOrder(false) , d_genCount(0) +, d_callback(allocator) +, d_finalizeCallback(allocator) { // NOTHING } @@ -1329,11 +1324,26 @@ inline const bsl::shared_ptr& DispatcherEvent::options() const return d_options_sp; } -inline const Dispatcher::ProcessorFunctor& DispatcherEvent::callback() const +inline const bmqu::ManagedCallback& DispatcherEvent::callback() const { return d_callback; } +inline bmqu::ManagedCallback& DispatcherEvent::callback() +{ + return d_callback; +} + +inline const bmqu::ManagedCallback& DispatcherEvent::finalizeCallback() const +{ + return d_finalizeCallback; +} + +inline bmqu::ManagedCallback& DispatcherEvent::finalizeCallback() +{ + return d_finalizeCallback; +} + inline mqbnet::ClusterNode* DispatcherEvent::clusterNode() const { return d_clusterNode_p; @@ -1355,11 +1365,6 @@ DispatcherEvent::controlMessage() const return d_controlMessage; } -inline const Dispatcher::VoidFunctor& DispatcherEvent::finalizeCallback() const -{ - return d_finalizeCallback; -} - inline const bmqt::MessageGUID& DispatcherEvent::guid() const { return d_guid; @@ -1470,9 +1475,16 @@ DispatcherEvent::setOptions(const bsl::shared_ptr& value) } inline DispatcherEvent& -DispatcherEvent::setCallback(const Dispatcher::ProcessorFunctor& value) +DispatcherEvent::setCallback(bslmf::MovableRef value) { - d_callback = value; + d_callback.set(value); + return *this; +} + +inline DispatcherEvent& DispatcherEvent::setFinalizeCallback( + bslmf::MovableRef value) +{ + d_finalizeCallback.set(value); return *this; } @@ -1504,13 +1516,6 @@ DispatcherEvent::setControlMessage(const bmqp_ctrlmsg::ControlMessage& value) return *this; } -inline DispatcherEvent& -DispatcherEvent::setFinalizeCallback(const Dispatcher::VoidFunctor& value) -{ - d_finalizeCallback = value; - return *this; -} - inline DispatcherEvent& DispatcherEvent::setGuid(const bmqt::MessageGUID& value) { @@ -1605,11 +1610,11 @@ inline void DispatcherEvent::reset() d_ackMessage = bmqp::AckMessage(); d_blob_sp.reset(); d_options_sp.reset(); - d_callback = bsl::nullptr_t(); + d_callback.reset(); + d_finalizeCallback.reset(); d_clusterNode_p = 0; d_confirmMessage = bmqp::ConfirmMessage(); d_rejectMessage = bmqp::RejectMessage(); - d_finalizeCallback = bsl::nullptr_t(); d_guid = bmqt::MessageGUID(); d_isRelay = false; d_putHeader = bmqp::PutHeader(); diff --git a/src/groups/mqb/mqbmock/mqbmock_dispatcher.cpp b/src/groups/mqb/mqbmock/mqbmock_dispatcher.cpp index 4738d11351..6971f0087e 100644 --- a/src/groups/mqb/mqbmock/mqbmock_dispatcher.cpp +++ b/src/groups/mqb/mqbmock/mqbmock_dispatcher.cpp @@ -108,13 +108,12 @@ void Dispatcher::execute( } void Dispatcher::execute( - const mqbi::Dispatcher::ProcessorFunctor& functor, + const mqbi::Dispatcher::VoidFunctor& functor, BSLS_ANNOTATION_UNUSED mqbi::DispatcherClientType::Enum type, const mqbi::Dispatcher::VoidFunctor& doneCallback) { if (functor) { - const ProcessorHandle dummy = Dispatcher::k_INVALID_PROCESSOR_HANDLE; - functor(dummy); + functor(); } if (doneCallback) { @@ -243,7 +242,8 @@ mqbi::DispatcherClientData& DispatcherClient::dispatcherClientData() void DispatcherClient::onDispatcherEvent(const mqbi::DispatcherEvent& event) { if (event.type() == mqbi::DispatcherEventType::e_CALLBACK) { - event.asCallbackEvent()->callback()(0); + BSLS_ASSERT_SAFE(!event.asCallbackEvent()->callback().empty()); + event.asCallbackEvent()->callback()(); } } diff --git a/src/groups/mqb/mqbmock/mqbmock_dispatcher.h b/src/groups/mqb/mqbmock/mqbmock_dispatcher.h index da208f680a..e4aa0dc62b 100644 --- a/src/groups/mqb/mqbmock/mqbmock_dispatcher.h +++ b/src/groups/mqb/mqbmock/mqbmock_dispatcher.h @@ -176,9 +176,9 @@ class Dispatcher : public mqbi::Dispatcher { /// clients of the specified `type`, and invoke the specified /// `doneCallback` (if any) when all the relevant processors are done /// executing the `functor`. - void execute(const mqbi::Dispatcher::ProcessorFunctor& functor, - mqbi::DispatcherClientType::Enum type, - const mqbi::Dispatcher::VoidFunctor& doneCallback) + void execute(const mqbi::Dispatcher::VoidFunctor& functor, + mqbi::DispatcherClientType::Enum type, + const mqbi::Dispatcher::VoidFunctor& doneCallback) BSLS_KEYWORD_OVERRIDE; void synchronize(mqbi::DispatcherClient* client) BSLS_KEYWORD_OVERRIDE; diff --git a/src/groups/mqb/mqbs/mqbs_filestore.cpp b/src/groups/mqb/mqbs/mqbs_filestore.cpp index fe429263bd..c47ec89a0e 100644 --- a/src/groups/mqb/mqbs/mqbs_filestore.cpp +++ b/src/groups/mqb/mqbs/mqbs_filestore.cpp @@ -7240,8 +7240,8 @@ void FileStore::onDispatcherEvent(const mqbi::DispatcherEvent& event) case mqbi::DispatcherEventType::e_CALLBACK: { const mqbi::DispatcherCallbackEvent* realEvent = event.asCallbackEvent(); - BSLS_ASSERT_SAFE(realEvent->callback()); - realEvent->callback()(dispatcherClientData().processorHandle()); + BSLS_ASSERT_SAFE(!realEvent->callback().empty()); + realEvent->callback()(); } break; // BREAK case mqbi::DispatcherEventType::e_UNDEFINED: case mqbi::DispatcherEventType::e_PUT: