Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Playback realtime fix #2360

Merged
merged 12 commits into from
Sep 17, 2018
16 changes: 11 additions & 5 deletions src/archive.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ namespace librealsense
rs2_time_t backend_timestamp = 0;
rs2_time_t last_timestamp = 0;
unsigned long long last_frame_number = 0;
bool is_blocking = false;

frame_additional_data() {};

Expand All @@ -44,14 +45,16 @@ namespace librealsense
const uint8_t* md_buf,
double backend_time,
rs2_time_t last_timestamp,
unsigned long long last_frame_number)
unsigned long long last_frame_number,
bool in_is_blocking)
: timestamp(in_timestamp),
frame_number(in_frame_number),
system_time(in_system_time),
metadata_size(md_size),
backend_timestamp(backend_time),
last_timestamp(last_timestamp),
last_frame_number(last_frame_number)
last_frame_number(last_frame_number),
is_blocking(in_is_blocking)
{
// Copy up to 255 bytes to preserve metadata as raw data
if (metadata_size)
Expand Down Expand Up @@ -156,6 +159,9 @@ namespace librealsense
void mark_fixed() override { _fixed = true; }
bool is_fixed() const override { return _fixed; }

void set_blocking(bool state) override { additional_data.is_blocking = state; }
bool is_blocking() const override { return additional_data.is_blocking; }

private:
// TODO: check boost::intrusive_ptr or an alternative
std::atomic<int> ref_count; // the reference count is on how many times this placeholder has been observed (not lifetime, not content)
Expand Down Expand Up @@ -303,7 +309,7 @@ namespace librealsense
return((depth_frame*)_original.frame)->get_distance(x, y);

uint64_t pixel = 0;
switch (get_bpp()/8) // bits per pixel
switch (get_bpp() / 8) // bits per pixel
{
case 1: pixel = get_frame_data()[y*get_width() + x]; break;
case 2: pixel = reinterpret_cast<const uint16_t*>(get_frame_data())[y*get_width() + x]; break;
Expand Down Expand Up @@ -353,7 +359,7 @@ namespace librealsense
try
{
auto depth_sensor = As<librealsense::depth_sensor>(sensor);
if(depth_sensor != nullptr)
if (depth_sensor != nullptr)
{
return depth_sensor->get_depth_scale();
}
Expand Down Expand Up @@ -484,4 +490,4 @@ namespace librealsense

MAP_EXTENSION(RS2_EXTENSION_POSE_FRAME, librealsense::pose_frame);

}
}
183 changes: 130 additions & 53 deletions src/concurrency.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,113 +14,181 @@ const int QUEUE_MAX_SIZE = 10;
template<class T>
class single_consumer_queue
{
std::deque<T> q;
std::mutex mutex;
std::condition_variable cv; // not empty signal
unsigned int cap;
bool accepting;
std::deque<T> _queue;
std::mutex _mutex;
std::condition_variable _deq_cv; // not empty signal
std::condition_variable _enq_cv; // not empty signal

unsigned int _cap;
bool _accepting;

// flush mechanism is required to abort wait on cv
// when need to stop
std::atomic<bool> need_to_flush;
std::atomic<bool> was_flushed;
std::condition_variable was_flushed_cv;
std::mutex was_flushed_mutex;
std::atomic<bool> _need_to_flush;
std::atomic<bool> _was_flushed;
public:
explicit single_consumer_queue<T>(unsigned int cap = QUEUE_MAX_SIZE)
: q(), mutex(), cv(), cap(cap), need_to_flush(false), was_flushed(false), accepting(true)
: _queue(), _mutex(), _deq_cv(), _enq_cv(), _cap(cap), _need_to_flush(false), _was_flushed(false), _accepting(true)
{}

void enqueue(T&& item)
{
std::unique_lock<std::mutex> lock(mutex);
if (accepting)
std::unique_lock<std::mutex> lock(_mutex);
if (_accepting)
{
q.push_back(std::move(item));
if (q.size() > cap)
_queue.push_back(std::move(item));
if (_queue.size() > _cap)
{
q.pop_front();
_queue.pop_front();
}
}
lock.unlock();
cv.notify_one();
_deq_cv.notify_one();
}

bool dequeue(T* item ,unsigned int timeout_ms = 5000)
void blocking_enqueue(T&& item)
{
std::unique_lock<std::mutex> lock(mutex);
accepting = true;
was_flushed = false;
const auto ready = [this]() { return (q.size() > 0) || need_to_flush; };
if (!ready() && !cv.wait_for(lock, std::chrono::milliseconds(timeout_ms), ready))
{
return false;
}
auto pred = [this]()->bool { return _queue.size() <= _cap; };

if (q.size() <= 0)
std::unique_lock<std::mutex> lock(_mutex);
if (_accepting)
{
return false;
_enq_cv.wait(lock, pred);
_queue.push_back(std::move(item));
}
*item = std::move(q.front());
q.pop_front();
return true;
lock.unlock();
_deq_cv.notify_one();
}

bool peek(T** item)

bool dequeue(T* item ,unsigned int timeout_ms = 5000)
{
std::unique_lock<std::mutex> lock(mutex);
std::unique_lock<std::mutex> lock(_mutex);
_accepting = true;
_was_flushed = false;
const auto ready = [this]() { return (_queue.size() > 0) || _need_to_flush; };
if (!ready() && !_deq_cv.wait_for(lock, std::chrono::milliseconds(timeout_ms), ready))
{
return false;
}

if (q.size() <= 0)
if (_queue.size() <= 0)
{
return false;
}
*item = &q.front();
*item = std::move(_queue.front());
_queue.pop_front();
_enq_cv.notify_one();
return true;
}

bool try_dequeue(T* item)
{
std::unique_lock<std::mutex> lock(mutex);
accepting = true;
if (q.size() > 0)
std::unique_lock<std::mutex> lock(_mutex);
_accepting = true;
if (_queue.size() > 0)
{
auto val = std::move(q.front());
q.pop_front();
auto val = std::move(_queue.front());
_queue.pop_front();
*item = std::move(val);
_enq_cv.notify_one();
return true;
}
return false;
}

bool peek(T** item)
{
std::unique_lock<std::mutex> lock(_mutex);

if (_queue.size() <= 0)
{
return false;
}
*item = &_queue.front();
return true;
}

void clear()
{
std::unique_lock<std::mutex> lock(mutex);
std::unique_lock<std::mutex> lock(_mutex);

accepting = false;
need_to_flush = true;
_accepting = false;
_need_to_flush = true;

while (q.size() > 0)
while (_queue.size() > 0)
{
auto item = std::move(q.front());
q.pop_front();
auto item = std::move(_queue.front());
_queue.pop_front();
}
cv.notify_all();
_deq_cv.notify_all();
}

void start()
{
std::unique_lock<std::mutex> lock(mutex);
need_to_flush = false;
accepting = true;
std::unique_lock<std::mutex> lock(_mutex);
_need_to_flush = false;
_accepting = true;
}

size_t size()
{
std::unique_lock<std::mutex> lock(mutex);
return q.size();
std::unique_lock<std::mutex> lock(_mutex);
return _queue.size();
}
};

template<class T>
class single_consumer_frame_queue
{
single_consumer_queue<T> _queue;

public:
single_consumer_frame_queue<T>(unsigned int cap = QUEUE_MAX_SIZE) : _queue(cap) {}

void enqueue(T&& item)
{
if (item.is_blocking())
_queue.blocking_enqueue(std::move(item));
else
_queue.enqueue(std::move(item));
}

bool dequeue(T* item, unsigned int timeout_ms = 5000)
{
return _queue.dequeue(item, timeout_ms);
}

bool peek(T** item)
{
return _queue.peek(item);
}

bool try_dequeue(T* item)
{
return _queue.try_dequeue(item);
}

void clear()
{
_queue.clear();
}

void start()
{
_queue.start();
}

void flush()
{
_queue.flush();
}

size_t size()
{
return _queue.size();
}
};

class dispatcher
{
Expand Down Expand Up @@ -181,11 +249,14 @@ class dispatcher
}

template<class T>
void invoke(T item)
void invoke(T item, bool is_blocking = false)
{
if (!_was_stopped)
{
_queue.enqueue(std::move(item));
if(is_blocking)
_queue.blocking_enqueue(std::move(item));
else
_queue.enqueue(std::move(item));
}
}

Expand Down Expand Up @@ -248,6 +319,12 @@ class dispatcher
*wait_sucess = cv.wait_for(locker, std::chrono::seconds(10), [&]() { return invoked || _was_stopped; });
return *wait_sucess;
}

