Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Syncer improvements & Concurrency fixes #9159

Merged
merged 10 commits into from
Jun 12, 2021
Merged

Conversation

maloel
Copy link
Collaborator

@maloel maloel commented Jun 4, 2021

Following the deadlock that Nir experienced, reworked some stuff (and not all because of the deadlock):

  • split off dispatcher code into dispatcher.cpp
  • syncer (timestamp) inactive stream handling moved into skip_missing_stream
  • syncer identity_matcher names now include the stream index
  • single_consumer_queue::_mutex is now mutable; size() and empty() const
  • revised logic inside single_consumer_queue
  • revised logic inside dispatcher
  • cancellable_time::try_sleep() is now templated, requires actual chrono duration (and not just its ::Rep)
  • syncer stop() calls now percolate recursively to shut down all queues; new syncs do nothing if syncer was stopped
  • fix 1 warning on the way :)

@maloel maloel requested review from ev-mp, aangerma and Nir-Az June 4, 2021 03:30
Copy link
Collaborator

@Nir-Az Nir-Az left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good, just 1 comment

std::unique_lock<std::mutex> lock(_mutex);
if (_accepting)
_enq_cv.wait( lock, [this]() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we wait if we are not accepting?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wait( lock, pred )

is the same as:

while( ! pred() ) {
    wait( lock );
}

Copy link
Collaborator Author

@maloel maloel Jun 6, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we are not accepting, the queue should be empty, so we shouldn't really wait, no?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So you are relying on logic elsewhere that say (accepting = false) is the same as (size = 0)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well yes, I guess so: compound predicates get less readable because of the !. Would it be better if I wrote the pred as return _accepting && _queue.size() < _cap; ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, it gets more complicated...
You want:

while( _accepting && _queue.size() >= _cap ) {
    wait();
}

So the logic inside the pred should be return ! _accepting || _queue.size() < _cap;...

_was_flushed = false;
const auto ready = [this]() { return (_queue.size() > 0) || _need_to_flush; };
if (!ready() && !_deq_cv.wait_for(lock, std::chrono::milliseconds(timeout_ms), ready))
if( ! _deq_cv.wait_for( lock,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This if statement is really confusing..
too much "!" operators..
I suggest rewrite it as a positive if so it will be more understandable.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Everything with these predicates is confusing :)

src/sync.cpp Show resolved Hide resolved
if (_accepting)
_enq_cv.wait( lock, [this]() {
return _queue.size() < _cap; } );
if( ! _accepting )
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't it be inside the predicate ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we stop accepting then someone called stop() which calls clear() which empties out the queue which should make the predicate true and stop.


// Wait until any dispatched is done...
{
std::lock_guard< std::mutex > lock( _dispatch_mutex );
Copy link
Contributor

@aangerma aangerma Jun 8, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its not ensure that the dispatcher completely stopped , it can be on the middle of the last item.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will make this code wait until the dispatcher is done

if (!ready() && !_deq_cv.wait_for(lock, std::chrono::milliseconds(timeout_ms), ready))
if( ! _deq_cv.wait_for( lock,
std::chrono::milliseconds( timeout_ms ),
[this]() { return ! _accepting || ! _queue.empty(); } )
Copy link
Contributor

@aangerma aangerma Jun 9, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_accepting && ! _queue.empty(); will be more understandable and you will not need the || _queue.empty() )

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

while( ! pred() )
    wait();

if pred() returns true then we still check the following line: _queue.empty() -- that's why it's there -- and return false.

// Wait until any dispatched is done...
{
std::lock_guard< std::mutex > lock( _dispatch_mutex );
assert( _queue.empty() );
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Asserts will omitted in release builds, and as release is the default for most testing the condition won't be checked

@maloel maloel changed the base branch from development to concsync June 12, 2021 12:06
@maloel maloel merged commit 5d1f9f3 into IntelRealSense:concsync Jun 12, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants