diff --git a/selfdrive/ui/replay/main.cc b/selfdrive/ui/replay/main.cc index 5c14c511c52192..54853172cd048c 100644 --- a/selfdrive/ui/replay/main.cc +++ b/selfdrive/ui/replay/main.cc @@ -85,8 +85,11 @@ int main(int argc, char *argv[]){ const QString route = args.empty() ? DEMO_ROUTE : args.first(); QStringList allow = parser.value("allow").isEmpty() ? QStringList{} : parser.value("allow").split(","); QStringList block = parser.value("block").isEmpty() ? QStringList{} : parser.value("block").split(","); + Replay *replay = new Replay(route, allow, block, nullptr, parser.isSet("dcam"), parser.isSet("ecam")); - replay->start(parser.value("start").toInt()); + if (replay->load()) { + replay->start(parser.value("start").toInt()); + } // start keyboard control thread QThread *t = QThread::create(keyboardThread, replay); diff --git a/selfdrive/ui/replay/replay.cc b/selfdrive/ui/replay/replay.cc index e8bdedd0621e49..313299e52094af 100644 --- a/selfdrive/ui/replay/replay.cc +++ b/selfdrive/ui/replay/replay.cc @@ -2,6 +2,7 @@ #include #include + #include "cereal/services.h" #include "selfdrive/camerad/cameras/camera_common.h" #include "selfdrive/common/timing.h" @@ -19,13 +20,13 @@ inline void precise_nano_sleep(long sleep_ns) { } // spin wait if (sleep_ns > 0) { - while ((nanos_since_boot() - start_sleep) <= sleep_ns) {/**/} + while ((nanos_since_boot() - start_sleep) <= sleep_ns) { usleep(0); } } } Replay::Replay(QString route, QStringList allow, QStringList block, SubMaster *sm_, bool dcam, bool ecam, QObject *parent) : sm(sm_), load_dcam(dcam), load_ecam(ecam), QObject(parent) { - std::vector s; + std::vector s; for (const auto &it : services) { if ((allow.size() == 0 || allow.contains(it.name)) && !block.contains(it.name)) { @@ -49,28 +50,30 @@ Replay::~Replay() { // TODO: quit stream thread and free resources. } -void Replay::start(int seconds){ - // load route +bool Replay::load() { if (!route_->load() || route_->size() == 0) { qDebug() << "failed load route" << route_->name() << "from server"; - return; + return false; } - qDebug() << "load route" << route_->name() << route_->size() << "segments, start from" << seconds; + qDebug() << "load route" << route_->name() << route_->size() << "segments"; segments_.resize(route_->size()); - seekTo(seconds); + return true; +} +void Replay::start(int seconds) { + seekTo(seconds); // start stream thread thread = new QThread; QObject::connect(thread, &QThread::started, [=]() { stream(); }); thread->start(); } -void Replay::updateEvents(const std::function& lambda) { +void Replay::updateEvents(const std::function &lambda) { // set updating_events to true to force stream thread relase the lock and wait for evnets_udpated. updating_events_ = true; { - std::unique_lock lk(lock_); + std::unique_lock lk(stream_lock_); events_updated_ = lambda(); updating_events_ = false; } @@ -80,22 +83,24 @@ void Replay::updateEvents(const std::function& lambda) { void Replay::seekTo(int seconds, bool relative) { if (segments_.empty()) return; + bool segment_loaded = false; updateEvents([&]() { if (relative) { seconds += ((cur_mono_time_ - route_start_ts_) * 1e-9); } - seconds = std::clamp(seconds, 0, (int)segments_.size() * 60 - 1); - qInfo() << "seeking to " << seconds; + qInfo() << "seeking to" << seconds; - int segment = seconds / 60; - bool segment_changed = (segment != current_segment_); - - cur_mono_time_ = route_start_ts_ + seconds * 1e9; - setCurrentSegment(segment); - bool segment_loaded = std::find(segments_merged_.begin(), segments_merged_.end(), segment) != segments_merged_.end(); - // return false if segment changed and not loaded yet - return !segment_changed || segment_loaded; + cur_mono_time_ = route_start_ts_ + std::clamp(seconds, 0, (int)segments_.size() * 60) * 1e9; + current_segment_ = std::min(seconds / 60, (int)segments_.size() - 1); + segment_loaded = std::find(segments_merged_.begin(), segments_merged_.end(), current_segment_) != segments_merged_.end(); + return segment_loaded; }); + + if (!segment_loaded) { + // always emit segmentChanged if segment is not loaded. + // the current_segment_ may not valid when seeking cross boundary or seeking to an invalid segment. + emit segmentChanged(); + } } void Replay::pause(bool pause) { @@ -108,7 +113,7 @@ void Replay::pause(bool pause) { void Replay::setCurrentSegment(int n) { if (current_segment_.exchange(n) != n) { - emit segmentChanged(n); + emit segmentChanged(); } } @@ -124,11 +129,14 @@ void Replay::queueSegment() { } end_idx = i; // skip invalid segment - fwd += segments_[i]->isValid(); + if (segments_[i]->isValid()) { + ++fwd; + } else if (i == cur_seg) { + ++cur_seg; + } } - // merge segments - mergeSegments(cur_seg, end_idx); + mergeSegments(std::min(cur_seg, (int)segments_.size() - 1), end_idx); } void Replay::mergeSegments(int cur_seg, int end_idx) { @@ -138,7 +146,7 @@ void Replay::mergeSegments(int cur_seg, int end_idx) { for (int i = begin_idx; i <= end_idx; ++i) { if (segments_[i] && segments_[i]->isLoaded()) { segments_need_merge.push_back(i); - } else if (i >= cur_seg) { + } else if (i >= cur_seg && segments_[i] && segments_[i]->isValid()) { // segment is valid,but still loading. can't skip it to merge the next one. // otherwise the stream thread may jump to the next segment. break; @@ -150,6 +158,8 @@ void Replay::mergeSegments(int cur_seg, int end_idx) { // merge & sort events std::vector *new_events = new std::vector(); + new_events->reserve(std::accumulate(segments_need_merge.begin(), segments_need_merge.end(), 0, + [=](int v, int n) { return v + segments_[n]->log->events.size(); })); for (int n : segments_need_merge) { auto &log = segments_[n]->log; auto middle = new_events->insert(new_events->end(), log->events.begin(), log->events.end()); @@ -164,7 +174,8 @@ void Replay::mergeSegments(int cur_seg, int end_idx) { auto it = std::find_if(new_events->begin(), new_events->end(), [=](auto e) { return e->which == cereal::Event::Which::INIT_DATA; }); if (it != new_events->end()) { route_start_ts_ = (*it)->mono_time; - cur_mono_time_ = route_start_ts_; + // cur_mono_time_ is set by seekTo int start() before get route_start_ts_ + cur_mono_time_ += route_start_ts_; } } @@ -173,6 +184,8 @@ void Replay::mergeSegments(int cur_seg, int end_idx) { return true; }); delete prev_events; + } else { + updateEvents([]() { return true; }); } // free segments out of current semgnt window. @@ -187,7 +200,7 @@ void Replay::stream() { float last_print = 0; cereal::Event::Which cur_which = cereal::Event::Which::INIT_DATA; - std::unique_lock lk(lock_); + std::unique_lock lk(stream_lock_); while (true) { stream_cv_.wait(lk, [=]() { return exit_ || (events_updated_ && !paused_); }); diff --git a/selfdrive/ui/replay/replay.h b/selfdrive/ui/replay/replay.h index 3e6f5839ee7028..b2bca2eaebf4a9 100644 --- a/selfdrive/ui/replay/replay.h +++ b/selfdrive/ui/replay/replay.h @@ -16,7 +16,7 @@ class Replay : public QObject { public: Replay(QString route, QStringList allow, QStringList block, SubMaster *sm = nullptr, bool dcam = false, bool ecam = false, QObject *parent = 0); ~Replay(); - + bool load(); void start(int seconds = 0); void seekTo(int seconds, bool relative = false); void relativeSeek(int seconds) { seekTo(seconds, true); } @@ -24,7 +24,7 @@ class Replay : public QObject { bool isPaused() const { return paused_; } signals: - void segmentChanged(int); + void segmentChanged(); protected slots: void queueSegment(); @@ -38,17 +38,18 @@ protected slots: QThread *thread; // logs - std::mutex lock_; + std::mutex stream_lock_; std::condition_variable stream_cv_; std::atomic updating_events_ = false; std::atomic current_segment_ = -1; + std::vector> segments_; + // the following variables must be protected with stream_lock_ bool exit_ = false; bool paused_ = false; bool events_updated_ = false; uint64_t route_start_ts_ = 0; uint64_t cur_mono_time_ = 0; std::vector *events_ = nullptr; - std::vector> segments_; std::vector segments_merged_; // messaging diff --git a/selfdrive/ui/replay/route.cc b/selfdrive/ui/replay/route.cc index f1dbddf40defc7..c996bb4bdd2ca0 100644 --- a/selfdrive/ui/replay/route.cc +++ b/selfdrive/ui/replay/route.cc @@ -119,10 +119,7 @@ Route::Route(const QString &route) : route_(route) {} bool Route::load() { QEventLoop loop; - auto onError = [&loop](const QString &err) { - qInfo() << err; - loop.quit(); - }; + auto onError = [&loop](const QString &err) { loop.quit(); }; bool ret = false; HttpRequest http(nullptr, !Hardware::PC()); diff --git a/selfdrive/ui/replay/route.h b/selfdrive/ui/replay/route.h index 126b6762665574..fae2d45a4752ce 100644 --- a/selfdrive/ui/replay/route.h +++ b/selfdrive/ui/replay/route.h @@ -30,10 +30,12 @@ class Route { inline int size() const { return segments_.size(); } inline SegmentFile &at(int n) { return segments_[n]; } + // public for unit tests + std::vector segments_; + protected: bool loadFromJson(const QString &json); QString route_; - std::vector segments_; }; class Segment : public QObject { diff --git a/selfdrive/ui/replay/tests/test_replay.cc b/selfdrive/ui/replay/tests/test_replay.cc index 9a2560cea609dc..329c3df43e23f4 100644 --- a/selfdrive/ui/replay/tests/test_replay.cc +++ b/selfdrive/ui/replay/tests/test_replay.cc @@ -1,11 +1,12 @@ #include +#include +#include #include #include #include "catch2/catch.hpp" #include "selfdrive/common/util.h" -#include "selfdrive/ui/replay/framereader.h" -#include "selfdrive/ui/replay/route.h" +#include "selfdrive/ui/replay/replay.h" const char *stream_url = "https://commadataci.blob.core.windows.net/openpilotci/0c94aa1e1296d7c6/2021-05-05--19-48-37/0/fcamera.hevc"; @@ -49,3 +50,138 @@ TEST_CASE("httpMultiPartDownload") { REQUIRE(httpMultiPartDownload(util::string_format("%s_abc", stream_url), filename, 5) == false); } } + +int random_int(int min, int max) { + std::random_device dev; + std::mt19937 rng(dev()); + std::uniform_int_distribution dist(min, max); + return dist(rng); +} + +bool is_events_ordered(const std::vector &events) { + REQUIRE(events.size() > 0); + uint64_t prev_mono_time = 0; + cereal::Event::Which prev_which = cereal::Event::INIT_DATA; + for (auto event : events) { + if (event->mono_time < prev_mono_time || (event->mono_time == prev_mono_time && event->which < prev_which)) { + return false; + } + prev_mono_time = event->mono_time; + prev_which = event->which; + } + return true; +} + +const QString DEMO_ROUTE = "4cf7a6ad03080c90|2021-09-29--13-46-36"; + +TEST_CASE("Segment") { + Route demo_route(DEMO_ROUTE); + REQUIRE(demo_route.load()); + REQUIRE(demo_route.size() == 11); + + QEventLoop loop; + Segment segment(0, demo_route.at(0), false, false); + REQUIRE(segment.isValid() == true); + REQUIRE(segment.isLoaded() == false); + QObject::connect(&segment, &Segment::loadFinished, [&]() { + REQUIRE(segment.isLoaded() == true); + REQUIRE(segment.log != nullptr); + REQUIRE(segment.log->events.size() > 0); + REQUIRE(is_events_ordered(segment.log->events)); + REQUIRE(segment.frames[RoadCam] != nullptr); + REQUIRE(segment.frames[RoadCam]->getFrameCount() > 0); + REQUIRE(segment.frames[DriverCam] == nullptr); + REQUIRE(segment.frames[WideRoadCam] == nullptr); + loop.quit(); + }); + loop.exec(); +} + +// helper class for unit tests +class TestReplay : public Replay { +public: + TestReplay(const QString &route) : Replay(route, {}, {}) {} + void test_seek(); + +protected: + void testSeekTo(int seek_to, const std::set &invalid_segments = {}); +}; + +void TestReplay::testSeekTo(int seek_to, const std::set &invalid_segments) { + seekTo(seek_to); + + // wait for seek finish + while (true) { + std::unique_lock lk(stream_lock_); + stream_cv_.wait(lk, [=]() { return events_updated_ == true; }); + events_updated_ = false; + + // verify result + REQUIRE(uint64_t(route_start_ts_ + seek_to * 1e9) == cur_mono_time_); + + Event cur_event(cereal::Event::Which::INIT_DATA, cur_mono_time_); + auto eit = std::upper_bound(events_->begin(), events_->end(), &cur_event, Event::lessThan()); + if (eit == events_->end()) { + qDebug() << "waiting for events..."; + continue; + } + + INFO("seek to [" << seek_to << "s segment " << seek_to / 60 << "]"); + REQUIRE(!events_->empty()); + REQUIRE(is_events_ordered(*events_)); + + REQUIRE(eit != events_->end()); + const int seek_to_segment = seek_to / 60; + const int event_seconds = ((*eit)->mono_time - route_start_ts_) / 1e9; + current_segment_ = event_seconds / 60; + INFO("event [" << event_seconds << "s segment " << current_segment_ << "]"); + REQUIRE(event_seconds >= seek_to); + if (invalid_segments.find(seek_to_segment) == invalid_segments.end()) { + REQUIRE(event_seconds == seek_to); // at the same time + } else { + if (current_segment_ == seek_to_segment) { + // seek cross-boundary. e.g. seek_to 60s(segment 1), but segment 0 end at 60.021 and segemnt 1 is invalid. + REQUIRE(event_seconds == seek_to); + } else { + REQUIRE(current_segment_ > seek_to_segment); + REQUIRE(invalid_segments.find(current_segment_) == invalid_segments.end()); + } + } + break; + } +} + +void TestReplay::test_seek() { + QEventLoop loop; + + std::thread thread = std::thread([&]() { + const int loop_count = 100; + // random seek in one segment + for (int i = 0; i < loop_count; ++i) { + testSeekTo(random_int(0, 60)); + } + // random seek in 3 segments + for (int i = 0; i < loop_count; ++i) { + testSeekTo(random_int(0, 60 * 3)); + } + // random seek in invalid segments + std::set invalid_segments{5, 6, 7, 9}; + for (int i : invalid_segments) { + route_->segments_[i].rlog = route_->segments_[i].qlog = ""; + route_->segments_[i].road_cam = route_->segments_[i].qcamera = ""; + } + for (int i = 0; i < loop_count; ++i) { + testSeekTo(random_int(4 * 60, 60 * 10), invalid_segments); + } + loop.quit(); + }); + + loop.exec(); + thread.join(); +} + +TEST_CASE("Replay") { + TestReplay replay(DEMO_ROUTE); + REQUIRE(replay.load()); + replay.test_seek(); +} diff --git a/selfdrive/ui/replay/tests/test_runner.cc b/selfdrive/ui/replay/tests/test_runner.cc index 62bf7476a18996..b20ac86c64847b 100644 --- a/selfdrive/ui/replay/tests/test_runner.cc +++ b/selfdrive/ui/replay/tests/test_runner.cc @@ -1,2 +1,10 @@ -#define CATCH_CONFIG_MAIN +#define CATCH_CONFIG_RUNNER #include "catch2/catch.hpp" +#include + +int main(int argc, char **argv) { + // unit tests for Qt + QCoreApplication app(argc, argv); + const int res = Catch::Session().run(argc, argv); + return (res < 0xff ? res : 0xff); +}