diff --git a/src/common/concurrent_blocking_queue.h b/src/common/concurrent_blocking_queue.h index aab39895b119..2713c8bc2a5c 100644 --- a/src/common/concurrent_blocking_queue.h +++ b/src/common/concurrent_blocking_queue.h @@ -31,7 +31,7 @@ template class ConcurrentBlockingQueue { queue_.pop_front(); if (queue_.empty()) has_elmt_ = false; - return false; + return true; } } { @@ -44,9 +44,9 @@ template class ConcurrentBlockingQueue { queue_.pop_front(); if (queue_.empty()) has_elmt_ = false; - return false; + return true; } else { - return true; + return false; } } } diff --git a/src/common/spin_lock.h b/src/common/spin_lock.h deleted file mode 100644 index 5a0cc3f786e6..000000000000 --- a/src/common/spin_lock.h +++ /dev/null @@ -1,45 +0,0 @@ -#ifndef _SPINLOCK_XCHG_H -#define _SPINLOCK_XCHG_H - -/* Spin lock using xchg. - * Copied from http://locklessinc.com/articles/locks/ - */ - -/* Compile read-write barrier */ -#define barrier() asm volatile("": : :"memory") - -/* Pause instruction to prevent excess processor bus usage */ -#define cpu_relax() asm volatile("pause\n": : :"memory") - -static inline unsigned short xchg_8(void *ptr, unsigned char x) { - __asm__ __volatile__("xchgb %0,%1" - :"=r" (x) - :"m" (*(volatile unsigned char *)ptr), "0" (x) - :"memory"); - - return x; -} - -#define BUSY 1 -typedef unsigned char spinlock; - -#define SPINLOCK_INITIALIZER 0 - -static inline void spin_lock(spinlock *lock) { - while (1) { - if (!xchg_8(lock, BUSY)) return; - - while (*lock) cpu_relax(); - } -} - -static inline void spin_unlock(spinlock *lock) { - barrier(); - *lock = 0; -} - -static inline int spin_trylock(spinlock *lock) { - return xchg_8(lock, BUSY); -} - -#endif /* _SPINLOCK_XCHG_H */ diff --git a/src/common/spinlock.h b/src/common/spinlock.h new file mode 100644 index 000000000000..fb7285b5702e --- /dev/null +++ b/src/common/spinlock.h @@ -0,0 +1,44 @@ +#ifndef MXNET_COMMON_SPINLOCK_H_ +#define MXNET_COMMON_SPINLOCK_H_ + +#include + +namespace mxnet { +namespace common { + +/*! + * \brief Simple userspace spinlock implementation. + */ +class Spinlock { + public: + Spinlock() = default; + /*! + * \brief Disable copy and move. + */ + Spinlock(Spinlock const&) = delete; + Spinlock(Spinlock&&) = delete; + Spinlock& operator=(Spinlock const&) = delete; + Spinlock& operator=(Spinlock&&) = delete; + ~Spinlock() = default; + /*! + * \brief Acquire lock. + */ + void lock() noexcept { + while (lock_.test_and_set(std::memory_order_acquire)); + } + /*! + * \brief Release lock. + */ + void unlock() noexcept { + lock_.clear(std::memory_order_release); + } + + private: + std::atomic_flag lock_ = ATOMIC_FLAG_INIT; +}; + +} // namespace common +} // namespace mxnet + +#endif // MXNET_COMMON_SPINLOCK_H_ + diff --git a/src/dag_engine/threaded_engine.cc b/src/dag_engine/threaded_engine.cc index 143b5e72f413..32325ba4f1a8 100644 --- a/src/dag_engine/threaded_engine.cc +++ b/src/dag_engine/threaded_engine.cc @@ -159,7 +159,7 @@ class ThreadedEngine : public DAGEngine { } void WorkerRoutine(int thrid) { OpDescr* opd = nullptr; - while(! worker_queues_[thrid]->Pop(opd)) { + while(worker_queues_[thrid]->Pop(opd)) { LOG(INFO) << "worker thread #" << thrid << " got operator " << opd; opd->op(GetRunContext(opd->exec_ctx), [this, opd] () { this->OnOpFinished(opd); }); opd = nullptr;