-
Notifications
You must be signed in to change notification settings - Fork 39
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Squashed 'externals/coda-oss/' changes from 52b45ab..96cb50c
96cb50c Merge pull request #236 from mdaus/fix_work_sharing_runnable a5505c6 Missed incrementing threadNum ffb0da0 Fixed bug that occurred when the ThreadPlanner chose fewer threads than were provided 97aa2a6 The WithCopies() version needs to call the version that takes in a vector, not itself d64d9d1 Merge pull request #235 from mdaus/range-num-shared-elements 7005250 add unittest for types::Range 057a359 add missing algorithms header 141e4d8 add getNumSharedElements method to types::Range 3da4060 Merge pull request #234 from mdaus/updateSwig 96d0968 Update SWIG-generated code to 3.0.12 935fb41 Merge pull request #233 from mdaus/outputByteSwap 3664e35 Review fixes 8aa5dfd Add byteswap overload if you don't want to do it in-place b760b72 Merge pull request #232 from mdaus/range-empty-method 48b779d Range type empty method ec2df5d Merge pull request #231 from mdaus/mt-balanced-runnables ee6e397 remove fill call ee025cb remove algorithm header 9f6e237 update balanced runnable unit tests according to pr comments 3ec8e60 update runnable classes/interfaces according to pr comments 41a75ce repeated word bd564ae doxygen 0349424 make sure single threaded runnables work correctly 0a63e3f SharedWork -> WorkSharing cc2bbb9 clean up cbd11c9 unit tests a35739b add work sharing balanced runnable and interfaces 1067d0f Add balanced runnable and interfaces to mt 10e9a58 add range type ca55fe2 Merge pull request #230 from mdaus/add_const_overloadings ab83181 Adding const overloadings fb46a95 Merge pull request #229 from mdaus/exceptionFix 8087809 Fix exception message for unrecognizable data type git-subtree-dir: externals/coda-oss git-subtree-split: 96cb50c2bf8e1967a1f907912c496eaef73f677f
- Loading branch information
1 parent
0ada785
commit 27aad72
Showing
38 changed files
with
2,141 additions
and
972 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,202 @@ | ||
/* ========================================================================= | ||
* This file is part of mt-c++ | ||
* ========================================================================= | ||
* | ||
* (C) Copyright 2004 - 2017, MDA Information Systems LLC | ||
* | ||
* mt-c++ is free software; you can redistribute it and/or modify | ||
* it under the terms of the GNU Lesser General Public License as published by | ||
* the Free Software Foundation; either version 3 of the License, or | ||
* (at your option) any later version. | ||
* | ||
* This program is distributed in the hope that it will be useful, | ||
* but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
* GNU Lesser General Public License for more details. | ||
* | ||
* You should have received a copy of the GNU Lesser General Public | ||
* License along with this program; If not, | ||
* see <http://www.gnu.org/licenses/>. | ||
* | ||
*/ | ||
#ifndef __MT_BALANCED_RUNNABLE_1D_H__ | ||
#define __MT_BALANCED_RUNNABLE_1D_H__ | ||
|
||
#include <vector> | ||
#include <sstream> | ||
|
||
#include <sys/Conf.h> | ||
#include <sys/Runnable.h> | ||
#include <except/Exception.h> | ||
#include <mt/ThreadPlanner.h> | ||
#include <mt/ThreadGroup.h> | ||
|
||
namespace mt | ||
{ | ||
/*! | ||
* \class BalancedRunnable1D | ||
* \tparam OpT The type of functor that will be used to process elements | ||
* | ||
* Given a reference to an atomic counter, this runnable will | ||
* atomically get and then increment an element, passing the fetched element | ||
* to the provided functor for processing. Each runnable will operate over | ||
* the full range of elements. | ||
* | ||
* This runnable is useful in cases where work needs to be | ||
* done across a range of elements, but when dividing these elements | ||
* across threads leads to balancing issues i.e certain threads | ||
* terminate earlier than other threads. | ||
* | ||
*/ | ||
template <typename OpT> | ||
class BalancedRunnable1D : public sys::Runnable | ||
{ | ||
public: | ||
|
||
/*! | ||
* Constructor | ||
* | ||
* \param[in,out] atomicCounter Atomic counter all threads will use to | ||
* fetch elements to process | ||
* | ||
* \param numElements The global number of elements to process | ||
* | ||
* \param op Functor to use | ||
* | ||
*/ | ||
BalancedRunnable1D(size_t numElements, | ||
sys::AtomicCounter& atomicCounter, | ||
const OpT& op) : | ||
mNumElements(numElements), | ||
mCounter(atomicCounter), | ||
mOp(op) | ||
{ | ||
} | ||
|
||
virtual void run() | ||
{ | ||
while (true) | ||
{ | ||
const size_t element = mCounter.getThenIncrement(); | ||
if (element < mNumElements) | ||
{ | ||
mOp(element); | ||
} | ||
else | ||
{ | ||
break; | ||
} | ||
} | ||
} | ||
|
||
private: | ||
const size_t mNumElements; | ||
sys::AtomicCounter& mCounter; | ||
const OpT& mOp; | ||
}; | ||
|
||
/*! | ||
* This method creates an atomic counter that will be shared across threads | ||
* and used to fetch elements within a global range. Each thread will | ||
* process fetched elements using the provided functor until all elements | ||
* have been processed. | ||
* | ||
* Rather than divide the range of elements across threads, each thread | ||
* will remain active until every element within the global range has been | ||
* processed. This behavior prevents balancing issues from occurring by | ||
* avoiding cases where certain threads terminate earlier than others due | ||
* to having a range of elements that is less expensive to process. Instead, | ||
* all threads will participate equally in grabbing any available work | ||
* across the global range. | ||
* | ||
* \tparam OpT The type of functor that will be used to process elements | ||
* | ||
* \param numElements Number of elements of work | ||
* \param numThreads Number of threads | ||
* \param op Functor to use | ||
*/ | ||
template <typename OpT> | ||
void runBalanced1D(size_t numElements, | ||
size_t numThreads, | ||
const OpT& op) | ||
{ | ||
sys::AtomicCounter counter(0); | ||
if (numThreads <= 1) | ||
{ | ||
BalancedRunnable1D<OpT>(numElements, counter, op).run(); | ||
} | ||
else | ||
{ | ||
ThreadGroup threads; | ||
for (size_t ii = 0; ii < numThreads; ++ii) | ||
{ | ||
threads.createThread(new BalancedRunnable1D<OpT>( | ||
numElements, counter, op)); | ||
} | ||
threads.joinAll(); | ||
} | ||
} | ||
|
||
/*! | ||
* Same as above, but instead of sharing a functor across runnables, | ||
* each runnable will receive its own. | ||
* | ||
* \tparam OpT The type of functor that will be used to process elements | ||
* | ||
* \param numElements Number of elements of work | ||
* \param numThreads Number of threads | ||
* \param ops Vector of functors to use | ||
*/ | ||
template <typename OpT> | ||
void runBalanced1D(size_t numElements, | ||
size_t numThreads, | ||
const std::vector<OpT>& ops) | ||
{ | ||
sys::AtomicCounter counter(0); | ||
if (ops.size() != numThreads) | ||
{ | ||
std::ostringstream ostr; | ||
ostr << "Got " << numThreads << " threads but " << ops.size() | ||
<< " functors"; | ||
throw except::Exception(Ctxt(ostr.str())); | ||
} | ||
|
||
if (numThreads <= 1) | ||
{ | ||
BalancedRunnable1D<OpT>(numElements, counter, ops[0]).run(); | ||
} | ||
else | ||
{ | ||
ThreadGroup threads; | ||
for (size_t ii = 0; ii < numThreads; ++ii) | ||
{ | ||
threads.createThread(new BalancedRunnable1D<OpT>( | ||
numElements, counter, ops[ii])); | ||
} | ||
|
||
threads.joinAll(); | ||
} | ||
} | ||
|
||
/*! | ||
* Convenience wrapper for providing each runnable with a copy of op. | ||
* This is useful in cases where each runnable should use a | ||
* functor with its own local storage. | ||
* | ||
* \tparam OpT The type of functor that will be used to process elements | ||
* | ||
* \param numElements Number of elements of work | ||
* \param numThreads Number of threads | ||
* \param op Functor to use | ||
*/ | ||
template <typename OpT> | ||
void runBalanced1DWithCopies(size_t numElements, | ||
size_t numThreads, | ||
const OpT& op) | ||
{ | ||
const std::vector<OpT> ops(numThreads, op); | ||
runBalanced1D(numElements, numThreads, ops); | ||
} | ||
} | ||
|
||
#endif |
Oops, something went wrong.