Skip to content

Commit

Permalink
Callback dispatcher thread for the winuvc backend
Browse files Browse the repository at this point in the history
(cherry picked from commit a718d7b5024965f16564e6c730b758be973826a4)

# Conflicts:
#	src/win7/winusb_uvc/winusb_uvc.cpp
  • Loading branch information
dorodnic committed Aug 14, 2018
1 parent d626feb commit e4e6c34
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 4 deletions.
2 changes: 2 additions & 0 deletions src/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -992,6 +992,8 @@ namespace librealsense
int size = 0;

public:
static const int CAPACITY = C;

small_heap()
{
for (auto i = 0; i < C; i++)
Expand Down
100 changes: 96 additions & 4 deletions src/win7/winusb_uvc/winusb_uvc.cpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,47 @@
// winusb_uvc.cpp : Defines the entry point for the console application.
//
#define NOMINMAX
#include "windows.h"
#include "SETUPAPI.H"
#include "winusb_uvc.h"
#include "libuvc/utlist.h"
#include "types.h"

#include "concurrency.h"
#include "types.h"

#include <vector>
#include <thread>
#include <atomic>
#include <iostream>

// Data structures for Backend-Frontend queue:

struct frame;
// We keep no more then 2 frames in between frontend and backend
typedef librealsense::small_heap<frame, 2> frames_archive;
struct frame
{
frame() {}

// We don't want the frames to be overwritten at any point
// (deallocate resets item to "default" that we want to avoid)
frame(const frame& other) {}
frame(frame&& other) {}
frame& operator=(const frame& other) { return *this; }

std::vector<uint8_t> pixels;
librealsense::platform::frame_object fo;
frames_archive* owner; // Keep pointer to owner for light-deleter
};
void cleanup_frame(frame* ptr) {
if (ptr) ptr->owner->deallocate(ptr);
};
typedef void(*cleanup_ptr)(frame*);
// Unique_ptr is used as the simplest RAII, with static deleter
typedef std::unique_ptr<frame, cleanup_ptr> frame_ptr;
typedef single_consumer_queue<frame_ptr> frames_queue;

uvc_error_t winusb_uvc_scan_streaming(winusb_uvc_device *dev, winusb_uvc_device_info_t *info, int interface_idx);
uvc_error_t winusb_uvc_parse_vs(winusb_uvc_device *dev, winusb_uvc_device_info_t *info, winusb_uvc_streaming_interface_t *stream_if, const unsigned char *block, size_t block_size);

Expand Down Expand Up @@ -753,7 +789,8 @@ void winusb_uvc_swap_buffers(winusb_uvc_stream_handle_t *strmh) {
strmh->pts = 0;
}

void winusb_uvc_process_payload(winusb_uvc_stream_handle_t *strmh, uint8_t *payload, size_t payload_len) {
void winusb_uvc_process_payload(winusb_uvc_stream_handle_t *strmh,
uint8_t *payload, size_t payload_len, frames_archive* archive, frames_queue* queue) {
uint8_t header_len;
uint8_t header_info;
size_t data_len;
Expand Down Expand Up @@ -829,13 +866,32 @@ void winusb_uvc_process_payload(winusb_uvc_stream_handle_t *strmh, uint8_t *payl
if ((data_len > 0) && (strmh->cur_ctrl.dwMaxVideoFrameSize == (data_len)))
{
// save all frame+header
memcpy(strmh->outbuf + strmh->got_bytes, payload, data_len + header_len);
strmh->got_bytes += data_len;
//memcpy(strmh->outbuf + strmh->got_bytes, payload, data_len + header_len);
//strmh->got_bytes += data_len;

if (header_info & (1 << 1)) {
/* The EOF bit is set, so publish the complete frame */
winusb_uvc_swap_buffers(strmh);

auto frame_p = archive->allocate();
if (frame_p)
{
frame_ptr fp(frame_p, &cleanup_frame);

memset(fp->pixels.data(), 0, fp->pixels.size());

memcpy(fp->pixels.data(), payload, data_len + header_len);

librealsense::platform::frame_object fo{ data_len, header_len,
fp->pixels.data() + header_len , fp->pixels.data() };
fp->fo = fo;

queue->enqueue(std::move(fp));
}
else
{
//std::cout << "Frame from WinUSB backend was dropped (Frontend busy)\n";
}
//printf("EndPoint 0x%X: Received Frame %d (%zd bytes): header info = %08X\n", strmh->stream_if->bEndpointAddress, strmh->seq, payload_len, payload[1]);
LOG_INFO("Passing packet to user CB with size " << data_len + header_len);
librealsense::platform::frame_object fo{ data_len, header_len, strmh->holdbuf + header_len , strmh->holdbuf };
Expand All @@ -851,6 +907,36 @@ void stream_thread(winusb_uvc_stream_context *strctx)
PUCHAR buffer = (PUCHAR)malloc(strctx->maxPayloadTransferSize);
memset(buffer, 0, sizeof(strctx->maxPayloadTransferSize));

frames_archive archive;
std::atomic_bool keep_sending_callbacks = true;
frames_queue queue;

// Get all pointers from archive and initialize their content
std::vector<frame*> frames;
for (auto i = 0; i < frames_archive::CAPACITY; i++)
{
auto ptr = archive.allocate();
ptr->pixels.resize(strctx->maxPayloadTransferSize, 0);
ptr->owner = &archive;
frames.push_back(ptr);
}
for (auto ptr : frames)
{
archive.deallocate(ptr);
}

std::thread t([&]() {
while (keep_sending_callbacks)
{
frame_ptr fp(nullptr, [](frame*) {});
if (queue.dequeue(&fp, 50))
{
strctx->stream->user_cb(&fp->fo, strctx->stream->user_ptr);
fp.reset();
}
}
});

do {

DWORD transferred;
Expand All @@ -866,14 +952,20 @@ void stream_thread(winusb_uvc_stream_context *strctx)

//printf("success : %d\n", transferred);
LOG_INFO("Packet received with size " << transferred);
winusb_uvc_process_payload(strctx->stream, buffer, transferred);
winusb_uvc_process_payload(strctx->stream, buffer, lengthTransfered, &archive, &queue);
} while (strctx->stream->running);

// reseting pipe after use
auto ret = WinUsb_ResetPipe(strctx->stream->devh->associateHandle, strctx->endpoint);

free(buffer);
free(strctx);

queue.clear();
archive.stop_allocation();
archive.wait_until_empty();
keep_sending_callbacks = false;
t.join();
};

uvc_error_t winusb_uvc_stream_start(
Expand Down

0 comments on commit e4e6c34

Please sign in to comment.