From aa45bbb4894d6f46545e301052e7a4e6c78c2f2f Mon Sep 17 00:00:00 2001
From: Vlad Zolotarov
Date: Wed, 14 May 2014 18:26:43 +0300
Subject: [PATCH] xmitter class

This class is the heart of the per-CPU Tx framework. Apart from the
constructor it has two public methods:

 - xmit(buff): push the packet descriptor downstream, either to the HW
   or into the per-CPU queue if there is contention.

 - poll_until(cond): the main function of a worker thread that consumes
   packet descriptors from the per-CPU queue(s) and sends them to the
   output iterator (which is responsible for ensuring their successful
   delivery to the HW channel).

Signed-off-by: Vlad Zolotarov
Signed-off-by: Pekka Enberg
---
 include/osv/percpu_xmit.hh | 281 +++++++++++++++++++++++++++++++++++++
 1 file changed, 281 insertions(+)

diff --git a/include/osv/percpu_xmit.hh b/include/osv/percpu_xmit.hh
index 02403c4b82..3fed36b08a 100644
--- a/include/osv/percpu_xmit.hh
+++ b/include/osv/percpu_xmit.hh
@@ -169,6 +169,287 @@ private:
 #endif
 } CACHELINE_ALIGNED;
 
+/**
+ * @class xmitter
+ *
+ * This class is the heart of the per-CPU Tx framework.
+ * Apart from the constructor it has two public methods:
+ * - xmit(buff): push the packet descriptor downstream, either to the HW
+ *   or into the per-CPU queue if there is contention.
+ *
+ * - poll_until(cond): the main function of a worker thread that consumes
+ *   packet descriptors from the per-CPU queue(s) and sends them to the
+ *   output iterator (which is responsible for ensuring their successful
+ *   delivery to the HW channel).
+ */
+template <class NetDevTxq, unsigned CpuTxqSize>
+class xmitter {
+public:
+    explicit xmitter(NetDevTxq* txq) :
+        _txq(txq), _check_empty_queues(false) {
+        for (auto c : sched::cpus) {
+            _cpuq.for_cpu(c)->reset(new cpu_queue_type);
+        }
+    }
+
+    /**
+     * The main transmit function: tries to bypass the per-CPU queue when
+     * possible and pushes the frame into that queue otherwise.
+     *
+     * It may only block if it needs to push the frame into the per-CPU
+     * queue and that queue is full.
+     *
+     * @param buff packet descriptor to send
+     *
+     * @return 0 in case of success, EINVAL if a packet is not well-formed.
+     */
+    int xmit(mbuf* buff) {
+
+        void* cooky = nullptr;
+        int rc = _txq->xmit_prep(buff, cooky);
+
+        if (rc == EINVAL) {
+            m_freem(buff);
+            return rc;
+        }
+
+        assert(cooky != nullptr);
+
+        //
+        // If there are pending packets (in the per-CPU queues) or we have
+        // failed to take the RUNNING lock, push the packet into the
+        // per-CPU queue.
+        //
+        // Otherwise the dispatcher is neither running nor scheduled to
+        // run, so we bypass the per-CPU queues and transmit in-place.
+        //
+        if (has_pending() || !try_lock_running()) {
+            push_cpu(cooky);
+            return 0;
+        }
+
+        // If we are here we have acquired the RUNNING lock
+        rc = _txq->try_xmit_one_locked(cooky);
+
+        // Alright!!!
+        if (!rc) {
+            if (_txq->kick_hw()) {
+            }
+        }
+
+        unlock_running();
+
+        //
+        // unlock_running() is reached outside the dispatcher only when the
+        // dispatcher is not running and is waiting either for new work or
+        // for this lock.
+        //
+        // We want to wake the dispatcher only if there is new work for it,
+        // since otherwise there is no point in it waking up.
+        //
+        if (has_pending()) {
+            _txq->wake_worker();
+        }
+
+        if (rc /* == ENOBUFS */) {
+            //
+            // There weren't enough buffers on the HW ring to send the
+            // packet - push it into the per-CPU queue; the dispatcher
+            // will handle it later.
+            //
+            push_cpu(cooky);
+        }
+
+        return 0;
+    }
+
+    template <class StopPollingPred, class XmitIterator>
+    void poll_until(StopPollingPred stop_pred, XmitIterator& xmit_it) {
+        // Create a collection of the per-CPU queues
+        std::list<cpu_queue_type*> all_cpuqs;
+
+        for (auto c : sched::cpus) {
+            all_cpuqs.push_back(_cpuq.for_cpu(c)->get());
+        }
+
+        // Push them all into the heap
+        _mg.create_heap(all_cpuqs);
+
+        //
+        // The dispatcher holds the RUNNING lock the whole time it is not
+        // sleeping while waiting for new work.
+        //
+        lock_running();
+
+        // Start taking packets one-by-one and send them out
+        while (!stop_pred()) {
+            //
+            // Reset the PENDING state.
+            //
+            // The producer thread will first add a new element to the heap
+            // and only then set the PENDING state.
+            //
+            // We need to ensure that PENDING is cleared before _mg.pop() is
+            // performed (and possibly returns false - the heap is empty)
+            // because otherwise the producer may see the "old" value of the
+            // PENDING state and won't wake us up.
+            //
+            clear_pending();
+
+            // Check if there are elements in the heap
+            if (!_mg.pop(xmit_it)) {
+                // Wake all unwoken waiters before going to sleep
+                wake_waiters_all();
+
+                // We are going to sleep - release the HW channel
+                unlock_running();
+
+                sched::thread::wait_until([this] { return has_pending(); });
+
+                lock_running();
+            }
+
+            while (_mg.pop(xmit_it)) {
+                _txq->kick_pending_with_thresh();
+            }
+
+            // Kick any pending work
+            _txq->kick_pending();
+        }
+
+        // TODO: Add some handshake like a bool variable here
+        assert(0);
+    }
+
+private:
+    void wake_waiters_all() {
+        for (auto c : sched::cpus) {
+            _cpuq.for_cpu(c)->get()->wake_waiters();
+        }
+    }
+
+    /**
+     * Push the packet into the per-CPU queue for the current CPU.
+     * @param cooky packet descriptor to push
+     */
+    void push_cpu(void* cooky) {
+        bool success = false;
+
+        sched::preempt_disable();
+
+        cpu_queue_type* local_cpuq = _cpuq->get();
+        typename cpu_queue_type::value_type new_buff_desc = { get_ts(), cooky };
+
+        while (!local_cpuq->push(new_buff_desc)) {
+            wait_record wr(sched::thread::current());
+            local_cpuq->push_new_waiter(&wr);
+
+            //
+            // Try to push again in order to resolve a nasty race:
+            //
+            // If the dispatcher managed to empty the whole ring before we
+            // added our record to the waitq, then without this push() we
+            // could be stuck until somebody else adds another packet to
+            // this specific cpuq. Adding a packet here ensures that the
+            // dispatcher eventually handles it and wake()s us up.
+            //
+            // If we fail to add the packet here, it means the queue was
+            // still full AFTER we added the wait_record, and we need to
+            // wait until the dispatcher cleans it up and wakes us.
+            //
+            // All this is because we can't exit this function until the
+            // dispatcher pop()s our wait_record, since it's allocated on
+            // our stack.
+            //
+            success = local_cpuq->push(new_buff_desc);
+            if (success && !test_and_set_pending()) {
+                _txq->wake_worker();
+            }
+
+            sched::preempt_enable();
+
+            wr.wait();
+
+            // we are done - get out!
+            if (success) {
+                return;
+            }
+
+            sched::preempt_disable();
+
+            // Refresh: we could have been moved to a different CPU
+            local_cpuq = _cpuq->get();
+
+            //
+            // Refresh: another thread could have pushed its packet before
+            // us with an earlier timestamp - we have to keep the
+            // timestamps ordered in the CPU queue.
+            //
+            new_buff_desc.ts = get_ts();
+        }
+
+        //
+        // Try to save the IPI sending (when the dispatcher sleeps waiting
+        // for an interrupt) and the exchange in wake_impl() by paying the
+        // price of an exchange operation here.
+        //
+        if (!test_and_set_pending()) {
+            _txq->wake_worker();
+        }
+
+        sched::preempt_enable();
+    }
+
+    /**
+     * @return the current timestamp
+     */
+    clock::uptime::time_point get_ts() {
+        return clock::uptime::now();
+    }
+
+    // RUNNING state controlling functions
+    bool try_lock_running() {
+        return !_running.test_and_set(std::memory_order_acquire);
+    }
+
+    void lock_running() {
+        //
+        // Check if a fast-transmit hook is already running.
+        // If it is - sleep until it ends.
+        //
+        if (!try_lock_running()) {
+            sched::thread::wait_until([this] { return try_lock_running(); });
+        }
+    }
+
+    void unlock_running() {
+        _running.clear(std::memory_order_release);
+    }
+
+    // PENDING (packets) controlling functions
+    bool has_pending() const {
+        return _check_empty_queues.load(std::memory_order_acquire);
+    }
+
+    bool test_and_set_pending() {
+        return _check_empty_queues.exchange(true, std::memory_order_acq_rel);
+    }
+
+    void clear_pending() {
+        _check_empty_queues.store(false, std::memory_order_release);
+    }
+
+private:
+    typedef cpu_queue<CpuTxqSize> cpu_queue_type;
+
+    NetDevTxq* _txq; // Rename to _dev_txq
+    dynamic_percpu<std::unique_ptr<cpu_queue_type> > _cpuq;
+    osv::nway_merger<std::list<cpu_queue_type*> > _mg CACHELINE_ALIGNED;
+    std::atomic<bool> _check_empty_queues CACHELINE_ALIGNED;
+
+    //
+    // This lock is used to get exclusive control over the HW channel.
+    //
+    std::atomic_flag _running CACHELINE_ALIGNED = ATOMIC_FLAG_INIT;
+};
+
 } // namespace osv
 
 #endif // PERCPU_XMIT_HH_
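
Reviewer note (below the patch, not part of the diff): the NetDevTxq
template argument is an implicit concept. Judging only from the calls the
class makes, a device Tx queue would have to provide roughly the interface
sketched below. The signatures are inferred from this patch, and the names
in the wiring comment (my_txq, the 256 ring size) are illustrative
assumptions, not existing OSv code:

// Sketch of the interface xmitter<NetDevTxq, CpuTxqSize> relies on.
// Exact signatures in a real driver may differ.
class my_txq {
public:
    // Validate and pre-process 'buff'; on success store an opaque request
    // token in 'cooky' for a later try_xmit_one_locked(). Returns 0, or
    // EINVAL for a malformed packet (xmit() then frees the mbuf).
    int xmit_prep(mbuf* buff, void*& cooky);

    // Place one prepared request on the HW ring. The caller must hold the
    // RUNNING lock. Returns 0 on success, non-zero (ENOBUFS) if the ring
    // is full.
    int try_xmit_one_locked(void* cooky);

    // Notify the HW about newly added descriptors; returns true if a kick
    // was actually issued.
    bool kick_hw();

    // Kick the HW for descriptors that haven't been kicked yet; the
    // "_with_thresh" variant batches kicks until a threshold is reached.
    void kick_pending();
    void kick_pending_with_thresh();

    // Wake the thread that runs xmitter::poll_until().
    void wake_worker();
};

// Possible wiring in a driver (illustrative):
//
//   osv::xmitter<my_txq, 256> xm(&txq);
//
//   worker thread body:    xm.poll_until([] { return false; }, xmit_it);
//   transmit path:         int err = xm.xmit(m);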
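
The clear_pending()-before-_mg.pop() ordering in poll_until() and the
push-then-test_and_set_pending() ordering in push_cpu() together form a
lost-wakeup-avoidance handshake. Below is a self-contained, compilable
sketch of just that handshake in plain C++11. The stand-ins are
assumptions, not OSv code: a mutex-protected deque replaces the lock-free
per-CPU ring, and std::condition_variable replaces
wake_worker()/sched::thread::wait_until(); only the flag ordering mirrors
the patch.

// pending_demo.cpp - build with: g++ -std=c++11 -pthread pending_demo.cpp
#include <atomic>
#include <condition_variable>
#include <deque>
#include <iostream>
#include <mutex>
#include <thread>

static std::deque<int> q;                  // stand-in for the per-CPU ring
static std::mutex qmu;
static std::atomic<bool> pending{false};   // _check_empty_queues analogue
static std::mutex wmu;
static std::condition_variable wcv;        // wake_worker() analogue

static bool pop(int& v)
{
    std::lock_guard<std::mutex> g(qmu);
    if (q.empty()) {
        return false;
    }
    v = q.front();
    q.pop_front();
    return true;
}

static void producer(int base)
{
    for (int i = 0; i < 1000; i++) {
        {
            std::lock_guard<std::mutex> g(qmu);
            q.push_back(base + i);
        }
        // Push first, set PENDING second; wake the worker only on the
        // false->true transition (test_and_set_pending() + wake_worker()).
        if (!pending.exchange(true, std::memory_order_acq_rel)) {
            std::lock_guard<std::mutex> g(wmu);
            wcv.notify_one();
        }
    }
}

static void worker()
{
    int v, consumed = 0;
    while (consumed < 2000) {
        // Clear PENDING *before* draining: a producer that enqueues right
        // after our last failed pop() is then guaranteed to observe
        // PENDING == false and wake us (clear_pending() before _mg.pop()).
        pending.store(false, std::memory_order_release);
        while (pop(v)) {
            consumed++;
        }
        if (consumed >= 2000) {
            break;
        }
        std::unique_lock<std::mutex> g(wmu);
        wcv.wait(g, [] { return pending.load(std::memory_order_acquire); });
    }
    std::cout << "consumed " << consumed << " packets\n";
}

int main()
{
    std::thread w(worker), p1(producer, 0), p2(producer, 1000);
    p1.join();
    p2.join();
    w.join();
    return 0;
}

Clearing the flag the other way around (pop first, clear second) would
reintroduce exactly the race the patch comment describes: a producer could
enqueue between the two steps, see the stale "true", skip the wakeup, and
leave the worker asleep with a non-empty queue.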