Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run the capture thread with real-time priority #116

Merged
merged 4 commits into from
Jun 18, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 55 additions & 10 deletions Linux-Application/DomesdayDuplicator/usbcapture.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@

#include "usbcapture.h"

#include <atomic>
#include <sched.h>
#include <sys/mman.h>

// Notes on transfer and disk buffering:
//
// TRANSFERSIZE: Each in-flight transfer returns 16 Kbytes * 16 (256 Kbytes)
Expand All @@ -53,32 +57,32 @@ struct transferUserDataStruct {
};

// Flag to indicate if disk buffer processing is running
static volatile bool isDiskBufferProcessRunning;
static std::atomic<bool> isDiskBufferProcessRunning;

// Flag passed to mainwindow to notify of closefile
static bool isOkToRename = false;

// Global to monitor the number of in-flight transfers
static volatile qint32 transfersInFlight = 0;
static std::atomic<qint32> transfersInFlight(0);

// Flag to cancel transfers in flight
static volatile bool transferAbort = false;
static std::atomic<bool> transferAbort(false);

// Flag to show capture complete
static volatile bool captureComplete = false;
static std::atomic<bool> captureComplete(false);

// Flag to show transfer failure
static volatile bool transferFailure = false;
static std::atomic<bool> transferFailure(false);

// Private variables used to report statistics about the transfer process
struct statisticsStruct {
qint32 transferCount; // Number of successful transfers
std::atomic<qint32> transferCount; // Number of successful transfers
};
static volatile statisticsStruct statistics;
static statisticsStruct statistics;

// Set up a pointer to the disk buffers and their full flags
static unsigned char **diskBuffers = nullptr;
static volatile bool isDiskBufferFull[NUMBEROFDISKBUFFERS];
static std::atomic<bool> isDiskBufferFull[NUMBEROFDISKBUFFERS];

// Set up a pointer to the conversion buffer
static unsigned char *conversionBuffer;
Expand All @@ -87,7 +91,7 @@ static unsigned char *conversionBuffer;
// before disk buffering starts. It seems to be necessary to discard
// the first set of in-flight transfers as the FX3 doesn't return
// valid data until second set.
static volatile qint32 flushCounter;
static std::atomic<qint32> flushCounter;

// Last error is a string used to communicate a failure reason to the GUI
static QString lastError;
Expand Down Expand Up @@ -301,6 +305,29 @@ void UsbCapture::run(void)
return;
}

// Save the current scheduling policy and parameters
int oldSchedPolicy = sched_getscheduler(0);
if (oldSchedPolicy == -1) oldSchedPolicy = SCHED_OTHER;
struct sched_param oldSchedParam;
if (sched_getparam(0, &oldSchedParam) == -1) oldSchedParam.sched_priority = 0;

// Enable real-time scheduling for this thread
int minSchedPriority = sched_get_priority_min(SCHED_RR);
int maxSchedPriority = sched_get_priority_max(SCHED_RR);
struct sched_param schedParams;
if (minSchedPriority == -1 || maxSchedPriority == -1) {
schedParams.sched_priority = 0;
} else {
// Put the priority about 3/4 of the way through its range
schedParams.sched_priority = (minSchedPriority + (3 * maxSchedPriority)) / 4;
}
if (sched_setscheduler(0, SCHED_RR, &schedParams) != -1) {
qDebug() << "UsbCapture::run(): Real-time scheduling enabled with priority" << schedParams.sched_priority;
} else {
// Continue anyway, but print a warning
qInfo() << "UsbCapture::run(): Unable to enable real-time scheduling for capture thread";
}

// Set up the initial transfers
for (qint32 transferNumber = 0; transferNumber < SIMULTANEOUSTRANSFERS; transferNumber++) {
usbTransfers[transferNumber] = libusb_alloc_transfer(0);
Expand Down Expand Up @@ -364,6 +391,11 @@ void UsbCapture::run(void)
libusb_handle_events_timeout(libUsbContext, &libusbHandleTimeout);
}

// Return to the original scheduling policy while we're cleaning up
if (sched_setscheduler(0, oldSchedPolicy, &oldSchedParam) == -1) {
qDebug() << "UsbCapture::run(): Unable to restore original scheduling policy";
}

// Deallocate transfers
qDebug() << "UsbCapture::run(): Transfer stopping - Freeing transfer buffers...";
for (qint32 transferNumber = 0; transferNumber < SIMULTANEOUSTRANSFERS; transferNumber++)
Expand All @@ -373,6 +405,7 @@ void UsbCapture::run(void)
qDebug() << "UsbCapture::run(): Transfer stopping - waiting for disk buffer processing to complete...";
while (isDiskBufferProcessRunning) {
// Just waiting here for now
sched_yield();
}

// If the transfer failed, emit a notification signal to the parent object
Expand All @@ -399,10 +432,11 @@ void UsbCapture::run(void)
// Note: Using vectors would be neater, but they are just too slow
void UsbCapture::allocateDiskBuffers(void)
{
qDebug() << "UsbCapture::allocateDiskBuffers(): Allocating memory for disk buffers";
qDebug() << "UsbCapture::allocateDiskBuffers(): Allocating" << (1ULL * TRANSFERSIZE * TRANSFERSPERDISKBUFFER * NUMBEROFDISKBUFFERS) / (1024 * 1024) << "MiB memory for disk buffers";
// Allocate the disk buffers
diskBuffers = static_cast<unsigned char **>(calloc(NUMBEROFDISKBUFFERS, sizeof(unsigned char *)));
if (diskBuffers != nullptr) {
bool tryMlock = true;
for (quint32 bufferNumber = 0; bufferNumber < NUMBEROFDISKBUFFERS; bufferNumber++) {

diskBuffers[bufferNumber] = static_cast<unsigned char *>(malloc(TRANSFERSIZE * TRANSFERSPERDISKBUFFER));
Expand All @@ -415,7 +449,15 @@ void UsbCapture::allocateDiskBuffers(void)
transferFailure = true;
break;
}

// Lock the buffer into memory, preventing it from being paged out
if (tryMlock && mlock(diskBuffers[bufferNumber], TRANSFERSIZE * TRANSFERSPERDISKBUFFER) == -1) {
// Continue anyway, but print a warning
qInfo() << "UsbCapture::allocateDiskBuffers(): Unable to lock disk buffer into memory";
tryMlock = false;
}
}
if (tryMlock) qDebug() << "UsbCapture::allocateDiskBuffers(): Locked disk buffers into memory";
} else {
// Memory allocation has failed
qDebug() << "UsbCapture::allocateDiskBuffers(): Disk buffer array allocation failed!";
Expand All @@ -439,6 +481,9 @@ void UsbCapture::freeDiskBuffers(void)
// Free up the allocated disk buffers
if (diskBuffers != nullptr) {
for (qint32 bufferNumber = 0; bufferNumber < NUMBEROFDISKBUFFERS; bufferNumber++) {
// Don't keep the buffer in RAM any more (silently ignoring failure)
(void) munlock(diskBuffers[bufferNumber], TRANSFERSIZE * TRANSFERSPERDISKBUFFER);

if (diskBuffers[bufferNumber] != nullptr) {
free(diskBuffers[bufferNumber]);
}
Expand Down