Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

c++ replay: more robust seek #22375

Merged
merged 17 commits into from
Oct 4, 2021
5 changes: 4 additions & 1 deletion selfdrive/ui/replay/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,11 @@ int main(int argc, char *argv[]){
const QString route = args.empty() ? DEMO_ROUTE : args.first();
QStringList allow = parser.value("allow").isEmpty() ? QStringList{} : parser.value("allow").split(",");
QStringList block = parser.value("block").isEmpty() ? QStringList{} : parser.value("block").split(",");

Replay *replay = new Replay(route, allow, block, nullptr, parser.isSet("dcam"), parser.isSet("ecam"));
replay->start(parser.value("start").toInt());
if (replay->load()) {
replay->start(parser.value("start").toInt());
}

// start keyboard control thread
QThread *t = QThread::create(keyboardThread, replay);
Expand Down
65 changes: 39 additions & 26 deletions selfdrive/ui/replay/replay.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <QApplication>
#include <QDebug>

#include "cereal/services.h"
#include "selfdrive/camerad/cameras/camera_common.h"
#include "selfdrive/common/timing.h"
Expand All @@ -19,13 +20,13 @@ inline void precise_nano_sleep(long sleep_ns) {
}
// spin wait
if (sleep_ns > 0) {
while ((nanos_since_boot() - start_sleep) <= sleep_ns) {/**/}
while ((nanos_since_boot() - start_sleep) <= sleep_ns) { usleep(0); }
}
}

Replay::Replay(QString route, QStringList allow, QStringList block, SubMaster *sm_, bool dcam, bool ecam, QObject *parent)
: sm(sm_), load_dcam(dcam), load_ecam(ecam), QObject(parent) {
std::vector<const char*> s;
std::vector<const char *> s;
for (const auto &it : services) {
if ((allow.size() == 0 || allow.contains(it.name)) &&
!block.contains(it.name)) {
Expand All @@ -49,28 +50,30 @@ Replay::~Replay() {
// TODO: quit stream thread and free resources.
}

void Replay::start(int seconds){
// load route
bool Replay::load() {
if (!route_->load() || route_->size() == 0) {
qDebug() << "failed load route" << route_->name() << "from server";
return;
return false;
}

qDebug() << "load route" << route_->name() << route_->size() << "segments, start from" << seconds;
qDebug() << "load route" << route_->name() << route_->size() << "segments";
segments_.resize(route_->size());
seekTo(seconds);
return true;
}

void Replay::start(int seconds) {
seekTo(seconds);
// start stream thread
thread = new QThread;
QObject::connect(thread, &QThread::started, [=]() { stream(); });
thread->start();
}

void Replay::updateEvents(const std::function<bool()>& lambda) {
void Replay::updateEvents(const std::function<bool()> &lambda) {
// set updating_events to true to force stream thread relase the lock and wait for evnets_udpated.
updating_events_ = true;
{
std::unique_lock lk(lock_);
std::unique_lock lk(stream_lock_);
events_updated_ = lambda();
updating_events_ = false;
}
Expand All @@ -80,22 +83,24 @@ void Replay::updateEvents(const std::function<bool()>& lambda) {
void Replay::seekTo(int seconds, bool relative) {
if (segments_.empty()) return;

bool segment_loaded = false;
updateEvents([&]() {
if (relative) {
seconds += ((cur_mono_time_ - route_start_ts_) * 1e-9);
}
seconds = std::clamp(seconds, 0, (int)segments_.size() * 60 - 1);
qInfo() << "seeking to " << seconds;
qInfo() << "seeking to" << seconds;

int segment = seconds / 60;
bool segment_changed = (segment != current_segment_);

cur_mono_time_ = route_start_ts_ + seconds * 1e9;
setCurrentSegment(segment);
bool segment_loaded = std::find(segments_merged_.begin(), segments_merged_.end(), segment) != segments_merged_.end();
// return false if segment changed and not loaded yet
return !segment_changed || segment_loaded;
cur_mono_time_ = route_start_ts_ + std::clamp(seconds, 0, (int)segments_.size() * 60) * 1e9;
current_segment_ = std::min(seconds / 60, (int)segments_.size() - 1);
segment_loaded = std::find(segments_merged_.begin(), segments_merged_.end(), current_segment_) != segments_merged_.end();
return segment_loaded;
});

if (!segment_loaded) {
// always emit segmentChanged if segment is not loaded.
// the current_segment_ may not valid when seeking cross boundary or seeking to an invalid segment.
emit segmentChanged();
}
}

void Replay::pause(bool pause) {
Expand All @@ -108,7 +113,7 @@ void Replay::pause(bool pause) {

void Replay::setCurrentSegment(int n) {
if (current_segment_.exchange(n) != n) {
emit segmentChanged(n);
emit segmentChanged();
}
}

Expand All @@ -124,11 +129,14 @@ void Replay::queueSegment() {
}
end_idx = i;
// skip invalid segment
fwd += segments_[i]->isValid();
if (segments_[i]->isValid()) {
++fwd;
} else if (i == cur_seg) {
++cur_seg;
}
}

// merge segments
mergeSegments(cur_seg, end_idx);
mergeSegments(std::min(cur_seg, (int)segments_.size() - 1), end_idx);
}

void Replay::mergeSegments(int cur_seg, int end_idx) {
Expand All @@ -138,7 +146,7 @@ void Replay::mergeSegments(int cur_seg, int end_idx) {
for (int i = begin_idx; i <= end_idx; ++i) {
if (segments_[i] && segments_[i]->isLoaded()) {
segments_need_merge.push_back(i);
} else if (i >= cur_seg) {
} else if (i >= cur_seg && segments_[i] && segments_[i]->isValid()) {
// segment is valid,but still loading. can't skip it to merge the next one.
// otherwise the stream thread may jump to the next segment.
break;
Expand All @@ -150,6 +158,8 @@ void Replay::mergeSegments(int cur_seg, int end_idx) {

// merge & sort events
std::vector<Event *> *new_events = new std::vector<Event *>();
new_events->reserve(std::accumulate(segments_need_merge.begin(), segments_need_merge.end(), 0,
[=](int v, int n) { return v + segments_[n]->log->events.size(); }));
for (int n : segments_need_merge) {
auto &log = segments_[n]->log;
auto middle = new_events->insert(new_events->end(), log->events.begin(), log->events.end());
Expand All @@ -164,7 +174,8 @@ void Replay::mergeSegments(int cur_seg, int end_idx) {
auto it = std::find_if(new_events->begin(), new_events->end(), [=](auto e) { return e->which == cereal::Event::Which::INIT_DATA; });
if (it != new_events->end()) {
route_start_ts_ = (*it)->mono_time;
cur_mono_time_ = route_start_ts_;
// cur_mono_time_ is set by seekTo int start() before get route_start_ts_
cur_mono_time_ += route_start_ts_;
}
}

Expand All @@ -173,6 +184,8 @@ void Replay::mergeSegments(int cur_seg, int end_idx) {
return true;
});
delete prev_events;
} else {
updateEvents([]() { return true; });
}

// free segments out of current semgnt window.
Expand All @@ -187,7 +200,7 @@ void Replay::stream() {
float last_print = 0;
cereal::Event::Which cur_which = cereal::Event::Which::INIT_DATA;

std::unique_lock lk(lock_);
std::unique_lock lk(stream_lock_);

while (true) {
stream_cv_.wait(lk, [=]() { return exit_ || (events_updated_ && !paused_); });
Expand Down
9 changes: 5 additions & 4 deletions selfdrive/ui/replay/replay.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@ class Replay : public QObject {
public:
Replay(QString route, QStringList allow, QStringList block, SubMaster *sm = nullptr, bool dcam = false, bool ecam = false, QObject *parent = 0);
~Replay();

bool load();
void start(int seconds = 0);
void seekTo(int seconds, bool relative = false);
void relativeSeek(int seconds) { seekTo(seconds, true); }
void pause(bool pause);
bool isPaused() const { return paused_; }

signals:
void segmentChanged(int);
void segmentChanged();

protected slots:
void queueSegment();
Expand All @@ -38,17 +38,18 @@ protected slots:
QThread *thread;

// logs
std::mutex lock_;
std::mutex stream_lock_;
std::condition_variable stream_cv_;
std::atomic<bool> updating_events_ = false;
std::atomic<int> current_segment_ = -1;
std::vector<std::unique_ptr<Segment>> segments_;
// the following variables must be protected with stream_lock_
bool exit_ = false;
bool paused_ = false;
bool events_updated_ = false;
uint64_t route_start_ts_ = 0;
uint64_t cur_mono_time_ = 0;
std::vector<Event *> *events_ = nullptr;
std::vector<std::unique_ptr<Segment>> segments_;
std::vector<int> segments_merged_;

// messaging
Expand Down
5 changes: 1 addition & 4 deletions selfdrive/ui/replay/route.cc
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,7 @@ Route::Route(const QString &route) : route_(route) {}

bool Route::load() {
QEventLoop loop;
auto onError = [&loop](const QString &err) {
qInfo() << err;
loop.quit();
};
auto onError = [&loop](const QString &err) { loop.quit(); };

bool ret = false;
HttpRequest http(nullptr, !Hardware::PC());
Expand Down
4 changes: 3 additions & 1 deletion selfdrive/ui/replay/route.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@ class Route {
inline int size() const { return segments_.size(); }
inline SegmentFile &at(int n) { return segments_[n]; }

// public for unit tests
std::vector<SegmentFile> segments_;

protected:
bool loadFromJson(const QString &json);
QString route_;
std::vector<SegmentFile> segments_;
};

class Segment : public QObject {
Expand Down
Loading