bool empty()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can be const

{
return _queue.size() == 0;
}

private:
friend cancellable_timer;
single_consumer_queue<std::function<void(cancellable_timer)>> _queue;
Expand Down
46 changes: 44 additions & 2 deletions src/core/streaming.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,54 @@ namespace librealsense

virtual void mark_fixed() = 0;
virtual bool is_fixed() const = 0;
virtual void set_blocking(bool state) = 0;
virtual bool is_blocking() const = 0;

virtual void keep() = 0;

virtual ~frame_interface() = default;
};

struct frame_holder
{
frame_interface* frame;

frame_interface* operator->()
{
return frame;
}

operator bool() const { return frame != nullptr; }

operator frame_interface*() const { return frame; }

frame_holder(frame_interface* f)
{
frame = f;
}

~frame_holder();

frame_holder(frame_holder&& other)
: frame(other.frame)
{
other.frame = nullptr;
}

frame_holder() : frame(nullptr) {}


frame_holder& operator=(frame_holder&& other);

frame_holder clone() const;

bool is_blocking() const { return frame->is_blocking(); };

private:
frame_holder& operator=(const frame_holder& other) = delete;
frame_holder(const frame_holder& other);
};

using on_frame = std::function<void(frame_interface*)>;
using stream_profiles = std::vector<std::shared_ptr<stream_profile_interface>>;

Expand Down Expand Up @@ -221,7 +263,7 @@ namespace librealsense

MAP_EXTENSION(RS2_EXTENSION_DEPTH_STEREO_SENSOR, librealsense::depth_stereo_sensor);

class depth_stereo_sensor_snapshot : public depth_stereo_sensor, public depth_sensor_snapshot
class depth_stereo_sensor_snapshot : public depth_stereo_sensor, public depth_sensor_snapshot
{
public:
depth_stereo_sensor_snapshot(float depth_units, float stereo_bl_mm) :
Expand Down Expand Up @@ -256,4 +298,4 @@ namespace librealsense
private:
float m_stereo_baseline_mm;
};
}
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add empty line

Loading