xmitter class
  This class is the heart of the per-CPU Tx framework.
  Apart from the constructor, it has two public methods:
   - xmit(buff): push the packet descriptor downstream, either to the HW or
     into the per-CPU queue if there is contention.

   - poll_until(cond): the main function of the worker thread; it consumes
     packet descriptors from the per-CPU queue(s) and feeds them to the
     output iterator (which is responsible for ensuring their successful
     delivery to the HW channel).

Signed-off-by: Vlad Zolotarov <vladz@cloudius-systems.com>
Signed-off-by: Pekka Enberg <penberg@cloudius-systems.com>
Vlad Zolotarov authored and Pekka Enberg committed May 15, 2014
1 parent 9741e82 commit aa45bbb
Showing 1 changed file with 281 additions and 0 deletions.
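For context, here is a hedged sketch of how a driver might wire the new class up. Everything in it - the hw_txq and hw_xmit_iterator types, the my_net_driver class, the queue size of 256 and the _stopping flag - is an illustrative assumption for this sketch, not part of the commit:

    #include <osv/percpu_xmit.hh>
    #include <atomic>

    // "hw_txq" stands for the driver's own Tx queue class. To be usable as
    // the NetDevTxq template parameter it has to provide the methods that
    // xmitter calls: xmit_prep(), try_xmit_one_locked(), kick_hw(),
    // kick_pending(), kick_pending_with_thresh() and wake_worker().
    class hw_txq;

    // "hw_xmit_iterator" is whatever output-iterator-like type the driver
    // uses to put the popped descriptors onto the HW ring (see poll_until()).
    class hw_xmit_iterator;

    class my_net_driver {
    public:
        my_net_driver(hw_txq* q, hw_xmit_iterator& it)
            : _xmitter(q), _xmit_it(it) {}

        // Fast path: called for every outgoing packet. mbuf is the BSD
        // network-stack packet descriptor already used by percpu_xmit.hh.
        int transmit(mbuf* m_head) { return _xmitter.xmit(m_head); }

        // Dispatcher (worker) thread body: drains the per-CPU queues until
        // the device is torn down.
        void worker() {
            _xmitter.poll_until([this] { return _stopping.load(); },
                                _xmit_it);
        }

    private:
        osv::xmitter<hw_txq, 256> _xmitter;  // 256: per-CPU queue size (example)
        hw_xmit_iterator& _xmit_it;
        std::atomic<bool> _stopping{false};
    };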
include/osv/percpu_xmit.hh
@@ -169,6 +169,287 @@ private:
#endif
} CACHELINE_ALIGNED;

/**
* @class xmitter
*
* This class is the heart of the per-CPU Tx framework.
* Apart from the constructor, it has two public methods:
*  - xmit(buff): push the packet descriptor downstream, either to the HW or
*    into the per-CPU queue if there is contention.
*
*  - poll_until(cond): the main function of the worker thread; it consumes
*    packet descriptors from the per-CPU queue(s) and feeds them to the
*    output iterator (which is responsible for ensuring their successful
*    delivery to the HW channel).
*/
template <class NetDevTxq, unsigned CpuTxqSize>
class xmitter {
public:
explicit xmitter(NetDevTxq* txq) :
_txq(txq), _check_empty_queues(false) {
for (auto c : sched::cpus) {
_cpuq.for_cpu(c)->reset(new cpu_queue_type);
}
}

/**
* The main transmit function: it tries to bypass the per-CPU queue when
* possible and pushes the frame into that queue otherwise.
*
* It may block only if it needs to push the frame into the per-CPU queue
* and that queue is full.
*
* @param buff packet descriptor to send
*
* @return 0 in case of success, EINVAL if the packet is not well-formed.
*/
int xmit(mbuf* buff) {

void* cooky = nullptr;
int rc = _txq->xmit_prep(buff, cooky);

if (rc == EINVAL) {
m_freem(buff);
return rc;
}

assert(cooky != nullptr);

//
// If there are pending packets (in the per-CPU queues) or we've failed
// to take the RUNNING lock, push the packet into the per-CPU queue.
//
// Otherwise the dispatcher is neither running nor scheduled to run, so
// bypass the per-CPU queues and transmit in place.
//
if (has_pending() || !try_lock_running()) {
push_cpu(cooky);
return 0;
}

// If we got here, we've acquired the RUNNING lock
rc = _txq->try_xmit_one_locked(cooky);

// Transmitted in place - kick the HW if needed
if (!rc) {
_txq->kick_hw();
}

unlock_running();

//
// We reach this unlock_running() outside the dispatcher only when the
// dispatcher is not running and is waiting either for new work or for
// this lock.
//
// We want to wake the dispatcher only if there is new work for it,
// since otherwise there is no point in waking it up.
//
if (has_pending()) {
_txq->wake_worker();
}

if (rc /* == ENOBUFS */) {
//
// There weren't enough buffers on the HW ring to send the packet -
// push it into the per-CPU queue; the dispatcher will handle it later.
//
push_cpu(cooky);
}

return 0;
}
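
//
// To summarize the fast path above: a well-formed packet either goes out
// on the HW ring right away (no pending per-CPU work and the RUNNING lock
// was free) or ends up in a per-CPU queue for the dispatcher - the ENOBUFS
// case included. xmit() reports a failure only for a malformed packet
// (EINVAL), in which case the mbuf has already been freed.
//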

template <class StopPollingPred, class XmitIterator>
void poll_until(StopPollingPred stop_pred, XmitIterator& xmit_it) {
// Create a collection of the per-CPU queues
std::list<cpu_queue_type*> all_cpuqs;

for (auto c : sched::cpus) {
all_cpuqs.push_back(_cpuq.for_cpu(c)->get());
}

// Push them all into the heap
_mg.create_heap(all_cpuqs);

//
// The dispatcher holds the RUNNING lock the whole time, except when it
// sleeps waiting for new work.
//
lock_running();

// Start taking packets one by one and sending them out
while (!stop_pred()) {
//
// Reset the PENDING state.
//
// The producer thread will first add a new element to the heap and
// only then set the PENDING state.
//
// We need to ensure that PENDING is cleared before _mg.pop() is
// performed (and possibly returns false - the heap is empty)
// because otherwise the producer may see the "old" value of the
// PENDING state and won't wake us up.
//
clear_pending();

// Check if there are elements in the heap
if (!_mg.pop(xmit_it)) {
// Wake all unwoken waiters before going to sleep
wake_waiters_all();

// We are going to sleep - release the HW channel
unlock_running();

sched::thread::wait_until([this] { return has_pending(); });

lock_running();
}

while (_mg.pop(xmit_it)) {
_txq->kick_pending_with_thresh();
}

// Kick any pending work
_txq->kick_pending();
}

// TODO: Add some handshake like a bool variable here
assert(0);
}
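
//
// For illustration only (an assumption, not part of this commit): an
// XmitIterator is expected to behave like an output iterator whose
// assignment is handed each popped descriptor and is responsible for its
// successful delivery to the HW channel, as the class comment says.
// A minimal sketch, under that assumption:
//
//   struct hw_xmit_iterator {
//       NetDevTxq* txq;
//       hw_xmit_iterator& operator*()     { return *this; }
//       hw_xmit_iterator& operator++()    { return *this; }
//       hw_xmit_iterator& operator++(int) { return *this; }
//       template <typename Desc>
//       void operator=(const Desc& d) {
//           // Hypothetical driver method that puts the descriptor on the
//           // HW ring, waiting for room if necessary.
//           txq->xmit_one(d);
//       }
//   };
//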

private:
void wake_waiters_all() {
for (auto c : sched::cpus) {
_cpuq.for_cpu(c)->get()->wake_waiters();
}
}

/**
* Push the packet into the per-CPU queue of the current CPU.
* @param cooky opaque packet descriptor (from xmit_prep()) to push
*/
void push_cpu(void* cooky) {
bool success = false;

sched::preempt_disable();

cpu_queue_type* local_cpuq = _cpuq->get();
typename cpu_queue_type::value_type new_buff_desc = { get_ts(), cooky };

while (!local_cpuq->push(new_buff_desc)) {
wait_record wr(sched::thread::current());
local_cpuq->push_new_waiter(&wr);

//
// Try to push again in order to resolve a nasty race:
//
// If the dispatcher managed to empty the whole ring before we added
// our record to the waitq, then without this push() we could have
// been stuck until somebody else added another packet to this
// specific cpuq. Adding a packet here ensures that the dispatcher
// eventually handles it and "wake()s" us up.
//
// If we fail to add the packet here, it means the queue was still
// full AFTER we added the wait_record, and we need to wait until the
// dispatcher drains it and wakes us.
//
// All this is because we can't exit this function until the
// dispatcher pop()s our wait_record, since it's allocated on our
// stack.
//
success = local_cpuq->push(new_buff_desc);
if (success && !test_and_set_pending()) {
_txq->wake_worker();
}

sched::preempt_enable();

wr.wait();

// we are done - get out!
if (success) {
return;
}

sched::preempt_disable();

// Refresh: we could have been moved to a different CPU
local_cpuq = _cpuq->get();
//
// Refresh: another thread could have pushed its packet before us
// with an earlier timestamp - we have to keep the timestamps
// ordered in the CPU queue.
//
new_buff_desc.ts = get_ts();
}

//
// Try to save the IPI sending (when the dispatcher sleeps waiting for
// an interrupt) and the exchange in wake_impl() by paying the price of
// an exchange operation here.
//
if (!test_and_set_pending()) {
_txq->wake_worker();
}

sched::preempt_enable();
}

/**
* @return the current timestamp
*/
clock::uptime::time_point get_ts() {
return clock::uptime::now();
}

// RUNNING state controlling functions
bool try_lock_running() {
return !_running.test_and_set(std::memory_order_acquire);
}

void lock_running() {
//
// Check whether a fast-transmit hook is already running.
// If so, sleep until it finishes.
//
if (!try_lock_running()) {
sched::thread::wait_until([this] { return try_lock_running(); });
}
}
void unlock_running() {
_running.clear(std::memory_order_release);
}

// PENDING (packets) controlling functions
bool has_pending() const {
return _check_empty_queues.load(std::memory_order_acquire);
}

bool test_and_set_pending() {
return _check_empty_queues.exchange(true, std::memory_order_acq_rel);
}
void clear_pending() {
_check_empty_queues.store(false, std::memory_order_release);
}
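
//
// The PENDING protocol in one place: a producer first push()es its
// descriptor into the per-CPU queue and only then calls
// test_and_set_pending(), waking the worker only when the flag was
// previously clear; the dispatcher calls clear_pending() before it tries
// to pop from the heap, so a producer that slips in between is guaranteed
// to observe the cleared flag and issue the wake-up. The acquire/release
// ordering on _check_empty_queues pairs these two sides.
//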

private:
typedef cpu_queue<CpuTxqSize> cpu_queue_type;

NetDevTxq* _txq; // Rename to _dev_txq
dynamic_percpu<std::unique_ptr<cpu_queue_type> > _cpuq;
osv::nway_merger<std::list<cpu_queue_type*> > _mg CACHELINE_ALIGNED;
std::atomic<bool> _check_empty_queues CACHELINE_ALIGNED;
//
// This lock is used to get exclusive control over the HW
// channel.
//
std::atomic_flag _running CACHELINE_ALIGNED
= ATOMIC_FLAG_INIT;
};

} // namespace osv

#endif // PERCPU_XMIT_HH_
